package trade import ( "encoding/json" "log" "strings" "time" "git.apinb.com/bsm-sdk/core/utils" "git.apinb.com/quant/strategy/internal/impl" "git.apinb.com/quant/strategy/internal/models" "github.com/bitget-golang/sdk-api/pkg/client/ws" "github.com/bitget-golang/sdk-api/types" "github.com/robfig/cron/v3" "github.com/vmihailenco/msgpack/v5" ) type WebsocketPositionMessage struct { Action string `json:"action"` Arg struct { Channel string `json:"channel"` InstID string `json:"instId"` InstType string `json:"instType"` } `json:"arg"` Data []struct { AchievedProfits string `json:"achievedProfits"` AssetMode string `json:"assetMode"` AutoMargin string `json:"autoMargin"` Available string `json:"available"` BreakEvenPrice string `json:"breakEvenPrice"` CTime string `json:"cTime"` DeductedFee string `json:"deductedFee"` Frozen string `json:"frozen"` HoldSide string `json:"holdSide"` InstID string `json:"instId"` // symbol KeepMarginRate string `json:"keepMarginRate"` Leverage int64 `json:"leverage"` LiquidationPrice string `json:"liquidationPrice"` MarginCoin string `json:"marginCoin"` MarginMode string `json:"marginMode"` MarginRate string `json:"marginRate"` MarginSize string `json:"marginSize"` MarkPrice string `json:"markPrice"` OpenPriceAvg string `json:"openPriceAvg"` PosID string `json:"posId"` PosMode string `json:"posMode"` Total string `json:"total"` TotalFee string `json:"totalFee"` UTime string `json:"uTime"` UnrealizedPL string `json:"unrealizedPL"` UnrealizedPLR string `json:"unrealizedPLR"` } `json:"data"` Ts int64 `json:"ts"` } var PlanKeyName string func (bg *BitgetClient) RefreshByApi(planKeyName string) { PlanKeyName = planKeyName // 根据基本币,监控帐号可用资金变动,仓位,以及最近7天的交易情况 c := cron.New(cron.WithSeconds()) c.AddFunc("@every 1s", func() { bg.PositionsByApi(planKeyName) }) c.Start() } func (bg *BitgetClient) RefreshByWebSocket(planKeyName string) { // 初始获取持仓 PlanKeyName = planKeyName bg.PositionsByApi(planKeyName) // 根据WebSocket推送,实时更新持仓,添加重连机制 go func() { for { var channels []types.SubscribeReq positions := types.SubscribeReq{ InstType: "USDT-FUTURES", Channel: "positions", InstId: "default", } channels = append(channels, positions) wsClient := new(ws.BitgetWsClient).Init(true, receiveHandler, errorHandler) wsClient.SubscribeDef(channels) log.Println("Bitget Websocket Connect...") // Connect() 是阻塞调用,当连接断开时会返回 wsClient.Connect() // 连接断开后,等待5秒后重连 log.Println("Bitget Websocket disconnected, will reconnect in 5 seconds...") time.Sleep(5 * time.Second) // 重新获取持仓数据 bg.PositionsByApi(planKeyName) } }() } func receiveHandler(message string) { var reply WebsocketPositionMessage err := json.Unmarshal([]byte(message), &reply) if err != nil { log.Println("WatchPositions JSON Unmarshal Error:", err) return } if len(reply.Data) == 0 { impl.RedisService.Client.Del(impl.RedisService.Ctx, PlanKeyName+".PosSummary").Result() impl.RedisService.Client.Del(impl.RedisService.Ctx, PlanKeyName+".PosOrders").Result() log.Println("WatchPositions:", "No Positions") return } orders := make(map[string][]*models.QuantOrders, 0) var summary []string for _, v := range reply.Data { side := strings.ToUpper(v.HoldSide) // 假设 v.CTime 是一个表示毫秒时间戳的字符串 t := time.UnixMilli(utils.String2Int64(v.CTime)) record := &models.QuantOrders{ Exchange: "BITGET", Symbol: v.InstID, Side: side, Fee: utils.String2Float64(v.TotalFee), OpenPrice: utils.String2Float64(v.OpenPriceAvg), // 开仓均价 Volume: utils.String2Float64(v.Total), // 交易币成交数量 MarginSize: utils.String2Float64(v.MarginSize), // 计价币成交数量 Leverage: int(v.Leverage), UnrealizedPL: utils.String2Float64(v.UnrealizedPL), CreatedAt: t, // 毫秒转time.Time } orders[v.InstID] = append(orders[v.InstID], record) log.Println("Record", record) summary = append(summary, v.InstID+"."+side) } _, err = impl.RedisService.Client.Set(impl.RedisService.Ctx, PlanKeyName+".PosSummary", strings.Join(summary, ","), 0).Result() if err != nil { log.Println("WatchPositions:", err) } // 序列化为 MessagePack ordersPack, _ := msgpack.Marshal(orders) _, err = impl.RedisService.Client.Set(impl.RedisService.Ctx, PlanKeyName+".PosOrders", ordersPack, 0).Result() if err != nil { log.Println("WatchPositions:", err) } } func errorHandler(message string) { log.Println("WebSocket Error:", message) } func (bg *BitgetClient) PositionsByApi(planKeyName string) { args := map[string]string{ "productType": "USDT-FUTURES", "marginCoin": "USDT", } log.Println("===", "RefreshPositions:", planKeyName, "===") resp, err := bg.AccountClient.AllPosition(args) if err != nil { log.Println("WatchPositions:", err) return } var reply AllPositionResp json.Unmarshal([]byte(resp), &reply) if err != nil { log.Println("WatchPositions:", err) return } if reply.Code != "00000" { log.Println("WatchPositions:", reply.Code+" "+reply.Msg) return } if len(reply.Data) == 0 { impl.RedisService.Client.Del(impl.RedisService.Ctx, planKeyName+".PosSummary").Result() impl.RedisService.Client.Del(impl.RedisService.Ctx, planKeyName+".PosOrders").Result() log.Println("WatchPositions:", "No Positions") return } orders := make(map[string][]*models.QuantOrders, 0) var summary []string for _, v := range reply.Data { side := strings.ToUpper(v.HoldSide) // 假设 v.CTime 是一个表示毫秒时间戳的字符串 t := time.UnixMilli(utils.String2Int64(v.CTime)) record := &models.QuantOrders{ Exchange: "BITGET", Symbol: v.Symbol, Side: side, Fee: utils.String2Float64(v.TotalFee), OpenPrice: utils.String2Float64(v.OpenPriceAvg), // 开仓均价 Volume: utils.String2Float64(v.Total), // 交易币成交数量 MarginSize: utils.String2Float64(v.MarginSize), // 计价币成交数量 Leverage: utils.String2Int(v.Leverage), UnrealizedPL: utils.String2Float64(v.UnrealizedPL), CreatedAt: t, // 毫秒转time.Time } orders[v.Symbol] = append(orders[v.Symbol], record) log.Println("Record", record) summary = append(summary, v.Symbol+"."+side) } _, err = impl.RedisService.Client.Set(impl.RedisService.Ctx, planKeyName+".PosSummary", strings.Join(summary, ","), 0).Result() if err != nil { log.Println("WatchPositions:", err) } // 序列化为 MessagePack ordersPack, _ := msgpack.Marshal(orders) _, err = impl.RedisService.Client.Set(impl.RedisService.Ctx, planKeyName+".PosOrders", ordersPack, 0).Result() if err != nil { log.Println("WatchPositions:", err) } }