// Package nats wraps a NATS JetStream context behind a small
// publish/subscribe helper.
package nats

import (
	"errors"
	"fmt"
	"strings"

	"git.apinb.com/bsm-sdk/engine/vars"
	natsgo "github.com/nats-io/nats.go"
)

// Nats is a thin wrapper around a JetStream context.
type Nats struct {
	// Client is the JetStream context used for publishing and subscribing.
	Client natsgo.JetStreamContext

	// conn is the connection backing Client. It is set by NewNats so that
	// Close can release it; it is nil when the value was built by hand.
	conn *natsgo.Conn
}

// NewNats connects to the given endpoints, ensures the vars.MQSpaceName
// stream exists and returns a wrapper around the resulting JetStream context.
//
// It returns an error when endpoints is empty, when the connection or the
// JetStream context cannot be established, or when the stream can neither be
// created nor found. Call Close on the returned value to release the
// connection.
func NewNats(endpoints []string) (*Nats, error) {
	conn, js, err := connect(endpoints)
	if err != nil {
		return nil, err
	}
	return &Nats{Client: js, conn: conn}, nil
}

// NatsNew connects to the given endpoints, ensures the vars.MQSpaceName
// stream exists and returns the JetStream context.
//
// The underlying connection is intentionally left open: the returned context
// is unusable once its connection is closed. Because this function does not
// expose the connection, it stays open until the process exits; use NewNats
// when the connection must be closed explicitly.
func NatsNew(endpoints []string) (natsgo.JetStreamContext, error) {
	_, js, err := connect(endpoints)
	return js, err
}

// connect dials the endpoints, creates a JetStream context and makes sure the
// vars.MQSpaceName stream is available. On any failure the connection is
// closed and both the connection and the context are returned as nil.
func connect(endpoints []string) (*natsgo.Conn, natsgo.JetStreamContext, error) {
	if len(endpoints) == 0 {
		return nil, nil, errors.New("nats: no endpoints provided")
	}

	// Multiple server URLs are passed to Connect as one comma-separated
	// string; Join on a single element yields that element unchanged.
	serverURL := strings.Join(endpoints, ",")

	conn, err := natsgo.Connect(serverURL, natsgo.DontRandomize())
	if err != nil {
		return nil, nil, err
	}

	js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
	if err != nil {
		conn.Close()
		return nil, nil, err
	}

	_, err = js.AddStream(&natsgo.StreamConfig{
		Name: vars.MQSpaceName,
		// Original author's note: wildcards are not supported here, so the
		// only subject is the stream name itself.
		Subjects:  []string{vars.MQSpaceName},
		Retention: natsgo.WorkQueuePolicy,
		// NOTE(review): MaxBytes is a byte count, so 8 caps the stream at
		// eight bytes. Kept as-is to preserve behavior — confirm the
		// intended limit.
		MaxBytes: 8,
	})
	if err != nil {
		// AddStream also fails when the stream already exists with a
		// different configuration. That case is tolerated, as before; the
		// error is only fatal when the stream is not there at all.
		if _, infoErr := js.StreamInfo(vars.MQSpaceName); infoErr != nil {
			conn.Close()
			return nil, nil, fmt.Errorf("nats: creating stream %q: %w", vars.MQSpaceName, err)
		}
	}

	return conn, js, nil
}

// Close releases the connection opened by NewNats. It is a no-op on a nil
// receiver or when the value does not own a connection.
func (mq *Nats) Close() {
	if mq == nil || mq.conn == nil {
		return
	}
	mq.conn.Close()
}

// Subscribe registers do as the handler for messages published on topic.
// Each message's payload is passed to do, after which the message is
// acknowledged.
//
// It returns an error when do is nil or when the subscription cannot be
// created.
func (mq *Nats) Subscribe(topic string, do func([]byte)) error {
	if do == nil {
		return errors.New("nats: nil message handler")
	}

	_, err := mq.Client.Subscribe(topic, func(m *natsgo.Msg) {
		do(m.Data)
		// The callback has no way to report a failed acknowledgement to the
		// caller, so the error is deliberately dropped.
		_ = m.Ack()
	})
	return err
}

// Producer publishes data on topic through the JetStream context and returns
// any error reported by the publish call.
func (mq *Nats) Producer(topic string, data []byte) error {
	_, err := mq.Client.Publish(topic, data)
	return err
}