review code.

This commit is contained in:
2026-04-17 13:08:48 +08:00
parent d02082a3b5
commit af8135d45d
10 changed files with 2864 additions and 208 deletions

View File

@@ -71,20 +71,18 @@ func main() {
// runCollection 执行一次数据采集和存储
func runCollection(coll *collector.Collector, store *storage.Storage) {
log.Println("开始采集...")
// 计算Ymd
// 检查是否为开市时间
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
if !collector.IsTradingTime(now) {
return
}
log.Println("开始采集...")
// 采集数据并检查变化
status, dataHash, changed, err := coll.CollectAndCheck()
if err != nil {
log.Printf("采集失败: %v", err)
// 记录失败的日志
if err := store.SaveCollectionLog("", ymd, false, err.Error()); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
@@ -94,27 +92,20 @@ func runCollection(coll *collector.Collector, store *storage.Storage) {
// 如果数据没有变化,只记录日志
if !changed {
log.Println("数据未变化,跳过存储")
if err := store.SaveCollectionLog(dataHash, ymd, false, "数据未变化"); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
// 数据有变化,保存到数据库
log.Println("数据已变化,开始存储到数据库...")
if err := store.SaveStatus(status, dataHash); err != nil {
if err := store.SaveData(status); err != nil {
log.Printf("保存数据失败: %v", err)
// 记录失败的日志
if err := store.SaveCollectionLog(dataHash, ymd, true, err.Error()); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
return
}
// 记录成功的日志
if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
log.Printf("保存采集日志失败: %v", err)
}
// if err := store.SaveCollectionLog(dataHash, ymd, true, "数据保存成功"); err != nil {
// log.Printf("保存采集日志失败: %v", err)
// }
log.Printf("数据存储成功 - 资产账户: %s, 订单数: %d, 持仓数: %d, 行情数: %d",
status.Data.Assets.AccountID,

View File

@@ -31,7 +31,7 @@ func NewCollector(url string) *Collector {
}
// FetchData 从HTTP接口获取数据
func (c *Collector) FetchData() (*types.Status, error) {
func (c *Collector) FetchData() (*types.StatusData, error) {
resp, err := c.httpClient.Get(c.url)
if err != nil {
return nil, fmt.Errorf("HTTP请求失败: %w", err)
@@ -47,7 +47,7 @@ func (c *Collector) FetchData() (*types.Status, error) {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
var status types.Status
var status types.StatusData
if err := json.Unmarshal(body, &status); err != nil {
return nil, fmt.Errorf("JSON解析失败: %w", err)
}
@@ -56,7 +56,7 @@ func (c *Collector) FetchData() (*types.Status, error) {
}
// CalculateHash 计算数据的SHA256哈希值
func (c *Collector) CalculateHash(status *types.Status) (string, error) {
func (c *Collector) CalculateHash(status *types.StatusData) (string, error) {
// 将数据序列化为JSON
data, err := json.Marshal(status)
if err != nil {
@@ -87,7 +87,7 @@ func (c *Collector) GetLastHash() string {
}
// CollectAndCheck 采集数据并检查是否变化
func (c *Collector) CollectAndCheck() (*types.Status, string, bool, error) {
func (c *Collector) CollectAndCheck() (*types.StatusData, string, bool, error) {
// 获取数据
status, err := c.FetchData()
if err != nil {

41
collector/utils.go Normal file
View File

@@ -0,0 +1,41 @@
package collector
import (
"time"
)
// IsTradingTime 判断当前时间是否为A股交易时间
// A股交易时间
// - 工作日(周一至周五)
// - 上午9:30 - 11:30
// - 下午13:00 - 15:00
func IsTradingTime(t time.Time) bool {
// 检查是否为工作日1=周一, 5=周五)
weekday := t.Weekday()
if weekday == time.Saturday || weekday == time.Sunday {
return false
}
// 获取小时和分钟
hour := t.Hour()
minute := t.Minute()
// 转换为分钟数便于比较
currentMinutes := hour*60 + minute
// 上午交易时间9:30 - 11:30 (570 - 690分钟)
morningStart := 9*60 + 30 // 570
morningEnd := 11*60 + 30 // 690
// 下午交易时间13:00 - 15:00 (780 - 900分钟)
afternoonStart := 13 * 60 // 780
afternoonEnd := 15 * 60 // 900
// 判断是否在交易时间段内
if (currentMinutes >= morningStart && currentMinutes < morningEnd) ||
(currentMinutes >= afternoonStart && currentMinutes < afternoonEnd) {
return true
}
return false
}

File diff suppressed because one or more lines are too long

14
go.mod
View File

@@ -4,16 +4,18 @@ go 1.26.1
require (
github.com/robfig/cron/v3 v3.0.1
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.5
gorm.io/driver/postgres v1.6.0
gorm.io/gorm v1.31.1
)
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.4.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.9.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/text v0.36.0 // indirect
)

23
go.sum
View File

@@ -5,8 +5,14 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
@@ -20,15 +26,32 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg=
gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=

View File

@@ -16,7 +16,6 @@ type CollectorAssets struct {
MarketValue float64 `json:"market_value" gorm:"type:decimal(15,2);not null;default:0;column:market_value;comment:持仓市值"`
Profit float64 `json:"profit" gorm:"type:decimal(15,2);not null;default:0;comment:当日盈亏"`
TotalAsset float64 `json:"total_asset" gorm:"type:decimal(15,2);not null;default:0;column:total_asset;comment:总资产"`
DataHash string `json:"data_hash" gorm:"type:varchar(64);not null;index;comment:数据哈希值(用于变化检测)"`
CollectedAt time.Time `json:"collected_at" gorm:"not null;index;comment:数据采集时间"`
CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime;comment:记录创建时间"`
DeletedAt gorm.DeletedAt `json:"-" gorm:"index;comment:软删除时间"`

View File

@@ -1,3 +1,3 @@
#!/bin/bash
go build -o ../builds/collector.exe ./cmd/main.go
go build -o D:\work\quant\qmt\bin\collector.exe ./cmd/main.go

View File

@@ -86,210 +86,163 @@ func (s *Storage) AutoMigrate() error {
}
// SaveStatus 保存完整状态数据(使用事务)
func (s *Storage) SaveStatus(status *types.Status, dataHash string) error {
func (s *Storage) SaveData(status *types.StatusData) error {
// 验证必要字段 - AccountID是所有Upsert的共同条件
if status.Data.Assets.AccountID == "" {
return fmt.Errorf("账户ID不能为空")
}
return s.db.Transaction(func(tx *gorm.DB) error {
// 计算Ymd (年月日数字格式,如20260407)
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 计算Ymd (年月日数字格式,如20260407)
now := time.Now()
ymd := now.Year()*10000 + int(now.Month())*100 + now.Day()
// 保存资产快照 (先查询后更新/插入)
var existingAsset models.CollectorAssets
err := tx.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
// 保存资产快照 (先查询后更新/插入)
var existingAsset models.CollectorAssets
err := s.db.Where("account_id = ? AND ymd = ?", status.Data.Assets.AccountID, ymd).First(&existingAsset).Error
asset := models.CollectorAssets{
AccountID: status.Data.Assets.AccountID,
Ymd: ymd,
Cash: status.Data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue,
Profit: status.Data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset,
DataHash: dataHash,
CollectedAt: now,
asset := models.CollectorAssets{
AccountID: status.Data.Assets.AccountID,
Ymd: ymd,
Cash: status.Data.Assets.Cash,
FrozenCash: status.Data.Assets.FrozenCash,
MarketValue: status.Data.Assets.MarketValue,
Profit: status.Data.Assets.Profit,
TotalAsset: status.Data.Assets.TotalAsset,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := s.db.Create(&asset).Error; err != nil {
return fmt.Errorf("插入资产快照失败: %w", err)
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := tx.Create(&asset).Error; err != nil {
return fmt.Errorf("插入资产快照失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询资产快照失败: %w", err)
} else {
// 记录存在,更新现有记录
} else if err != nil {
// 查询出错
return fmt.Errorf("查询资产快照失败: %w", err)
} else {
// 记录存在,更新现有记录
if asset.Cash != existingAsset.Cash || asset.FrozenCash != existingAsset.FrozenCash || asset.MarketValue != existingAsset.MarketValue || asset.Profit != existingAsset.Profit || asset.TotalAsset != existingAsset.TotalAsset {
asset.ID = existingAsset.ID
if err := tx.Save(&asset).Error; err != nil {
if err := s.db.Save(&asset).Error; err != nil {
return fmt.Errorf("更新资产快照失败: %w", err)
}
}
}
// 批量保存订单 (先查询后更新/插入)
if len(status.Data.Orders) > 0 {
for _, order := range status.Data.Orders {
// 验证必要条件: OrderID和StockCode
if order.OrderID == 0 {
continue
}
if order.StockCode == "" {
continue
}
// 只存储成交的订单
if order.OrderStatus != 56 {
continue
}
var open_price float64
var info types.OrderInfo
err = json.Unmarshal([]byte(order.OrderRemark), &info)
if err == nil {
open_price = info.Op
}
// 批量保存订单 (先查询后更新/插入)
if len(status.Data.Orders) > 0 {
for _, order := range status.Data.Orders {
// 验证必要条件: OrderID和StockCode
if order.OrderID == 0 {
continue
}
if order.StockCode == "" {
continue
}
// 只存储成交的订单
if order.OrderStatus != 56 {
continue
}
var open_price float64
var info types.OrderInfo
err = json.Unmarshal([]byte(order.OrderRemark), &info)
if err == nil {
open_price = info.Op
}
// 查询是否存在
var existingOrder models.CollectorOrder
err := tx.Where("account_id = ? AND order_id = ? AND ymd = ?",
status.Data.Assets.AccountID, order.OrderID, ymd).First(&existingOrder).Error
// 查询是否存在
var cnt int64
s.db.Where("account_id = ? AND order_id = ? AND ymd = ?", status.Data.Assets.AccountID, order.OrderID, ymd).Count(&cnt)
if cnt > 0 {
continue
}
orderRecord := models.CollectorOrder{
OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID,
StockCode: order.StockCode,
Ymd: ymd,
Price: order.Price,
Volume: order.Volume,
OpenPrice: open_price,
TradedPrice: order.TradedPrice,
TradedVolume: order.TradedVolume,
OrderStatus: order.OrderStatus,
OrderTime: order.OrderTime,
OrderRemark: order.OrderRemark,
DataHash: dataHash,
OffsetFlag: order.OffsetFlag,
CollectedAt: now,
}
orderRecord := models.CollectorOrder{
OrderID: order.OrderID,
AccountID: status.Data.Assets.AccountID,
StockCode: order.StockCode,
Ymd: ymd,
Price: order.Price,
Volume: order.Volume,
OpenPrice: open_price,
TradedPrice: order.TradedPrice,
TradedVolume: order.TradedVolume,
OrderStatus: order.OrderStatus,
OrderTime: order.OrderTime,
OrderRemark: order.OrderRemark,
OffsetFlag: order.OffsetFlag,
CollectedAt: now,
CreatedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := tx.Create(&orderRecord).Error; err != nil {
return fmt.Errorf("插入订单失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询订单失败: %w", err)
} else {
// 记录存在,更新现有记录
orderRecord.ID = existingOrder.ID
if err := tx.Save(&orderRecord).Error; err != nil {
return fmt.Errorf("更新订单失败: %w", err)
}
}
// 记录不存在,插入新记录
if err := s.db.Create(&orderRecord).Error; err != nil {
return fmt.Errorf("插入订单失败: %w", err)
}
}
}
// 批量保存持仓 (先查询后更新/插入)
if len(status.Data.Positions) > 0 {
for _, pos := range status.Data.Positions {
// 验证必要条件: Code
if pos.Code == "" {
continue
// 批量保存持仓 (先查询后更新/插入)
if len(status.Data.Positions) > 0 {
for _, pos := range status.Data.Positions {
// 验证必要条件: Code
if pos.Code == "" {
continue
}
// 查询是否存在
var existingPosition models.CollectorPosition
err := s.db.Where("account_id = ? AND code = ? AND ymd = ?",
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
positionRecord := models.CollectorPosition{
AccountID: status.Data.Assets.AccountID,
Code: pos.Code,
Ymd: ymd,
Volume: pos.Volume,
CanUseVolume: pos.CanUseVolume,
FrozenVolume: pos.FrozenVolume,
AvgPrice: pos.AvgPrice,
OpenPrice: pos.OpenPrice,
CurrentPrice: pos.CurrentPrice,
MarketValue: pos.MarketValue,
Profit: pos.Profit,
ProfitRate: pos.ProfitRate,
MinProfitRate: pos.MinProfitRate,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := s.db.Create(&positionRecord).Error; err != nil {
return fmt.Errorf("插入持仓失败: %w", err)
}
// 查询是否存在
var existingPosition models.CollectorPosition
err := tx.Where("account_id = ? AND code = ? AND ymd = ?",
status.Data.Assets.AccountID, pos.Code, ymd).First(&existingPosition).Error
positionRecord := models.CollectorPosition{
AccountID: status.Data.Assets.AccountID,
Code: pos.Code,
Ymd: ymd,
Volume: pos.Volume,
CanUseVolume: pos.CanUseVolume,
FrozenVolume: pos.FrozenVolume,
AvgPrice: pos.AvgPrice,
OpenPrice: pos.OpenPrice,
CurrentPrice: pos.CurrentPrice,
MarketValue: pos.MarketValue,
Profit: pos.Profit,
ProfitRate: pos.ProfitRate,
MinProfitRate: pos.MinProfitRate,
DataHash: dataHash,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := tx.Create(&positionRecord).Error; err != nil {
return fmt.Errorf("插入持仓失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询持仓失败: %w", err)
} else {
} else if err != nil {
// 查询出错
return fmt.Errorf("查询持仓失败: %w", err)
} else {
if positionRecord.Volume != existingPosition.Volume ||
positionRecord.CanUseVolume != existingPosition.CanUseVolume ||
positionRecord.FrozenVolume != existingPosition.FrozenVolume ||
positionRecord.AvgPrice != existingPosition.AvgPrice ||
positionRecord.OpenPrice != existingPosition.OpenPrice ||
positionRecord.CurrentPrice != existingPosition.CurrentPrice ||
positionRecord.MarketValue != existingPosition.MarketValue ||
positionRecord.Profit != existingPosition.Profit ||
positionRecord.ProfitRate != existingPosition.ProfitRate {
// 记录存在,更新现有记录
positionRecord.ID = existingPosition.ID
if err := tx.Save(&positionRecord).Error; err != nil {
if err := s.db.Save(&positionRecord).Error; err != nil {
return fmt.Errorf("更新持仓失败: %w", err)
}
}
}
}
}
// 批量保存行情数据 (先查询后更新/插入)
if len(status.Data.TickData) > 0 {
for code, tick := range status.Data.TickData {
// 验证必要条件: StockCode
if code == "" {
continue
}
return nil
// 查询是否存在
var existingTick models.CollectorTick
err := tx.Where("stock_code = ? AND ymd = ?", code, ymd).First(&existingTick).Error
tickRecord := models.CollectorTick{
StockCode: code,
Ymd: ymd,
LastPrice: tick.LastPrice,
Open: tick.Open,
High: tick.High,
Low: tick.Low,
LastClose: tick.LastClose,
Volume: tick.Volume,
Amount: tick.Amount,
PVolume: tick.PVolume,
Time: tick.Time,
TimeTag: tick.TimeTag,
StockStatus: tick.StockStatus,
DataHash: dataHash,
CollectedAt: now,
}
if err == gorm.ErrRecordNotFound {
// 记录不存在,插入新记录
if err := tx.Create(&tickRecord).Error; err != nil {
return fmt.Errorf("插入行情数据失败: %w", err)
}
} else if err != nil {
// 查询出错
return fmt.Errorf("查询行情数据失败: %w", err)
} else {
// 记录存在,更新现有记录
tickRecord.ID = existingTick.ID
if err := tx.Save(&tickRecord).Error; err != nil {
return fmt.Errorf("更新行情数据失败: %w", err)
}
}
}
}
return nil
})
}
// SaveCollectionLog 保存采集日志

View File

@@ -1,7 +1,7 @@
package types
// Status 状态数据结构
type Status struct {
type StatusData struct {
Data Data `json:"data"`
Status Config `json:"status"`
}