55 lines
1.0 KiB
Go
55 lines
1.0 KiB
Go
package trade
|
|
|
|
import (
	"context"
	"log"
	"strings"
	"time"

	"git.apinb.com/quant/strategy/internal/impl"
)
|
|
|
|
// Market data (ticker) receiving.
var (
	// tickerCancel cancels the context created by the most recent
	// StartOnTicker call. StopOnTicker invokes it and resets it to nil;
	// it is nil whenever no cancel function is stored.
	tickerCancel context.CancelFunc
)
|
|
|
|
// 启动行情接收
|
|
func StartOnTicker(topics []string, do func(payload string, args []string)) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
tickerCancel = cancel
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Println("OnTicker stopped")
|
|
return
|
|
default:
|
|
if len(topics) == 0 {
|
|
log.Println("No Topics")
|
|
return
|
|
}
|
|
|
|
sub := impl.RedisService.Client.Subscribe(impl.RedisService.Ctx, topics...)
|
|
for {
|
|
msg, err := sub.ReceiveMessage(impl.RedisService.Ctx)
|
|
if err != nil {
|
|
log.Println("Subscribe Error:", err)
|
|
// 连接断开时,关闭当前订阅并跳出内层循环,重新创建订阅
|
|
sub.Close()
|
|
break
|
|
}
|
|
args := strings.Split(msg.Channel, ":")
|
|
do(msg.Payload, args)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// 停止行情接收
|
|
func StopOnTicker() {
|
|
if tickerCancel != nil {
|
|
tickerCancel()
|
|
tickerCancel = nil
|
|
}
|
|
}
|