113 lines
2.5 KiB
Go
113 lines
2.5 KiB
Go
|
package etcd
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"time"
|
||
|
|
||
|
"git.apinb.com/bsm-sdk/engine/types"
|
||
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||
|
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||
|
"google.golang.org/grpc"
|
||
|
)
|
||
|
|
||
|
// Etcd .
|
||
|
type Etcd struct {
|
||
|
Client *clientv3.Client
|
||
|
Kv clientv3.KV
|
||
|
Watcher clientv3.Watcher
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
WithPrefix = clientv3.WithPrefix()
|
||
|
IsDelete = mvccpb.DELETE
|
||
|
MeshEtcdClient *Etcd = nil
|
||
|
PreKv = clientv3.WithPrevKV()
|
||
|
)
|
||
|
|
||
|
// NewEtcd .
|
||
|
func NewEtcd(endpoints []string, tls *types.EtcdTls) *Etcd {
|
||
|
if MeshEtcdClient != nil {
|
||
|
return MeshEtcdClient
|
||
|
}
|
||
|
|
||
|
cfg := clientv3.Config{
|
||
|
Context: context.Background(),
|
||
|
Endpoints: endpoints,
|
||
|
// set timeout per request to fail fast when the target endpoints is unavailable
|
||
|
DialKeepAliveTimeout: 10 * time.Second,
|
||
|
DialTimeout: 5 * time.Second,
|
||
|
DialOptions: []grpc.DialOption{
|
||
|
grpc.WithBlock(),
|
||
|
},
|
||
|
}
|
||
|
|
||
|
// 证书
|
||
|
if tls != nil {
|
||
|
tlsInfo := transport.TLSInfo{
|
||
|
TrustedCAFile: tls.Ca,
|
||
|
CertFile: tls.Cert,
|
||
|
KeyFile: tls.CertKey,
|
||
|
}
|
||
|
_tlsConfig, err := tlsInfo.ClientConfig()
|
||
|
if err != nil {
|
||
|
log.Fatal(fmt.Sprintf("tlsconfig failed, err: %v", err))
|
||
|
}
|
||
|
cfg.TLS = _tlsConfig
|
||
|
}
|
||
|
|
||
|
client, err := clientv3.New(cfg)
|
||
|
|
||
|
if err != nil {
|
||
|
log.Fatal(fmt.Sprintf("new etcd client v3 by config %+v error:%s", cfg, err))
|
||
|
}
|
||
|
|
||
|
//检测etcd服务是否连接畅通
|
||
|
timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||
|
defer cancel()
|
||
|
_, err = client.Status(timeoutCtx, cfg.Endpoints[0])
|
||
|
if err != nil {
|
||
|
log.Fatal(fmt.Sprintf("error checking etcd status: %v", err))
|
||
|
}
|
||
|
|
||
|
MeshEtcdClient = &Etcd{
|
||
|
Client: client,
|
||
|
Kv: clientv3.NewKV(client),
|
||
|
Watcher: clientv3.NewWatcher(client),
|
||
|
}
|
||
|
return MeshEtcdClient
|
||
|
}
|
||
|
|
||
|
func (e *Etcd) Get(key string) (string, int64, error) {
|
||
|
var err error
|
||
|
var val string
|
||
|
var version int64
|
||
|
resp, err := e.Kv.Get(context.Background(), key)
|
||
|
|
||
|
if err == nil && len(resp.Kvs) != 0 {
|
||
|
val = string(resp.Kvs[0].Value)
|
||
|
version = resp.Kvs[0].ModRevision
|
||
|
}
|
||
|
return val, version, err
|
||
|
}
|
||
|
|
||
|
func (e *Etcd) Fetch(key string) ([]*mvccpb.KeyValue, error) {
|
||
|
var err error
|
||
|
resp, err := e.Kv.Get(context.Background(), key, clientv3.WithPrefix())
|
||
|
|
||
|
return resp.Kvs, err
|
||
|
}
|
||
|
|
||
|
func (e *Etcd) GetWithRev(key string, version int64) (string, error) {
|
||
|
var err error
|
||
|
var val string
|
||
|
resp, err := e.Kv.Get(context.Background(), key, clientv3.WithRev(version))
|
||
|
|
||
|
if err == nil && len(resp.Kvs) != 0 {
|
||
|
val = string(resp.Kvs[0].Value)
|
||
|
}
|
||
|
return val, err
|
||
|
}
|