init
This commit is contained in:
65
queue/nats/nats.go
Normal file
65
queue/nats/nats.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"git.apinb.com/bsm-sdk/engine/vars"
|
||||
natsgo "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Nats struct {
|
||||
Client natsgo.JetStreamContext
|
||||
}
|
||||
|
||||
func NewNats(endpoints []string) (*Nats, error) {
|
||||
jetStream, err := NatsNew(endpoints)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Nats{
|
||||
Client: jetStream,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NatsNew(endpoints []string) (natsgo.JetStreamContext, error) {
|
||||
var serverUrl string
|
||||
if len(endpoints) > 1 {
|
||||
serverUrl = strings.Join(endpoints, ",")
|
||||
} else {
|
||||
serverUrl = endpoints[0]
|
||||
}
|
||||
|
||||
conn, err := natsgo.Connect(serverUrl, natsgo.DontRandomize())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
js.AddStream(&natsgo.StreamConfig{
|
||||
Name: vars.MQSpaceName,
|
||||
Subjects: []string{vars.MQSpaceName}, //jetstream不支持通配符
|
||||
Retention: natsgo.WorkQueuePolicy,
|
||||
MaxBytes: 8,
|
||||
})
|
||||
|
||||
return js, nil
|
||||
}
|
||||
|
||||
func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) {
|
||||
_, err = mq.Client.Subscribe(topic, func(m *natsgo.Msg) {
|
||||
do(m.Data)
|
||||
m.Ack()
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (mq *Nats) Producer(topic string, data []byte) (err error) {
|
||||
_, err = mq.Client.Publish(topic, data)
|
||||
return
|
||||
}
|
||||
88
queue/pulsar/pulsar.go
Normal file
88
queue/pulsar/pulsar.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package pulsar
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.apinb.com/bsm-sdk/engine/types"
|
||||
"git.apinb.com/bsm-sdk/engine/vars"
|
||||
pulsargo "github.com/apache/pulsar-client-go/pulsar"
|
||||
)
|
||||
|
||||
type Pulsar struct {
|
||||
Client pulsargo.Client
|
||||
}
|
||||
|
||||
func NewPulsar(cfg *types.PulsarConf) (*Pulsar, error) {
|
||||
client, err := pulsargo.NewClient(pulsargo.ClientOptions{
|
||||
URL: cfg.Endpoints, //TODO: 更换为接入点地址(控制台集群管理页完整复制)
|
||||
Authentication: pulsargo.NewAuthenticationToken(cfg.Token),
|
||||
OperationTimeout: vars.OperationTimeout,
|
||||
ConnectionTimeout: vars.ConnectionTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Pulsar{
|
||||
Client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// push to pulsar server, return messageid.
|
||||
func (mq *Pulsar) Producer(topic, msg string) (MessageID string, err error) {
|
||||
if msg == "" {
|
||||
return "", errors.New("Message is nil.")
|
||||
}
|
||||
|
||||
producer, err := mq.Client.CreateProducer(pulsargo.ProducerOptions{
|
||||
Topic: topic,
|
||||
CompressionType: pulsargo.ZSTD,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
msgID, err := producer.Send(context.Background(), &pulsargo.ProducerMessage{
|
||||
Payload: []byte(msg),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return msgID.String(), nil
|
||||
}
|
||||
|
||||
func (mq *Pulsar) Subscribe(topic, subscription string, subType pulsargo.SubscriptionType, do func([]byte)) error {
|
||||
// we can listen this channel
|
||||
channel := make(chan pulsargo.ConsumerMessage, 100)
|
||||
|
||||
options := pulsargo.ConsumerOptions{
|
||||
Topic: topic,
|
||||
SubscriptionName: subscription,
|
||||
Type: subType,
|
||||
// fill `MessageChannel` field will create a listener
|
||||
MessageChannel: channel,
|
||||
}
|
||||
|
||||
consumer, err := mq.Client.Subscribe(options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer consumer.Close()
|
||||
|
||||
// Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where
|
||||
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
|
||||
// shared across multiple consumers as well
|
||||
for cm := range channel {
|
||||
consumer := cm.Consumer
|
||||
msg := cm.Message
|
||||
|
||||
do(msg.Payload())
|
||||
consumer.Ack(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user