package nats

import (
	"strings"

	"git.apinb.com/bsm-sdk/core/errcode"
	"git.apinb.com/bsm-sdk/core/vars"
	natsgo "github.com/nats-io/nats.go"
)

type Nats struct {
	Client natsgo.JetStreamContext
}

func NewNats(endpoints []string, space string) (*Nats, error) {
	if len(endpoints) == 0 {
		return nil, errcode.ErrMq
	}
	if space == "" {
		space = vars.MQSpaceName
	}
	jetStream, err := NatsNew(endpoints, space)
	if err != nil {
		return nil, err
	}

	return &Nats{
		Client: jetStream,
	}, nil
}

func NatsNew(endpoints []string, space string) (natsgo.JetStreamContext, error) {
	var serverUrl string
	if len(endpoints) > 1 {
		serverUrl = strings.Join(endpoints, ",")
	} else {
		serverUrl = endpoints[0]
	}

	conn, err := natsgo.Connect(serverUrl, natsgo.DontRandomize())
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256))
	if err != nil {
		return nil, err
	}

	js.AddStream(&natsgo.StreamConfig{
		Name:      space,
		Subjects:  []string{space}, //jetstream不支持通配符
		Retention: natsgo.WorkQueuePolicy,
		MaxBytes:  8,
	})

	return js, nil
}

func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) {
	_, err = mq.Client.Subscribe(topic, func(m *natsgo.Msg) {
		do(m.Data)
		m.Ack()
	})
	return
}

func (mq *Nats) Producer(topic string, data []byte) (err error) {
	_, err = mq.Client.Publish(topic, data)
	return
}