package main import ( "encoding/binary" "fmt" "log" "time" "unsafe" "github.com/vmihailenco/msgpack/v5" "golang.org/x/sys/windows" ) // TickData 行情数据结构 type TickData map[string]interface{} // TickReader 共享内存读取器 type TickReader struct { shmName string bufferSize int mappedFile windows.Handle view uintptr lastTimestamp uint32 // 上次读取的时间戳 } // NewTickReader 创建新的读取器 func NewTickReader(shmName string, bufferSize int) (*TickReader, error) { if shmName == "" { shmName = "tick_ipc" } if bufferSize == 0 { bufferSize = 10 * 1024 * 1024 // 10MB } reader := &TickReader{ shmName: shmName, bufferSize: bufferSize, lastTimestamp: 0, } // 打开共享内存 err := reader.openSharedMemory() if err != nil { return nil, fmt.Errorf("打开共享内存失败: %w", err) } log.Printf("✓ 成功连接到共享内存: %s (大小: %d bytes)", shmName, bufferSize) return reader, nil } // openSharedMemory 打开 Windows 共享内存 func (r *TickReader) openSharedMemory() error { // 打开已存在的共享内存(Python 创建的) namePtr, err := windows.UTF16PtrFromString(r.shmName) if err != nil { return fmt.Errorf("转换字符串失败: %w", err) } // 使用 syscall 直接调用 Windows API kernel32 := windows.NewLazySystemDLL("kernel32.dll") openFileMappingProc := kernel32.NewProc("OpenFileMappingW") mapViewOfFileProc := kernel32.NewProc("MapViewOfFile") // 调用 OpenFileMappingW handle, _, callErr := openFileMappingProc.Call( uintptr(windows.FILE_MAP_READ), 0, // bInheritHandle uintptr(unsafe.Pointer(namePtr)), ) if callErr != nil && callErr.Error() != "The operation completed successfully." { return fmt.Errorf("打开文件映射失败: %w", callErr) } if handle == 0 { return fmt.Errorf("打开文件映射失败: 返回空句柄") } mappingHandle := windows.Handle(handle) // 映射视图 view, _, callErr := mapViewOfFileProc.Call( uintptr(mappingHandle), uintptr(windows.FILE_MAP_READ), 0, 0, uintptr(r.bufferSize), ) if callErr != nil && callErr.Error() != "The operation completed successfully." { windows.CloseHandle(mappingHandle) return fmt.Errorf("映射视图失败: %w", callErr) } if view == 0 { windows.CloseHandle(mappingHandle) return fmt.Errorf("映射视图失败: 返回空地址") } r.mappedFile = mappingHandle r.view = view return nil } // ReadLatestTick 读取最新的行情数据 func (r *TickReader) ReadLatestTick() (TickData, error) { if r.view == 0 { return nil, fmt.Errorf("共享内存未初始化") } // 读取头部信息(16字节) // 格式: [version(4)] [write_pos(4)] [last_data_len(4)] [timestamp(4)] header := unsafeByteSlice(r.view, 0, 16) // 解析时间戳 currentTimestamp := binary.LittleEndian.Uint32(header[12:16]) // 检查数据是否更新 if currentTimestamp == r.lastTimestamp { return nil, fmt.Errorf("数据未更新") } // 解析最后数据长度 lastDataLen := binary.LittleEndian.Uint32(header[8:12]) if lastDataLen == 0 { return nil, fmt.Errorf("无可用数据") } // 解析写入位置 writePos := binary.LittleEndian.Uint32(header[4:8]) // 计算数据起始位置 var dataStart uint32 if writePos >= lastDataLen+4 { dataStart = 16 + writePos - lastDataLen - 4 } else { dataStart = 16 } // 读取数据长度 dataLenBytes := unsafeByteSlice(r.view, int(dataStart), 4) dataLen := binary.LittleEndian.Uint32(dataLenBytes) // 读取 msgpack 数据 packedData := unsafeByteSlice(r.view, int(dataStart)+4, int(dataLen)) // 反序列化 msgpack var tickData TickData err := msgpack.Unmarshal(packedData, &tickData) if err != nil { return nil, fmt.Errorf("反序列化失败: %w", err) } // 更新最后时间戳 r.lastTimestamp = currentTimestamp return tickData, nil } // unsafeByteSlice 将内存地址转换为字节切片 func unsafeByteSlice(base uintptr, offset int, length int) []byte { return (*[1 << 30]byte)(unsafe.Pointer(base + uintptr(offset)))[:length:length] } // Close 关闭读取器 func (r *TickReader) Close() { if r.view != 0 { windows.UnmapViewOfFile(r.view) r.view = 0 } if r.mappedFile != 0 { windows.CloseHandle(r.mappedFile) r.mappedFile = 0 } log.Println("✓ 共享内存连接已关闭") } // StartContinuousRead 持续读取行情数据 func (r *TickReader) StartContinuousRead(interval time.Duration, handler func(TickData)) { log.Printf("开始持续读取,间隔: %v", interval) ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { tick, err := r.ReadLatestTick() if err != nil { continue } if handler != nil { handler(tick) } } } func main() { // 创建读取器 reader, err := NewTickReader("tick_ipc", 0) if err != nil { log.Fatalf("初始化失败: %v", err) } defer reader.Close() // 示例2:持续读取(取消注释以启用) fmt.Println("\n=== 持续读取示例(按 Ctrl+C 停止)===") reader.StartContinuousRead(time.Second, func(tick TickData) { fmt.Printf("\n[%s] 接收到新行情:%d个标的\n", time.Now().Format("15:04:05"), len(tick)) }) }