core/queue/nats/nats.go

73 lines
1.4 KiB
Go
Raw Normal View History

2025-02-07 13:01:38 +08:00
package nats
import (
"strings"
2025-02-07 20:33:27 +08:00
"git.apinb.com/bsm-sdk/core/errcode"
2025-02-07 15:31:27 +08:00
"git.apinb.com/bsm-sdk/core/vars"
2025-02-07 13:01:38 +08:00
natsgo "github.com/nats-io/nats.go"
)
type Nats struct {
Client natsgo.JetStreamContext
}
2025-02-07 20:33:27 +08:00
func NewNats(endpoints []string, space string) (*Nats, error) {
if len(endpoints) == 0 {
return nil, errcode.ErrMq
}
if space == "" {
space = vars.MQSpaceName
}
jetStream, err := NatsNew(endpoints, space)
2025-02-07 13:01:38 +08:00
if err != nil {
return nil, err
}
return &Nats{
Client: jetStream,
}, nil
}
2025-02-07 20:33:27 +08:00
func NatsNew(endpoints []string, space string) (natsgo.JetStreamContext, error) {
2025-02-07 13:01:38 +08:00
var serverUrl string
if len(endpoints) > 1 {
serverUrl = strings.Join(endpoints, ",")
} else {
serverUrl = endpoints[0]
}
conn, err := natsgo.Connect(serverUrl, natsgo.DontRandomize())
if err != nil {
return nil, err
}
defer conn.Close()
js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
if err != nil {
return nil, err
}
js.AddStream(&natsgo.StreamConfig{
2025-02-07 20:33:27 +08:00
Name: space,
Subjects: []string{space}, //jetstream不支持通配符
2025-02-07 13:01:38 +08:00
Retention: natsgo.WorkQueuePolicy,
MaxBytes: 8,
})
return js, nil
}
func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) {
_, err = mq.Client.Subscribe(topic, func(m *natsgo.Msg) {
do(m.Data)
m.Ack()
})
return
}
func (mq *Nats) Producer(topic string, data []byte) (err error) {
_, err = mq.Client.Publish(topic, data)
return
}