review code.

This commit is contained in:
2026-04-17 14:50:34 +08:00
parent a0a6d6adaa
commit 592c8e31dd
15 changed files with 487 additions and 338 deletions

100
internal/logic/boot.go Normal file
View File

@@ -0,0 +1,100 @@
package logic
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/robfig/cron/v3"
)
var (
COLLECTOR_URL = "http://localhost:5000/status"
COLLECTION_INTERVAL = 30
)
func Boot() {
log.Println("=== QMT数据采集器启动 ===")
log.Printf("采集地址: %s", COLLECTOR_URL)
log.Printf("采集间隔: %d秒", COLLECTION_INTERVAL)
// 创建采集器
coll := NewCollector(COLLECTOR_URL)
// 创建cron调度器
c := cron.New(cron.WithSeconds())
// 构建cron表达式 (每N秒执行一次)
cronSpec := fmt.Sprintf("@every %ds", COLLECTION_INTERVAL)
log.Printf("定时任务表达式: %s", cronSpec)
// 添加定时任务
_, err := c.AddFunc(cronSpec, func() {
runCollection(coll)
})
if err != nil {
log.Fatalf("添加定时任务失败: %v", err)
}
// 启动调度器
c.Start()
log.Println("定时任务已启动")
// 等待退出信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("收到退出信号,正在关闭...")
c.Stop()
log.Println("采集器已停止")
}
// runCollection 执行一次数据采集和存储
func runCollection(coll *Collector) {
// 检查是否为开市时间
now := time.Now()
if !IsTradingTime(now) {
return
}
log.Println("开始采集...")
// 采集数据并检查变化
status, dataHash, changed, err := coll.CollectAndCheck()
if err != nil {
log.Printf("采集失败: %v", err)
return
}
log.Printf("数据哈希: %s", dataHash)
log.Printf("数据是否变化: %v", changed)
// 如果数据没有变化,只记录日志
if !changed {
log.Println("数据未变化,跳过存储")
return
}
// 数据有变化,保存到数据库
log.Println("数据已变化,开始存储到数据库...")
if err := SaveData(status); err != nil {
log.Printf("保存数据失败: %v", err)
return
}
// 记录成功的日志
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
// log.Printf("保存采集日志失败: %v", err)
// }
log.Printf("数据存储成功 - 资产账户: %s, 订单数: %d, 持仓数: %d, 行情数: %d",
status.Data.Assets.AccountID,
len(status.Data.Orders),
len(status.Data.Positions),
len(status.Data.TickData))
}

112
internal/logic/collector.go Normal file
View File

@@ -0,0 +1,112 @@
package logic
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"git.apinb.com/quant/collector/internal/types"
)
// Collector 数据采集器
type Collector struct {
url string
httpClient *http.Client
lastHash string
}
// NewCollector 创建新的采集器
func NewCollector(url string) *Collector {
return &Collector{
url: url,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
lastHash: "",
}
}
// FetchData 从HTTP接口获取数据
func (c *Collector) FetchData() (*types.StatusData, error) {
resp, err := c.httpClient.Get(c.url)
if err != nil {
return nil, fmt.Errorf("HTTP请求失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP状态码错误: %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
var status types.StatusData
if err := json.Unmarshal(body, &status); err != nil {
return nil, fmt.Errorf("JSON解析失败: %w", err)
}
return &status, nil
}
// CalculateHash 计算数据的SHA256哈希值
func (c *Collector) CalculateHash(status *types.StatusData) (string, error) {
// 将数据序列化为JSON
data, err := json.Marshal(status)
if err != nil {
return "", fmt.Errorf("序列化数据失败: %w", err)
}
// 计算SHA256哈希
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:]), nil
}
// HasChanged 检查数据是否发生变化
func (c *Collector) HasChanged(currentHash string) bool {
if c.lastHash == "" {
return true // 第一次采集
}
return c.lastHash != currentHash
}
// UpdateHash 更新上次哈希值
func (c *Collector) UpdateHash(hash string) {
c.lastHash = hash
}
// GetLastHash 获取上次哈希值
func (c *Collector) GetLastHash() string {
return c.lastHash
}
// CollectAndCheck 采集数据并检查是否变化
func (c *Collector) CollectAndCheck() (*types.StatusData, string, bool, error) {
// 获取数据
status, err := c.FetchData()
if err != nil {
return nil, "", false, err
}
// 计算哈希
currentHash, err := c.CalculateHash(status)
if err != nil {
return nil, "", false, err
}
// 检查是否变化
changed := c.HasChanged(currentHash)
// 如果变化了,更新哈希
if changed {
c.UpdateHash(currentHash)
}
return status, currentHash, changed, nil
}

