89 lines
2.1 KiB
Go
89 lines
2.1 KiB
Go
|
package pulsar
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
|
||
|
"git.apinb.com/bsm-sdk/engine/types"
|
||
|
"git.apinb.com/bsm-sdk/engine/vars"
|
||
|
pulsargo "github.com/apache/pulsar-client-go/pulsar"
|
||
|
)
|
||
|
|
||
|
type Pulsar struct {
|
||
|
Client pulsargo.Client
|
||
|
}
|
||
|
|
||
|
func NewPulsar(cfg *types.PulsarConf) (*Pulsar, error) {
|
||
|
client, err := pulsargo.NewClient(pulsargo.ClientOptions{
|
||
|
URL: cfg.Endpoints, //TODO: 更换为接入点地址(控制台集群管理页完整复制)
|
||
|
Authentication: pulsargo.NewAuthenticationToken(cfg.Token),
|
||
|
OperationTimeout: vars.OperationTimeout,
|
||
|
ConnectionTimeout: vars.ConnectionTimeout,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &Pulsar{
|
||
|
Client: client,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// push to pulsar server, return messageid.
|
||
|
func (mq *Pulsar) Producer(topic, msg string) (MessageID string, err error) {
|
||
|
if msg == "" {
|
||
|
return "", errors.New("Message is nil.")
|
||
|
}
|
||
|
|
||
|
producer, err := mq.Client.CreateProducer(pulsargo.ProducerOptions{
|
||
|
Topic: topic,
|
||
|
CompressionType: pulsargo.ZSTD,
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
msgID, err := producer.Send(context.Background(), &pulsargo.ProducerMessage{
|
||
|
Payload: []byte(msg),
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
return msgID.String(), nil
|
||
|
}
|
||
|
|
||
|
func (mq *Pulsar) Subscribe(topic, subscription string, subType pulsargo.SubscriptionType, do func([]byte)) error {
|
||
|
// we can listen this channel
|
||
|
channel := make(chan pulsargo.ConsumerMessage, 100)
|
||
|
|
||
|
options := pulsargo.ConsumerOptions{
|
||
|
Topic: topic,
|
||
|
SubscriptionName: subscription,
|
||
|
Type: subType,
|
||
|
// fill `MessageChannel` field will create a listener
|
||
|
MessageChannel: channel,
|
||
|
}
|
||
|
|
||
|
consumer, err := mq.Client.Subscribe(options)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer consumer.Close()
|
||
|
|
||
|
// Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where
|
||
|
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
|
||
|
// shared across multiple consumers as well
|
||
|
for cm := range channel {
|
||
|
consumer := cm.Consumer
|
||
|
msg := cm.Message
|
||
|
|
||
|
do(msg.Payload())
|
||
|
consumer.Ack(msg)
|
||
|
}
|
||
|
return nil
|
||
|
}
|