engine/etcd/etcd.go

113 lines
2.5 KiB
Go
Raw Normal View History

2024-02-11 01:31:01 +08:00
package etcd
import (
"context"
"fmt"
"log"
"time"
"git.apinb.com/bsm-sdk/engine/types"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
// Etcd groups an etcd v3 client together with the KV and Watcher handles
// that NewEtcd derives from it.
type Etcd struct {
	// Client is the underlying etcd v3 client connection.
	Client *clientv3.Client
	// Kv is the key-value API handle used by the read helpers on this type.
	Kv clientv3.KV
	// Watcher is the watch API handle created from Client.
	Watcher clientv3.Watcher
}
var (
WithPrefix = clientv3.WithPrefix()
IsDelete = mvccpb.DELETE
MeshEtcdClient *Etcd = nil
PreKv = clientv3.WithPrevKV()
)
// NewEtcd returns the shared Etcd wrapper, creating it on the first call.
//
// When MeshEtcdClient is already set it is returned unchanged and both
// arguments are ignored. Otherwise a clientv3 client is created for endpoints
// (5s dial timeout, 10s keep-alive timeout, grpc.WithBlock dial option),
// using TLS built from tls when tls is non-nil. The first endpoint is then
// probed with a Status call bounded by a 2s timeout.
//
// Any failure (TLS config, client creation, status probe) ends the process
// through log.Fatal. On success the new wrapper is stored in MeshEtcdClient
// and returned.
func NewEtcd(endpoints []string, tls *types.EtcdTls) *Etcd {
	if MeshEtcdClient != nil {
		return MeshEtcdClient
	}

	var conf clientv3.Config
	conf.Context = context.Background()
	conf.Endpoints = endpoints
	// Short timeouts so an unavailable endpoint fails fast.
	conf.DialKeepAliveTimeout = 10 * time.Second
	conf.DialTimeout = 5 * time.Second
	conf.DialOptions = []grpc.DialOption{grpc.WithBlock()}

	// Certificates: only configure TLS when the caller supplied material.
	if tls != nil {
		info := transport.TLSInfo{
			TrustedCAFile: tls.Ca,
			CertFile:      tls.Cert,
			KeyFile:       tls.CertKey,
		}
		tlsConf, err := info.ClientConfig()
		if err != nil {
			log.Fatal(fmt.Sprintf("tlsconfig failed, err: %v", err))
		}
		conf.TLS = tlsConf
	}

	cli, err := clientv3.New(conf)
	if err != nil {
		log.Fatal(fmt.Sprintf("new etcd client v3 by config %+v error:%s", conf, err))
	}

	// Check that the etcd service is actually reachable.
	probeCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if _, err := cli.Status(probeCtx, conf.Endpoints[0]); err != nil {
		log.Fatal(fmt.Sprintf("error checking etcd status: %v", err))
	}

	wrapper := &Etcd{
		Client:  cli,
		Kv:      clientv3.NewKV(cli),
		Watcher: clientv3.NewWatcher(cli),
	}
	MeshEtcdClient = wrapper
	return wrapper
}
// Get looks up key and returns the value and ModRevision of the first
// key-value pair in the response.
//
// If the request fails, the error is returned with an empty value and a zero
// revision. If the response contains no key-value pairs, it returns
// ("", 0, nil).
func (e *Etcd) Get(key string) (string, int64, error) {
	resp, err := e.Kv.Get(context.Background(), key)
	if err != nil {
		return "", 0, err
	}
	if len(resp.Kvs) == 0 {
		return "", 0, nil
	}

	first := resp.Kvs[0]
	return string(first.Value), first.ModRevision, nil
}
// Fetch returns the key-value pairs from a prefix lookup on key, issued with
// clientv3.WithPrefix() and a background context.
//
// If the request fails, it returns a nil slice together with the error.
func (e *Etcd) Fetch(key string) ([]*mvccpb.KeyValue, error) {
	resp, err := e.Kv.Get(context.Background(), key, clientv3.WithPrefix())
	if err != nil {
		// The response is nil when Get fails; reading resp.Kvs here would
		// panic, so return before touching it.
		return nil, err
	}
	return resp.Kvs, nil
}
// GetWithRev looks up key with the clientv3.WithRev(version) option and
// returns the value of the first key-value pair in the response.
//
// If the request fails, the error is returned with an empty value. If the
// response contains no key-value pairs, it returns ("", nil).
func (e *Etcd) GetWithRev(key string, version int64) (string, error) {
	resp, err := e.Kv.Get(context.Background(), key, clientv3.WithRev(version))
	if err != nil {
		return "", err
	}
	if len(resp.Kvs) == 0 {
		return "", nil
	}
	return string(resp.Kvs[0].Value), nil
}