244
internal/logic/storage.go Normal file
View File

@@ -0,0 +1,244 @@
package logic
import (
"encoding/json"
"fmt"
"time"
"git.apinb.com/quant/collector/internal/impl"
"git.apinb.com/quant/collector/internal/models"
"git.apinb.com/quant/collector/internal/types"
"gorm.io/gorm"
)
// SaveStatus 保存完整状态数据(使用事务)
func SaveData(status *types.StatusData) error {
// 验证必要字段 - AccountID是所有Upsert的共同条件
if status.Data.Assets.AccountID == "" {
return fmt.Errorf("账户ID不能为空")
}
// 计算Ymd (年月日数字格式,如20260407)
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 保存资产快照 (先查询后更新/插入)
var existingAsset models.CollectorAssets
err := impl.DBService.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
asset := models.CollectorAssets{
AccountID: status.Data.Assets.AccountID,
Ymd: ymd,
Cash: status.Data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue,
Profit: status.Data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := impl.DBService.Create(&asset).Error; err != nil {
return fmt.Errorf("插入资产快照失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询资产快照失败: %w", err)
} else {
// 记录存在,更新现有记录
if asset.Cash != existingAsset.Cash || asset.FrozenCash != existingAsset.FrozenCash || asset.MarketValue != existingAsset.MarketValue || asset.Profit != existingAsset.Profit || asset.TotalAsset != existingAsset.TotalAsset {
asset.ID = existingAsset.ID
if err := impl.DBService.Save(&asset).Error; err != nil {
return fmt.Errorf("更新资产快照失败: %w", err)
}
}
}
// 批量保存订单 (先查询后更新/插入)
if len(status.Data.Orders) > 0 {
for _, order := range status.Data.Orders {
// 验证必要条件: OrderID和StockCode
if order.OrderID == 0 {
continue
}
if order.StockCode == "" {
continue
}
// 只存储成交的订单
if order.OrderStatus != 56 {
continue
}
var open_price float64
var info types.OrderInfo
err = json.Unmarshal([]byte(order.OrderRemark), &info)
if err == nil {
open_price = info.Op
}
// 查询是否存在
var cnt int64
impl.DBService.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
if cnt > 0 {
continue
}
orderRecord := models.CollectorOrder{
OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID,
StockCode: order.StockCode,
Ymd: ymd,
Price: order.Price,
Volume: order.Volume,
OpenPrice: open_price,
TradedPrice: order.TradedPrice,
TradedVolume: order.TradedVolume,
OrderStatus: order.OrderStatus,
OrderTime: order.OrderTime,
OrderRemark: order.OrderRemark,
OffsetFlag: order.OffsetFlag,
CollectedAt: now,
CreatedAt: now,
}
// 记录不存在,插入新记录
if err := impl.DBService.Create(&orderRecord).Error; err != nil {
return fmt.Errorf("插入订单失败: %w", err)
}
// 处理订单簿逻辑
if err := processOrderBook(status.Data.Assets.AccountID, order, ymd, now, open_price); err != nil {
fmt.Printf("处理订单簿失败: %v\n", err)
// 不返回错误,避免影响主流程
}
}
}
// 批量保存持仓 (先查询后更新/插入)
if len(status.Data.Positions) > 0 {
for _, pos := range status.Data.Positions {
// 验证必要条件: Code
if pos.Code == "" {
continue
}
// 查询是否存在
var existingPosition models.CollectorPosition
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
positionRecord := models.CollectorPosition{
AccountID: status.Data.Assets.AccountID,
Code: pos.Code,
Ymd: ymd,
Volume: pos.Volume,
CanUseVolume: pos.CanUseVolume,
FrozenVolume: pos.FrozenVolume,
AvgPrice: pos.AvgPrice,
OpenPrice: pos.OpenPrice,
CurrentPrice: pos.CurrentPrice,
MarketValue: pos.MarketValue,
Profit: pos.Profit,
ProfitRate: pos.ProfitRate,
MinProfitRate: pos.MinProfitRate,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := impl.DBService.Create(&positionRecord).Error; err != nil {
return fmt.Errorf("插入持仓失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询持仓失败: %w", err)
} else {
if positionRecord.Volume != existingPosition.Volume ||
positionRecord.CanUseVolume != existingPosition.CanUseVolume ||
positionRecord.FrozenVolume != existingPosition.FrozenVolume ||
positionRecord.AvgPrice != existingPosition.AvgPrice ||
positionRecord.OpenPrice != existingPosition.OpenPrice ||
positionRecord.CurrentPrice != existingPosition.CurrentPrice ||
positionRecord.MarketValue != existingPosition.MarketValue ||
positionRecord.Profit != existingPosition.Profit ||
positionRecord.ProfitRate != existingPosition.ProfitRate {
// 记录存在,更新现有记录
positionRecord.ID = existingPosition.ID
if err := impl.DBService.Save(&positionRecord).Error; err != nil {
return fmt.Errorf("更新持仓失败: %w", err)
}
}
}
}
}
return nil
}
// processOrderBook 处理订单簿逻辑
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
// 这里假设: OffsetFlag == 0 表示买入, OffsetFlag == 1 表示卖出
if order.OffsetFlag == 0 {
// 买入订单 - 创建新的订单簿记录
orderBook := models.OrderBook{
AccountID: accountID,
StockCode: order.StockCode,
Ymd: ymd,
BuyOrderID: order.OrderID,
BuyPrice: order.TradedPrice,
BuyVolume: order.TradedVolume,
BuyTime: order.OrderTime,
BuyCollectedAt: now,
IsClosed: false,
}
if err := impl.DBService.Create(&orderBook).Error; err != nil {
return fmt.Errorf("创建订单簿记录失败: %w", err)
}
} else if order.OffsetFlag == 1 {
// 卖出订单 - 查找对应的买入记录并闭合
var orderBook models.OrderBook
// 查找同一账户、同一股票、未闭合的订单簿记录
err := impl.DBService.Where(
"account_id = ? AND stock_code = ? AND is_closed = ?",
accountID, order.StockCode, false,
).Order("buy_time ASC").First(&orderBook).Error
if err == gorm.ErrRecordNotFound {
// 没有找到对应的买入记录,可能是之前就已经持有的仓位
fmt.Printf("未找到对应的买入记录: account=%s, stock=%s\n", accountID, order.StockCode)
return nil
} else if err != nil {
return fmt.Errorf("查询订单簿记录失败: %w", err)
}
// 计算盈亏
buyAmount := float64(orderBook.BuyVolume) * orderBook.BuyPrice
sellAmount := float64(order.TradedVolume) * order.TradedPrice
profit := sellAmount - buyAmount
var profitRate float64
if buyAmount > 0 {
profitRate = (profit / buyAmount) * 100
}
// 更新订单簿记录
orderBook.SellOrderID = &order.OrderID
orderBook.SellPrice = &order.TradedPrice
orderBook.SellVolume = &order.TradedVolume
orderBook.SellTime = &order.OrderTime
orderBook.SellCollectedAt = &now
orderBook.IsClosed = true
orderBook.Profit = &profit
orderBook.ProfitRate = &profitRate
if err := impl.DBService.Save(&orderBook).Error; err != nil {
return fmt.Errorf("更新订单簿记录失败: %w", err)
}
}
return nil
}

41
internal/logic/utils.go Normal file
View File

@@ -0,0 +1,41 @@
package logic
import (
"time"
)
// IsTradingTime 判断当前时间是否为A股交易时间
// A股交易时间
// - 工作日(周一至周五)
// - 上午9:30 - 11:30
// - 下午13:00 - 15:00
func IsTradingTime(t time.Time) bool {
// 检查是否为工作日1=周一, 5=周五)
weekday := t.Weekday()
if weekday == time.Saturday || weekday == time.Sunday {
return false
}
// 获取小时和分钟
hour := t.Hour()
minute := t.Minute()
// 转换为分钟数便于比较
currentMinutes := hour*60 + minute
// 上午交易时间9:30 - 11:30 (570 - 690分钟)
morningStart := 9*60 + 30 // 570
morningEnd := 11*60 + 30 // 690
// 下午交易时间13:00 - 15:00 (780 - 900分钟)
afternoonStart := 13 * 60 // 780
afternoonEnd := 15 * 60 // 900
// 判断是否在交易时间段内
if (currentMinutes >= morningStart && currentMinutes < morningEnd) ||
(currentMinutes >= afternoonStart && currentMinutes < afternoonEnd) {
return true
}
return false
}