package pulsar import ( "context" "errors" "git.apinb.com/bsm-sdk/engine/types" "git.apinb.com/bsm-sdk/engine/vars" pulsargo "github.com/apache/pulsar-client-go/pulsar" ) type Pulsar struct { Client pulsargo.Client } func NewPulsar(cfg *types.PulsarConf) (*Pulsar, error) { client, err := pulsargo.NewClient(pulsargo.ClientOptions{ URL: cfg.Endpoints, //TODO: 更换为接入点地址(控制台集群管理页完整复制) Authentication: pulsargo.NewAuthenticationToken(cfg.Token), OperationTimeout: vars.OperationTimeout, ConnectionTimeout: vars.ConnectionTimeout, }) if err != nil { return nil, err } return &Pulsar{ Client: client, }, nil } // push to pulsar server, return messageid. func (mq *Pulsar) Producer(topic, msg string) (MessageID string, err error) { if msg == "" { return "", errors.New("Message is nil.") } producer, err := mq.Client.CreateProducer(pulsargo.ProducerOptions{ Topic: topic, CompressionType: pulsargo.ZSTD, }) if err != nil { return "", err } msgID, err := producer.Send(context.Background(), &pulsargo.ProducerMessage{ Payload: []byte(msg), }) if err != nil { return "", err } return msgID.String(), nil } func (mq *Pulsar) Subscribe(topic, subscription string, subType pulsargo.SubscriptionType, do func([]byte)) error { // we can listen this channel channel := make(chan pulsargo.ConsumerMessage, 100) options := pulsargo.ConsumerOptions{ Topic: topic, SubscriptionName: subscription, Type: subType, // fill `MessageChannel` field will create a listener MessageChannel: channel, } consumer, err := mq.Client.Subscribe(options) if err != nil { return err } defer consumer.Close() // Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where // the message was received. It's not necessary here since we have 1 single consumer, but the channel could be // shared across multiple consumers as well for cm := range channel { consumer := cm.Consumer msg := cm.Message do(msg.Payload()) consumer.Ack(msg) } return nil }