Files
collector/internal/logic/storage.go
2026-04-17 21:57:23 +08:00

297 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package logic
import (
"encoding/json"
"fmt"
"log"
"time"
"git.apinb.com/quant/collector/internal/impl"
"git.apinb.com/quant/collector/internal/models"
"git.apinb.com/quant/collector/internal/types"
"gorm.io/gorm"
)
// SaveStatus 保存完整状态数据(使用事务)
func SaveData(status *types.StatusData) error {
// 验证必要字段 - AccountID是所有Upsert的共同条件
if status.Data.Assets.AccountID == "" {
return fmt.Errorf("账户ID不能为空")
}
// 计算Ymd (年月日数字格式,如20260407)
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 保存资产快照 (先查询后更新/插入)
var existingAsset models.CollectorAssets
err := impl.DBService.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
asset := models.CollectorAssets{
AccountID: status.Data.Assets.AccountID,
Ymd: ymd,
Cash: status.Data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue,
Profit: status.Data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := impl.DBService.Create(&asset).Error; err != nil {
return fmt.Errorf("插入资产快照失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询资产快照失败: %w", err)
} else {
// 记录存在,更新现有记录
if asset.Cash != existingAsset.Cash || asset.FrozenCash != existingAsset.FrozenCash || asset.MarketValue != existingAsset.MarketValue || asset.Profit != existingAsset.Profit || asset.TotalAsset != existingAsset.TotalAsset {
asset.ID = existingAsset.ID
if err := impl.DBService.Save(&asset).Error; err != nil {
return fmt.Errorf("更新资产快照失败: %w", err)
}
}
}
// 批量保存订单 (先查询后更新/插入)
if len(status.Data.Orders) > 0 {
for _, order := range status.Data.Orders {
// 验证必要条件: OrderID和StockCode
if order.OrderID == 0 {
continue
}
if order.StockCode == "" {
continue
}
// 只存储成交的订单
if order.OrderStatus != 56 {
continue
}
var open_price float64
var info types.OrderInfo
err = json.Unmarshal([]byte(order.OrderRemark), &info)
if err == nil {
open_price = info.Op
}
// 查询是否存在
var cnt int64
impl.DBService.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
if cnt > 0 {
continue
}
orderRecord := models.CollectorOrder{
OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID,
StockCode: order.StockCode,
Ymd: ymd,
Price: order.Price,
Volume: order.Volume,
OpenPrice: open_price,
TradedPrice: order.TradedPrice,
TradedVolume: order.TradedVolume,
OrderStatus: order.OrderStatus,
OrderTime: order.OrderTime,
OrderRemark: order.OrderRemark,
OffsetFlag: order.OffsetFlag,
CollectedAt: now,
CreatedAt: now,
}
// 记录不存在,插入新记录
if err := impl.DBService.Create(&orderRecord).Error; err != nil {
return fmt.Errorf("插入订单失败: %w", err)
}
// 处理订单簿逻辑
// if err := processOrderBook(status.Data.Assets.AccountID, order, ymd, now, open_price); err != nil {
// fmt.Printf("处理订单簿失败: %v\n", err)
// // 不返回错误,避免影响主流程
// }
}
}
// 批量保存持仓 (先查询后更新/插入)
if len(status.Data.Positions) > 0 {
for _, pos := range status.Data.Positions {
// 验证必要条件: Code
if pos.Code == "" {
continue
}
// 查询是否存在
var existingPosition models.CollectorPosition
err := impl.DBService.Where("account_id = ? AND code = ? AND ymd = ?",
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
positionRecord := models.CollectorPosition{
AccountID: status.Data.Assets.AccountID,
Code: pos.Code,
Ymd: ymd,
Volume: pos.Volume,
CanUseVolume: pos.CanUseVolume,
FrozenVolume: pos.FrozenVolume,
AvgPrice: pos.AvgPrice,
OpenPrice: pos.OpenPrice,
CurrentPrice: pos.CurrentPrice,
MarketValue: pos.MarketValue,
Profit: pos.Profit,
ProfitRate: pos.ProfitRate,
MinProfitRate: pos.MinProfitRate,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := impl.DBService.Create(&positionRecord).Error; err != nil {
return fmt.Errorf("插入持仓失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询持仓失败: %w", err)
} else {
if positionRecord.Volume != existingPosition.Volume ||
positionRecord.CanUseVolume != existingPosition.CanUseVolume ||
positionRecord.FrozenVolume != existingPosition.FrozenVolume ||
positionRecord.AvgPrice != existingPosition.AvgPrice ||
positionRecord.OpenPrice != existingPosition.OpenPrice ||
positionRecord.CurrentPrice != existingPosition.CurrentPrice ||
positionRecord.MarketValue != existingPosition.MarketValue ||
positionRecord.Profit != existingPosition.Profit ||
positionRecord.ProfitRate != existingPosition.ProfitRate {
// 记录存在,更新现有记录
positionRecord.ID = existingPosition.ID
if err := impl.DBService.Save(&positionRecord).Error; err != nil {
return fmt.Errorf("更新持仓失败: %w", err)
}
}
}
}
}
return nil
}
// processOrderBook 处理订单簿逻辑
func processOrderBook(accountID string, order types.Order, ymd int, now time.Time, openPrice float64) error {
// OffsetFlag: 通常 0=买入, 1=卖出 (需要根据实际系统确认)
// 这里假设: OffsetFlag == 0 表示买入, OffsetFlag == 1 表示卖出
if order.OffsetFlag == types.FLAG_BUY {
// 先检查有无未闭合的订单簿记录
var existingOrderBook models.OrderBook
err := impl.DBService.Where(
"account_id = ? AND stock_code = ? AND is_closed = ?",
accountID, order.StockCode, false,
).Order("buy_time DESC").First(&existingOrderBook).Error
if err == gorm.ErrRecordNotFound {
// 没有未闭合的订单簿记录,创建新的
orderBook := models.OrderBook{
AccountID: accountID,
StockCode: order.StockCode,
Ymd: ymd,
BuyOrderID: fmt.Sprintf("%d", order.OrderID),
BuyPrice: order.TradedPrice,
BuyVolume: order.TradedVolume,
BuyTime: order.OrderTime,
BuyCollectedAt: now,
IsClosed: false,
}
if err := impl.DBService.Create(&orderBook).Error; err != nil {
return fmt.Errorf("创建订单簿记录失败: %w", err)
}
log.Printf("新建订单簿: account=%s, stock=%s, orderID=%d, price=%.4f, volume=%d",
accountID, order.StockCode, order.OrderID, order.TradedPrice, order.TradedVolume)
} else if err != nil {
return fmt.Errorf("查询订单簿记录失败: %w", err)
} else {
// 存在未闭合的订单簿记录,更新数量和价格(加权平均)
totalVolume := existingOrderBook.BuyVolume + order.TradedVolume
totalAmount := existingOrderBook.BuyPrice*float64(existingOrderBook.BuyVolume) + order.TradedPrice*float64(order.TradedVolume)
newAvgPrice := totalAmount / float64(totalVolume)
existingOrderBook.BuyVolume = totalVolume
existingOrderBook.BuyPrice = newAvgPrice
// 追加订单ID到 BuyOrderID用逗号分隔
existingOrderBook.BuyOrderID = existingOrderBook.BuyOrderID + "," + fmt.Sprintf("%d", order.OrderID)
if err := impl.DBService.Save(&existingOrderBook).Error; err != nil {
return fmt.Errorf("更新订单簿记录失败: %w", err)
}
log.Printf("更新订单簿: account=%s, stock=%s, 新订单ID=%d, 总数量=%d, 平均价格=%.4f",
accountID, order.StockCode, order.OrderID, totalVolume, newAvgPrice)
}
} else if order.OffsetFlag == types.FLAG_SELL {
// 卖出订单 - 查找对应的买入记录并闭合
var orderBook models.OrderBook
// 查找同一账户、同一股票、未闭合的订单簿记录
err := impl.DBService.Where(
"account_id = ? AND stock_code = ? AND is_closed = ?",
accountID, order.StockCode, false,
).Order("buy_time ASC").First(&orderBook).Error
if err == gorm.ErrRecordNotFound {
// 没有找到对应的买入记录,可能是之前就已经持有的仓位
fmt.Printf("未找到对应的买入记录: account=%s, stock=%s\n", accountID, order.StockCode)
return nil
} else if err != nil {
return fmt.Errorf("查询订单簿记录失败: %w", err)
}
// 计算盈亏(支持部分卖出)
sellVolume := order.TradedVolume
sellPrice := order.TradedPrice
buyPrice := orderBook.BuyPrice
// 如果卖出数量小于买入数量,按比例计算盈亏
var profit float64
var profitRate float64
if sellVolume >= orderBook.BuyVolume {
// 全部卖出或超额卖出
buyAmount := float64(orderBook.BuyVolume) * buyPrice
sellAmount := float64(sellVolume) * sellPrice
profit = sellAmount - buyAmount
if buyAmount > 0 {
profitRate = (profit / buyAmount) * 100
}
} else {
// 部分卖出,按比例计算
sellRatio := float64(sellVolume) / float64(orderBook.BuyVolume)
buyAmount := float64(orderBook.BuyVolume) * buyPrice * sellRatio
sellAmount := float64(sellVolume) * sellPrice
profit = sellAmount - buyAmount
if buyAmount > 0 {
profitRate = (profit / buyAmount) * 100
}
}
// 更新订单簿记录
sellOrderID := fmt.Sprintf("%d", order.OrderID)
orderBook.SellOrderID = &sellOrderID
orderBook.SellPrice = &order.TradedPrice
orderBook.SellVolume = &order.TradedVolume
orderBook.SellTime = &order.OrderTime
orderBook.SellCollectedAt = &now
orderBook.IsClosed = true
orderBook.Profit = &profit
orderBook.ProfitRate = &profitRate
if err := impl.DBService.Save(&orderBook).Error; err != nil {
return fmt.Errorf("更新订单簿记录失败: %w", err)
}
log.Printf("闭合订单簿: account=%s, stock=%s, buyOrderID=%s, sellOrderID=%s, profit=%.2f",
accountID, order.StockCode, orderBook.BuyOrderID, sellOrderID, profit)
}
return nil
}