66 lines
1.2 KiB
Go
66 lines
1.2 KiB
Go
|
package nats
|
||
|
|
||
|
import (
|
||
|
"strings"
|
||
|
|
||
|
"git.apinb.com/bsm-sdk/engine/vars"
|
||
|
natsgo "github.com/nats-io/nats.go"
|
||
|
)
|
||
|
|
||
|
type Nats struct {
|
||
|
Client natsgo.JetStreamContext
|
||
|
}
|
||
|
|
||
|
func NewNats(endpoints []string) (*Nats, error) {
|
||
|
jetStream, err := NatsNew(endpoints)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &Nats{
|
||
|
Client: jetStream,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func NatsNew(endpoints []string) (natsgo.JetStreamContext, error) {
|
||
|
var serverUrl string
|
||
|
if len(endpoints) > 1 {
|
||
|
serverUrl = strings.Join(endpoints, ",")
|
||
|
} else {
|
||
|
serverUrl = endpoints[0]
|
||
|
}
|
||
|
|
||
|
conn, err := natsgo.Connect(serverUrl, natsgo.DontRandomize())
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
|
||
|
js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
js.AddStream(&natsgo.StreamConfig{
|
||
|
Name: vars.MQSpaceName,
|
||
|
Subjects: []string{vars.MQSpaceName}, //jetstream不支持通配符
|
||
|
Retention: natsgo.WorkQueuePolicy,
|
||
|
MaxBytes: 8,
|
||
|
})
|
||
|
|
||
|
return js, nil
|
||
|
}
|
||
|
|
||
|
func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) {
|
||
|
_, err = mq.Client.Subscribe(topic, func(m *natsgo.Msg) {
|
||
|
do(m.Data)
|
||
|
m.Ack()
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (mq *Nats) Producer(topic string, data []byte) (err error) {
|
||
|
_, err = mq.Client.Publish(topic, data)
|
||
|
return
|
||
|
}
|