engine/queue/nats/nats.go

66 lines
1.2 KiB
Go

package nats
import (
"strings"
"git.apinb.com/bsm-sdk/engine/vars"
natsgo "github.com/nats-io/nats.go"
)
// Nats wraps a NATS JetStream context and exposes the small
// publish/subscribe surface implemented by the methods on this type.
type Nats struct {
	// Client is the JetStream context that every publish and subscribe call
	// on this type is forwarded to.
	Client natsgo.JetStreamContext
}
// NewNats builds a Nats wrapper around the JetStream context returned by
// NatsNew for the given server endpoints. Any error from NatsNew is returned
// unchanged, together with a nil *Nats.
func NewNats(endpoints []string) (*Nats, error) {
	js, err := NatsNew(endpoints)
	if err != nil {
		return nil, err
	}

	mq := &Nats{Client: js}
	return mq, nil
}
// NatsNew connects to the given NATS servers and returns a JetStream context
// bound to that connection, after declaring the stream named vars.MQSpaceName.
//
// endpoints is the list of server URLs; they are joined with commas and
// passed to natsgo.Connect with server-list randomization disabled
// (natsgo.DontRandomize). An empty list yields natsgo.ErrNoServers instead of
// panicking on endpoints[0].
//
// On success the underlying connection is intentionally left open: the
// returned context publishes and subscribes through it, so closing it here
// would make every later call on the context fail. The connection is closed
// only on the error paths below.
func NatsNew(endpoints []string) (natsgo.JetStreamContext, error) {
	if len(endpoints) == 0 {
		return nil, natsgo.ErrNoServers
	}

	// strings.Join returns a single element unchanged, so one- and
	// many-endpoint inputs share the same path.
	serverURL := strings.Join(endpoints, ",")

	conn, err := natsgo.Connect(serverURL, natsgo.DontRandomize())
	if err != nil {
		return nil, err
	}

	js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
	if err != nil {
		conn.Close()
		return nil, err
	}

	_, err = js.AddStream(&natsgo.StreamConfig{
		Name: vars.MQSpaceName,
		// Original author's note: JetStream does not support wildcards here,
		// hence the single literal subject. TODO confirm.
		Subjects:  []string{vars.MQSpaceName},
		Retention: natsgo.WorkQueuePolicy,
		// NOTE(review): this caps the stream at 8 bytes, which looks
		// unintentionally small — confirm the intended limit.
		MaxBytes: 8,
	})
	if err != nil {
		// Surface stream-declaration failures instead of handing back a
		// context whose stream may not exist.
		conn.Close()
		return nil, err
	}

	return js, nil
}
// Subscribe registers do as the handler for messages arriving on topic.
// For each message the payload bytes are passed to do, and the message is
// acknowledged after do returns. The returned error is whatever the
// underlying JetStream Subscribe call reports.
func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) {
	handler := func(msg *natsgo.Msg) {
		do(msg.Data)
		// NOTE(review): the Ack error is discarded; the callback signature
		// offers no way to report it to the caller.
		_ = msg.Ack()
	}

	_, err = mq.Client.Subscribe(topic, handler)
	return err
}
// Producer publishes data on topic through the JetStream context and returns
// the publish error, if any. The publish acknowledgement is discarded.
func (mq *Nats) Producer(topic string, data []byte) (err error) {
	if _, err = mq.Client.Publish(topic, data); err != nil {
		return err
	}
	return nil
}