Files
tick-computing/client_go/tick_reader.go
2026-04-12 23:24:43 +08:00

207 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"encoding/binary"
"fmt"
"log"
"time"
"unsafe"
"github.com/vmihailenco/msgpack/v5"
"golang.org/x/sys/windows"
)
// TickData 行情数据结构
type TickData map[string]interface{}
// TickReader 共享内存读取器
type TickReader struct {
shmName string
bufferSize int
mappedFile windows.Handle
view uintptr
lastTimestamp uint32 // 上次读取的时间戳
}
// NewTickReader 创建新的读取器
func NewTickReader(shmName string, bufferSize int) (*TickReader, error) {
if shmName == "" {
shmName = "tick_ipc"
}
if bufferSize == 0 {
bufferSize = 10 * 1024 * 1024 // 10MB
}
reader := &TickReader{
shmName: shmName,
bufferSize: bufferSize,
lastTimestamp: 0,
}
// 打开共享内存
err := reader.openSharedMemory()
if err != nil {
return nil, fmt.Errorf("打开共享内存失败: %w", err)
}
log.Printf("✓ 成功连接到共享内存: %s (大小: %d bytes)", shmName, bufferSize)
return reader, nil
}
// openSharedMemory 打开 Windows 共享内存
func (r *TickReader) openSharedMemory() error {
// 打开已存在的共享内存Python 创建的)
namePtr, err := windows.UTF16PtrFromString(r.shmName)
if err != nil {
return fmt.Errorf("转换字符串失败: %w", err)
}
// 使用 syscall 直接调用 Windows API
kernel32 := windows.NewLazySystemDLL("kernel32.dll")
openFileMappingProc := kernel32.NewProc("OpenFileMappingW")
mapViewOfFileProc := kernel32.NewProc("MapViewOfFile")
// 调用 OpenFileMappingW
handle, _, callErr := openFileMappingProc.Call(
uintptr(windows.FILE_MAP_READ),
0, // bInheritHandle
uintptr(unsafe.Pointer(namePtr)),
)
if callErr != nil && callErr.Error() != "The operation completed successfully." {
return fmt.Errorf("打开文件映射失败: %w", callErr)
}
if handle == 0 {
return fmt.Errorf("打开文件映射失败: 返回空句柄")
}
mappingHandle := windows.Handle(handle)
// 映射视图
view, _, callErr := mapViewOfFileProc.Call(
uintptr(mappingHandle),
uintptr(windows.FILE_MAP_READ),
0,
0,
uintptr(r.bufferSize),
)
if callErr != nil && callErr.Error() != "The operation completed successfully." {
windows.CloseHandle(mappingHandle)
return fmt.Errorf("映射视图失败: %w", callErr)
}
if view == 0 {
windows.CloseHandle(mappingHandle)
return fmt.Errorf("映射视图失败: 返回空地址")
}
r.mappedFile = mappingHandle
r.view = view
return nil
}
// ReadLatestTick 读取最新的行情数据
func (r *TickReader) ReadLatestTick() (TickData, error) {
if r.view == 0 {
return nil, fmt.Errorf("共享内存未初始化")
}
// 读取头部信息16字节
// 格式: [version(4)] [write_pos(4)] [last_data_len(4)] [timestamp(4)]
header := unsafeByteSlice(r.view, 0, 16)
// 解析时间戳
currentTimestamp := binary.LittleEndian.Uint32(header[12:16])
// 检查数据是否更新
if currentTimestamp == r.lastTimestamp {
return nil, fmt.Errorf("数据未更新")
}
// 解析最后数据长度
lastDataLen := binary.LittleEndian.Uint32(header[8:12])
if lastDataLen == 0 {
return nil, fmt.Errorf("无可用数据")
}
// 解析写入位置
writePos := binary.LittleEndian.Uint32(header[4:8])
// 计算数据起始位置
var dataStart uint32
if writePos >= lastDataLen+4 {
dataStart = 16 + writePos - lastDataLen - 4
} else {
dataStart = 16
}
// 读取数据长度
dataLenBytes := unsafeByteSlice(r.view, int(dataStart), 4)
dataLen := binary.LittleEndian.Uint32(dataLenBytes)
// 读取 msgpack 数据
packedData := unsafeByteSlice(r.view, int(dataStart)+4, int(dataLen))
// 反序列化 msgpack
var tickData TickData
err := msgpack.Unmarshal(packedData, &tickData)
if err != nil {
return nil, fmt.Errorf("反序列化失败: %w", err)
}
// 更新最后时间戳
r.lastTimestamp = currentTimestamp
return tickData, nil
}
// unsafeByteSlice 将内存地址转换为字节切片
func unsafeByteSlice(base uintptr, offset int, length int) []byte {
return (*[1 << 30]byte)(unsafe.Pointer(base + uintptr(offset)))[:length:length]
}
// Close 关闭读取器
func (r *TickReader) Close() {
if r.view != 0 {
windows.UnmapViewOfFile(r.view)
r.view = 0
}
if r.mappedFile != 0 {
windows.CloseHandle(r.mappedFile)
r.mappedFile = 0
}
log.Println("✓ 共享内存连接已关闭")
}
// StartContinuousRead 持续读取行情数据
func (r *TickReader) StartContinuousRead(interval time.Duration, handler func(TickData)) {
log.Printf("开始持续读取,间隔: %v", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
tick, err := r.ReadLatestTick()
if err != nil {
continue
}
if handler != nil {
handler(tick)
}
}
}
func main() {
// 创建读取器
reader, err := NewTickReader("tick_ipc", 0)
if err != nil {
log.Fatalf("初始化失败: %v", err)
}
defer reader.Close()
// 示例2持续读取取消注释以启用
fmt.Println("\n=== 持续读取示例(按 Ctrl+C 停止)===")
reader.StartContinuousRead(time.Second, func(tick TickData) {
fmt.Printf("\n[%s] 接收到新行情:%d个标的\n", time.Now().Format("15:04:05"), len(tick))
})
}