commit 089361707495f96abd3adb9543a2c9c57ffa0021 Author: david Date: Sun Feb 11 01:31:01 2024 +0800 init diff --git a/cache/mem/mem.go b/cache/mem/mem.go new file mode 100644 index 0000000..b303a04 --- /dev/null +++ b/cache/mem/mem.go @@ -0,0 +1,14 @@ +package mem + +import ( + "git.apinb.com/bsm-sdk/engine/vars" + "github.com/FishGoddess/cachego" +) + +func New() cachego.Cache { + return cachego.NewCache( + cachego.WithGC(vars.MemGcDuration), + cachego.WithShardings(vars.MemShardings), + cachego.WithLFU(vars.MemLRUMaxNumber), + ) +} diff --git a/cache/redis/redis.go b/cache/redis/redis.go new file mode 100644 index 0000000..078e454 --- /dev/null +++ b/cache/redis/redis.go @@ -0,0 +1,61 @@ +package redis + +import ( + "context" + "net/url" + "strconv" + "strings" + + "git.apinb.com/bsm-sdk/engine/vars" + cacheRedis "github.com/redis/go-redis/v9" +) + +const ( + Nil = cacheRedis.Nil +) + +// RedisClient . +type RedisClient struct { + DB int + Client *cacheRedis.Client + Ctx context.Context +} + +func New(dsn string, hashRadix int) *RedisClient { + arg, err := url.Parse(dsn) + if err != nil { + panic(err) + } + pwd, _ := arg.User.Password() + + //get db number,default:0 + var db int = 0 + arg.Path = strings.ReplaceAll(arg.Path, "/", "") + if arg.Path == "" { + db = Hash(hashRadix) + } else { + db, _ = strconv.Atoi(arg.Path) + } + + //connect redis server + client := cacheRedis.NewClient(&cacheRedis.Options{ + Addr: arg.Host, + Password: pwd, // no password set + DB: db, // use default DB + Protocol: 3, + }) + _, err = client.Ping(context.Background()).Result() + if err != nil { + panic(err) + } + + return &RedisClient{ + DB: db, + Client: client, + Ctx: context.Background(), + } +} + +func Hash(i int) int { + return i % vars.RedisShardings +} diff --git a/cmd/service.go b/cmd/service.go new file mode 100644 index 0000000..aedbd13 --- /dev/null +++ b/cmd/service.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "fmt" + "os" + "strings" + + "git.apinb.com/bsm-sdk/engine/vars" +) + +func NewCmd() { + if len(os.Args) > 1 { + parseArgs(os.Args[1]) + } +} + +func parseArgs(cmd string) { + cmd = strings.ToLower(cmd) + switch cmd { + case "-v", "--v", "-version", "--version": + versionCmd() + } + + os.Exit(0) +} + +func versionCmd() { + fmt.Printf("[Blocks Service: %s] Version: %s \n", vars.ServiceKey, vars.VERSION) + fmt.Printf("[Blocks Service: %s] Compile: %s by %s build.\n", vars.ServiceKey, vars.GO_VERSION, vars.BUILD_TIME) +} diff --git a/configure/configure.go b/configure/configure.go new file mode 100644 index 0000000..e0e5d4d --- /dev/null +++ b/configure/configure.go @@ -0,0 +1,23 @@ +package configure + +import ( + "os" + "strings" + + "github.com/zeromicro/go-zero/core/conf" +) + +// Load loads config into v from file, .json, .yaml and .yml are acceptable. +func LoadYamlFile(file string, repl map[string]string, v any) error { + contentBytes, err := os.ReadFile(file) + if err != nil { + return err + } + + contentString := string(contentBytes) + for k, v := range repl { + contentString = strings.ReplaceAll(contentString, "{"+k+"}", v) + } + + return conf.LoadFromYamlBytes([]byte(contentString), v) +} diff --git a/database/elastic/elasticsearch.go b/database/elastic/elasticsearch.go new file mode 100644 index 0000000..d484b1b --- /dev/null +++ b/database/elastic/elasticsearch.go @@ -0,0 +1,220 @@ +package elastic + +import ( + "bytes" + "context" + "encoding/json" + "log" + "sync/atomic" + "time" + + "git.apinb.com/bsm-sdk/engine/vars" + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/esutil" +) + +type ES struct { + Client *elasticsearch.Client +} + +func NewElastic(endpoints []string, username, password string) (*ES, error) { + + cfg := elasticsearch.Config{ + Addresses: endpoints, + Username: username, + Password: password, + } + var err error + client, err := elasticsearch.NewClient(cfg) + if err != nil { + return nil, err + } + + return &ES{ + Client: client, + }, nil +} + +// idx 为空,默认随机唯一字符串 +// +// doc := map[string]interface{}{ +// "title": "中国", +// "content": "中国早日统一台湾", +// "time": time.Now().Unix(), +// "date": time.Now(), +// } +func (es *ES) CreateDocument(index string, id string, doc *interface{}) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(doc); err != nil { + log.Println("Elastic NewEncoder:", err) + } + // Set up the request object. + req := esapi.IndexRequest{ + Index: index, + DocumentID: id, + Body: &buf, + Refresh: "true", + } + + // Perform the request with the client. + res, err := req.Do(context.Background(), es.Client) + if err != nil { + log.Println("Elastic Error:", res.String()) + } + defer res.Body.Close() + + if res.IsError() { + log.Println("Elastic Error:", res.String()) + } +} + +// 批量写入文档。 +// Action field configures the operation to perform (index, create, delete, update) +// create 如果文档不存在就创建,但如果文档存在就返回错误 +// index 如果文档不存在就创建,如果文档存在就更新 +// update 更新一个文档,如果文档不存在就返回错误 +// delete 删除一个文档,如果要删除的文档id不存在,就返回错误 +func (es *ES) Batch(index string, documens []map[string]interface{}, action string) { + log.SetFlags(0) + + var ( + countSuccessful uint64 + + err error + ) + + // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + // + // Create the BulkIndexer + // + // NOTE: For optimal performance, consider using a third-party JSON decoding package. + // See an example in the "benchmarks" folder. + // + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: index, // The default index name + Client: es.Client, // The Elasticsearch client + NumWorkers: vars.ESNumWorkers, // The number of worker goroutines + FlushBytes: vars.ESFlushBytes, // The flush threshold in bytes + FlushInterval: 30 * time.Second, // The periodic flush interval + }) + if err != nil { + log.Fatalf("Error creating the indexer: %s", err) + } + + for _, doc := range documens { + id := doc["id"].(string) + // Prepare the data payload: encode article to JSON + data, err := json.Marshal(doc) + if err != nil { + log.Fatalf("Cannot encode documen %s: %s", id, err) + } + + // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + // + // Add an item to the BulkIndexer + // + err = bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + + Action: action, + + // DocumentID is the (optional) document ID + DocumentID: id, + + // Body is an `io.Reader` with the payload + Body: bytes.NewReader(data), + + // OnSuccess is called for each successful operation + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + atomic.AddUint64(&countSuccessful, 1) + }, + + // OnFailure is called for each failed operation + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + log.Printf("ERROR: %s", err) + } else { + log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) + } + }, + }, + ) + if err != nil { + log.Printf("Unexpected error: %s", err) + } + // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< + } + + // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + // Close the indexer + // + if err := bi.Close(context.Background()); err != nil { + log.Printf("Unexpected error: %s", err) + } + + stats := bi.Stats() + if stats.NumFailed > 0 { + log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed) + } else { + log.Printf("Successfully indexed [%d] documents", stats.NumFlushed) + } +} + +func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) { + var buf bytes.Buffer + if err = json.NewEncoder(&buf).Encode(query); err != nil { + return + } + // Perform the search request. + res, err = es.Client.Search( + es.Client.Search.WithContext(context.Background()), + es.Client.Search.WithIndex(index), + es.Client.Search.WithBody(&buf), + es.Client.Search.WithTrackTotalHits(true), + es.Client.Search.WithFrom(0), + es.Client.Search.WithSize(10), + es.Client.Search.WithSort("time:desc"), + es.Client.Search.WithPretty(), + ) + if err != nil { + return + } + defer res.Body.Close() + + return +} + +// 删除 index 根据 索引名 id +func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) { + res, err = es.Client.Delete( + index, // Index name + idx, // Document ID + es.Client.Delete.WithRefresh("true"), + ) + if err != nil { + return + } + defer res.Body.Close() + + return +} + +func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) { + var buf bytes.Buffer + if err = json.NewEncoder(&buf).Encode(query); err != nil { + return + } + // Perform the search request. + res, err = es.Client.DeleteByQuery( + index, + &buf, + ) + if err != nil { + return + } + defer res.Body.Close() + + return +} diff --git a/database/kv/pebble.go b/database/kv/pebble.go new file mode 100644 index 0000000..e17d16b --- /dev/null +++ b/database/kv/pebble.go @@ -0,0 +1,66 @@ +package kv + +import ( + "github.com/cockroachdb/pebble" +) + +var Impl *KvImpl + +type KvImpl struct { + PebbleDB *pebble.DB +} + +func NewPebble(datadir string) *KvImpl { + db, err := pebble.Open(datadir, &pebble.Options{}) + if err != nil { + panic(err) + } + return &KvImpl{ + PebbleDB: db, + } +} + +func (db *KvImpl) PebbleSet(key, val string) error { + return db.PebbleDB.Set([]byte(key), []byte(val), pebble.Sync) +} + +func (db *KvImpl) PebbleGet(key string) ([]byte, error) { + value, _, err := db.PebbleDB.Get([]byte(key)) + if err != nil { + return nil, err + } + + return value, nil +} + +func (db *KvImpl) PebbleFetch(prefixKey string) (result map[string]string, err error) { + keyUpperBound := func(b []byte) []byte { + end := make([]byte, len(b)) + copy(end, b) + for i := len(end) - 1; i >= 0; i-- { + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound + } + + prefixIterOptions := func(prefix []byte) *pebble.IterOptions { + return &pebble.IterOptions{ + LowerBound: prefix, + UpperBound: keyUpperBound(prefix), + } + } + + iter, err := db.PebbleDB.NewIter(prefixIterOptions([]byte(prefixKey))) + + if err != nil { + return nil, err + } + + for iter.First(); iter.Valid(); iter.Next() { + result[string(iter.Key())] = string(iter.Value()) + } + return +} diff --git a/database/sql/logger.go b/database/sql/logger.go new file mode 100644 index 0000000..73fd2db --- /dev/null +++ b/database/sql/logger.go @@ -0,0 +1,63 @@ +package sql + +import ( + "fmt" + "io" + "log" + "os" + "path" + "path/filepath" + "time" + + "git.apinb.com/bsm-sdk/engine/utils" + "git.apinb.com/bsm-sdk/engine/vars" + "gorm.io/gorm/logger" +) + +var ( + F *os.File +) + +func setLogger(serviceName string, stdout bool) logger.Interface { + + // create log dir + absPath := path.Join(utils.GetCurrentPath(), vars.SqlLogDir, serviceName) + if !utils.PathExists(absPath) { + os.MkdirAll(absPath, os.ModePerm) + } + + // open log file + fullPath := getLogFileFullPath(absPath, getLogFileName(serviceName)) + F = openLogFile(fullPath) + + // write log + var multiOutput io.Writer + if stdout { + multiOutput = io.MultiWriter(os.Stdout, F) + } else { + multiOutput = io.MultiWriter(F) + } + + return logger.New(log.New(multiOutput, "\r\n", log.LstdFlags), logger.Config{ + SlowThreshold: 200 * time.Millisecond, + IgnoreRecordNotFoundError: false, + Colorful: false, + }) +} + +func getLogFileName(serviceName string) string { + return fmt.Sprintf("sql_%s.%s", time.Now().Format(vars.YYYYMMDD), vars.SqlLogFileExt) +} + +func getLogFileFullPath(dir, fn string) string { + return filepath.Join(dir, fn) +} + +func openLogFile(filePath string) *os.File { + handle, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Fail to OpenFile :%v", err) + } + + return handle +} diff --git a/database/sql/postgresql.go b/database/sql/postgresql.go new file mode 100644 index 0000000..1480f4e --- /dev/null +++ b/database/sql/postgresql.go @@ -0,0 +1,56 @@ +package sql + +import ( + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/vars" + "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/schema" +) + +// new grom db. +func NewPostgreSql(dsn string, options *types.SqlOptions) (*gorm.DB, error) { + var err error + + //set connection default val. + if options == nil { + options = &types.SqlOptions{ + MaxIdleConns: vars.SqlOptionMaxIdleConns, + MaxOpenConns: vars.SqlOptionMaxIdleConns, + ConnMaxLifetime: vars.SqlOptionConnMaxLifetime, + LogStdout: false, + Debug: true, + } + } + + //控制台和文件同时输出日志 + var newLogger = setLogger(vars.ServiceKey, options.LogStdout) + + gormDb, err := gorm.Open(postgres.New(postgres.Config{ + DSN: dsn, + // PreferSimpleProtocol: true, disables implicit prepared statement usage + + }), &gorm.Config{ + Logger: newLogger, + NamingStrategy: schema.NamingStrategy{ + SingularTable: true, // 使用单数表名,启用该选项,此时,`User` 的表名应该是 `t_user` + }}) + if err != nil { + return nil, err + } + + if options.Debug { + gormDb = gormDb.Debug() + } + + // 获取通用数据库对象 sql.DB ,然后使用其提供的功能 + sqlDB, _ := gormDb.DB() + // SetMaxIdleConns 用于设置连接池中空闲连接的最大数量。 + sqlDB.SetMaxIdleConns(options.MaxIdleConns) + // SetMaxOpenConns 设置打开数据库连接的最大数量。 + sqlDB.SetMaxOpenConns(options.MaxOpenConns) + // SetConnMaxLifetime 设置了连接可复用的最大时间。 + sqlDB.SetConnMaxLifetime(options.ConnMaxLifetime) + + return gormDb, nil +} diff --git a/encipher/aes.go b/encipher/aes.go new file mode 100644 index 0000000..e8ff8f5 --- /dev/null +++ b/encipher/aes.go @@ -0,0 +1,146 @@ +package encipher + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "encoding/json" + "errors" + "strings" + "time" + + "git.apinb.com/bsm-sdk/engine/env" + "git.apinb.com/bsm-sdk/engine/exception" + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/vars" +) + +var ( + certBytes *types.CertFileBytes = nil + JwtSecret []byte + JwtSecretLen int +) + +func New() { + JwtSecret = []byte(env.MeshEnv.JwtSecretKey) + JwtSecretLen = len(env.MeshEnv.JwtSecretKey) +} + +func GenerateTokenAes(id uint, identity, client, role string, extend map[string]string) (string, error) { + expireTime := time.Now().Add(vars.JwtExpireDay) + claims := types.JwtClaims{ + ID: id, + Identity: identity, + Extend: extend, + Client: client, + Role: role, + ExpiresAt: expireTime.Unix(), + } + + byte, err := json.Marshal(claims) + if err != nil { + return "", err + } + + token, err := AesEncryptCBC(byte) + if err != nil { + return "", err + } + return token, nil +} + +func AesEncryptCBC(plan []byte) (string, error) { + if (JwtSecretLen == 16 || JwtSecretLen == 24 || JwtSecretLen == 32) == false { + return "", errors.New("JwtSecret lenght must 16/24/32.") + } + // 分组秘钥 + // NewCipher该函数限制了输入k的长度必须为16, 24或者32 + block, _ := aes.NewCipher(JwtSecret) + // 获取秘钥块的长度 + blockSize := block.BlockSize() + // 补全码 + plan = PKCS7Padding(plan, blockSize) + // 加密模式 + blockMode := cipher.NewCBCEncrypter(block, JwtSecret[:blockSize]) + // 创建数组 + cryted := make([]byte, len(plan)) + // 加密 + blockMode.CryptBlocks(cryted, plan) + return base64.StdEncoding.EncodeToString(cryted), nil +} + +func AesDecryptCBC(cryted string) (b []byte, err error) { + if (JwtSecretLen == 16 || JwtSecretLen == 24 || JwtSecretLen == 32) == false { + return b, errors.New("JwtSecret lenght must 16/24/32.") + } + // 转成字节数组 + crytedByte, err := base64.StdEncoding.DecodeString(cryted) + if err != nil { + return + } + // 分组秘钥 + block, err := aes.NewCipher(JwtSecret) + if err != nil { + return + } + // 获取秘钥块的长度 + blockSize := block.BlockSize() + // 加密模式 + blockMode := cipher.NewCBCDecrypter(block, JwtSecret[:blockSize]) + // 创建数组 + orig := make([]byte, len(crytedByte)) + // 解密 + blockMode.CryptBlocks(orig, crytedByte) + // 去补全码 + orig = PKCS7UnPadding(orig, blockSize) + if orig == nil { + return nil, exception.ErrAuthParseFail + } + return orig, nil +} + +func PKCS7Padding(ciphertext []byte, blocksize int) []byte { + padding := blocksize - len(ciphertext)%blocksize + padtext := bytes.Repeat([]byte{byte(padding)}, padding) + return append(ciphertext, padtext...) +} + +// 去码 +// bug:runtime error: slice bounds out of range [:-22] +func PKCS7UnPadding(origData []byte, blocksize int) []byte { + if blocksize <= 0 { + return nil + } + if origData == nil || len(origData) == 0 { + return nil + } + if len(origData)%blocksize != 0 { + return nil + } + + length := len(origData) + unpadding := int(origData[length-1]) + return origData[:(length - unpadding)] +} + +func ParseTokenAes(token string) (*types.JwtClaims, error) { + token = strings.TrimSpace(token) + data, err := AesDecryptCBC(token) + if err != nil { + return nil, err + } + + var ac *types.JwtClaims + err = json.Unmarshal(data, &ac) + if err != nil { + return nil, exception.ErrAuthParseFail + } + + expireTime := time.Now().Unix() + if expireTime > ac.ExpiresAt { + return nil, exception.ErrAuthExpire + } + + return ac, nil +} diff --git a/encipher/cert.go b/encipher/cert.go new file mode 100644 index 0000000..52bea63 --- /dev/null +++ b/encipher/cert.go @@ -0,0 +1,26 @@ +package encipher + +import ( + "os" + + "git.apinb.com/bsm-sdk/engine/env" + "git.apinb.com/bsm-sdk/engine/exception" + "git.apinb.com/bsm-sdk/engine/types" +) + +func NewCertKey(privateKey []byte, publicKey []byte) { + if certBytes == nil { + certBytes = &types.CertFileBytes{ + Private: privateKey, + Public: publicKey, + } + } +} + +func GetKeyFile(filePath string) ([]byte, error) { + keyBytes, err := os.ReadFile(env.MeshEnv.Prefix + filePath) + if err != nil { + return nil, exception.ErrDataLoss + } + return keyBytes, nil +} diff --git a/env/env.go b/env/env.go new file mode 100644 index 0000000..faaa23b --- /dev/null +++ b/env/env.go @@ -0,0 +1,32 @@ +package env + +import ( + "os" + "strings" + + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/utils" +) + +var MeshEnv *types.MeshEnv = nil + +// get system env. +func NewEnv() *types.MeshEnv { + if MeshEnv == nil { + MeshEnv = &types.MeshEnv{ + Workspace: GetEnvDefault("BlocksMesh_Workspace", "bsm"), + Prefix: GetEnvDefault("BlocksMesh_Prefix", utils.GetCurrentPath()), + JwtSecretKey: GetEnvDefault("BlocksMesh_JwtSecretKey", "Cblocksmesh2022C"), + RuntimeMode: strings.ToLower(GetEnvDefault("BlocksMesh_RuntimeMode", "dev")), + } + } + return MeshEnv +} + +func GetEnvDefault(key string, def string) string { + value := os.Getenv(key) + if value == "" { + return def + } + return value +} diff --git a/etcd/etcd.go b/etcd/etcd.go new file mode 100644 index 0000000..51bdcc9 --- /dev/null +++ b/etcd/etcd.go @@ -0,0 +1,112 @@ +package etcd + +import ( + "context" + "fmt" + "log" + "time" + + "git.apinb.com/bsm-sdk/engine/types" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" +) + +// Etcd . +type Etcd struct { + Client *clientv3.Client + Kv clientv3.KV + Watcher clientv3.Watcher +} + +var ( + WithPrefix = clientv3.WithPrefix() + IsDelete = mvccpb.DELETE + MeshEtcdClient *Etcd = nil + PreKv = clientv3.WithPrevKV() +) + +// NewEtcd . +func NewEtcd(endpoints []string, tls *types.EtcdTls) *Etcd { + if MeshEtcdClient != nil { + return MeshEtcdClient + } + + cfg := clientv3.Config{ + Context: context.Background(), + Endpoints: endpoints, + // set timeout per request to fail fast when the target endpoints is unavailable + DialKeepAliveTimeout: 10 * time.Second, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), + }, + } + + // 证书 + if tls != nil { + tlsInfo := transport.TLSInfo{ + TrustedCAFile: tls.Ca, + CertFile: tls.Cert, + KeyFile: tls.CertKey, + } + _tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + log.Fatal(fmt.Sprintf("tlsconfig failed, err: %v", err)) + } + cfg.TLS = _tlsConfig + } + + client, err := clientv3.New(cfg) + + if err != nil { + log.Fatal(fmt.Sprintf("new etcd client v3 by config %+v error:%s", cfg, err)) + } + + //检测etcd服务是否连接畅通 + timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, err = client.Status(timeoutCtx, cfg.Endpoints[0]) + if err != nil { + log.Fatal(fmt.Sprintf("error checking etcd status: %v", err)) + } + + MeshEtcdClient = &Etcd{ + Client: client, + Kv: clientv3.NewKV(client), + Watcher: clientv3.NewWatcher(client), + } + return MeshEtcdClient +} + +func (e *Etcd) Get(key string) (string, int64, error) { + var err error + var val string + var version int64 + resp, err := e.Kv.Get(context.Background(), key) + + if err == nil && len(resp.Kvs) != 0 { + val = string(resp.Kvs[0].Value) + version = resp.Kvs[0].ModRevision + } + return val, version, err +} + +func (e *Etcd) Fetch(key string) ([]*mvccpb.KeyValue, error) { + var err error + resp, err := e.Kv.Get(context.Background(), key, clientv3.WithPrefix()) + + return resp.Kvs, err +} + +func (e *Etcd) GetWithRev(key string, version int64) (string, error) { + var err error + var val string + resp, err := e.Kv.Get(context.Background(), key, clientv3.WithRev(version)) + + if err == nil && len(resp.Kvs) != 0 { + val = string(resp.Kvs[0].Value) + } + return val, err +} diff --git a/exception/auth.go b/exception/auth.go new file mode 100644 index 0000000..c81a0bc --- /dev/null +++ b/exception/auth.go @@ -0,0 +1,14 @@ +package exception + +// jwt custom error code ,begin:200 +var ( + ErrAuthPasswd = ErrorJson(201, "Password Error") + ErrAuthNotFound = ErrorJson(202, "Auth Token Not Found") + ErrAuthParseFail = ErrorJson(203, "Auth Parse Fail") + ErrAuthId = ErrorJson(204, "Auth Id Not Passed") + ErrAuthIdentity = ErrorJson(205, "Auth Identity Not Passed") + ErrAuthTokenChanged = ErrorJson(206, "Auth Token Changed") + ErrAuthIdType = ErrorJson(207, "Auth Id Type Error") + ErrAuthExpire = ErrorJson(208, "Auth Token Expire") + ErrAuthClient = ErrorJson(208, "Auth Token Client Not Passed") +) diff --git a/exception/db.go b/exception/db.go new file mode 100644 index 0000000..b949eb9 --- /dev/null +++ b/exception/db.go @@ -0,0 +1,10 @@ +package exception + +// db custom error code ,begin:300 +var ( + ErrDBFatal = ErrorJson(300, "DB Fatal error") + ErrCacheFatal = ErrorJson(301, "Cache Fatal error") + ErrEtcdFatal = ErrorJson(302, "Etcd Fatal error") + ErrElasticFatal = ErrorJson(303, "Elastic Fatal error") + ErrBlocksMQFatal = ErrorJson(304, "BlocksMQ Fatal error") +) diff --git a/exception/errors.go b/exception/errors.go new file mode 100644 index 0000000..a73de9a --- /dev/null +++ b/exception/errors.go @@ -0,0 +1,47 @@ +package exception + +import ( + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + // google grpc error status. + OK = ErrorJson(0, "OK") + ErrCanceled = ErrorJson(1, "Canceled") + ErrUnknown = ErrorJson(2, "Unknown") + ErrInvalidArgument = ErrorJson(3, "Invalid Argument") + ErrDeadlineExceeded = ErrorJson(4, "Deadline Exceeded") + ErrNotFound = ErrorJson(5, "Not Found") + ErrAlreadyExists = ErrorJson(6, "Already Exists") + ErrPermissionDenied = ErrorJson(7, "Permission Denied") + ErrResourceExhausted = ErrorJson(8, "Resource Exhausted") + ErrFailedPrecondition = ErrorJson(9, "Failed Precondition") + ErrAborted = ErrorJson(10, "Aborted") + ErrOutOfRange = ErrorJson(11, "Out Of Range") + ErrUnimplemented = ErrorJson(12, "Unimplemented") + ErrInternal = ErrorJson(13, "Internal") + ErrUnavailable = ErrorJson(14, "Unavailable") + ErrDataLoss = ErrorJson(15, "Data Loss") + ErrUnauthenticated = ErrorJson(16, "Unauthenticated") + ErrJSONMarshal = ErrorJson(17, "Marshal JSON") + ErrJSONUnmarshal = ErrorJson(18, "Unmarshal JSON") + + ErrSmsCode = ErrorJson(20, "SMS Code Invalid") + + // coustom error status +) + +func Error(c uint32, msg string) error { + return status.New(codes.Code(c), msg).Err() +} + +func Errorf(c uint32, format string, a ...interface{}) error { + return status.New(codes.Code(c), fmt.Sprintf(format, a...)).Err() +} + +func ErrorJson(c uint32, msg string) error { + return status.New(codes.Code(c), fmt.Sprintf(`{ "Code": %d, "Msg": "%s" }`, c, msg)).Err() +} diff --git a/exception/mq.go b/exception/mq.go new file mode 100644 index 0000000..76b87b3 --- /dev/null +++ b/exception/mq.go @@ -0,0 +1,11 @@ +package exception + +// mq custom error code ,begin:900 +var ( + ErrMQClient = ErrorJson(900, "MQ Connect Error") + ErrMQDispatch = ErrorJson(901, "MQ Dispatch Error") + ErrMQProducer = ErrorJson(902, "MQ Producer Error") + ErrMQConsumer = ErrorJson(902, "MQ Consumer Error") + ErrMQInternal = ErrorJson(903, "MQ Internal Error") + ErrMQStorage = ErrorJson(904, "MQ Storage Error") +) diff --git a/exception/work.go b/exception/work.go new file mode 100644 index 0000000..a784e55 --- /dev/null +++ b/exception/work.go @@ -0,0 +1,30 @@ +package exception + +// mesh custom error code ,begin:100 +var ( + // proxy + ErrWorkerServiceNotFound = ErrorJson(100, "Service Node Not Started") + ErrWorkerServerIsNil = ErrorJson(101, "Service Is Nil") + ErrWorkerMethodNotFound = ErrorJson(102, "Service Method Not Found") + ErrWorkerRequestContent = ErrorJson(103, "Worker Parse Request Content") + ErrWorkerRequestParams = ErrorJson(104, "Worker Parse Request Params") + + // header + ErrInvalidHeaderParams = ErrorJson(105, "Invalid Header Params") + + // grpc getway + ErrWorkerFailedProxy = ErrorJson(107, "Worker gRPC proxying should never reach this stage") + ErrWorkerGrpcProxyShould = ErrorJson(108, "Worker gRPC proxying should never reach this stage") + ErrWorkerServerStreamNotFound = ErrorJson(109, "Worker lowLevelServerStream not exists in context") + + // http getway + ErrWorkerHttpReadAll = ErrorJson(110, "Worker Http Read All") + ErrWorkerHttpResolveService = ErrorJson(111, "Worker Http Resolve Service") + ErrWorkerHttpMarshalJSON = ErrorJson(112, "Worker Http Parameter Must JSON") + ErrWorkerHttpUnmarshalJSON = ErrorJson(113, "Worker Http Return Not JSON") + ErrWorkerHttpReflectInvokeRpc = ErrorJson(114, "Worker Http Reflect InvokeRpc") + ErrWorkerHttpReflectAsDynamicMessage = ErrorJson(115, "Worker Http Reflect AsDynamicMessage") + + // invok + ErrServiceInvok = ErrorJson(116, "Service Invok") +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..47df3e8 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.apinb.com/bsm-sdk/engine + +go 1.21 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/print/print.go b/print/print.go new file mode 100644 index 0000000..1cc59c9 --- /dev/null +++ b/print/print.go @@ -0,0 +1,30 @@ +package print + +import ( + "fmt" + "log" +) + +// record INFO message. Color White +func Info(format string, a ...interface{}) { + message := fmt.Sprintf("\033[37m"+format+"\033[0m\n", a...) + log.Print(message) +} + +// record ERROR message. Color Red +func Important(format string, a ...interface{}) { + message := fmt.Sprintf("\033[33m"+format+"\033[0m\n", a...) + log.Print(message) +} + +// record Success message. Color Green +func Success(format string, a ...interface{}) { + message := fmt.Sprintf("\033[32m"+format+"\033[0m\n", a...) + log.Print(message) +} + +// record ERROR message. Color Red +func Fail(format string, a ...interface{}) { + message := fmt.Sprintf("\033[31m"+format+"\033[0m\n", a...) + log.Print(message) +} diff --git a/queue/nats/nats.go b/queue/nats/nats.go new file mode 100644 index 0000000..71b9140 --- /dev/null +++ b/queue/nats/nats.go @@ -0,0 +1,65 @@ +package nats + +import ( + "strings" + + "git.apinb.com/bsm-sdk/engine/vars" + natsgo "github.com/nats-io/nats.go" +) + +type Nats struct { + Client natsgo.JetStreamContext +} + +func NewNats(endpoints []string) (*Nats, error) { + jetStream, err := NatsNew(endpoints) + if err != nil { + return nil, err + } + + return &Nats{ + Client: jetStream, + }, nil +} + +func NatsNew(endpoints []string) (natsgo.JetStreamContext, error) { + var serverUrl string + if len(endpoints) > 1 { + serverUrl = strings.Join(endpoints, ",") + } else { + serverUrl = endpoints[0] + } + + conn, err := natsgo.Connect(serverUrl, natsgo.DontRandomize()) + if err != nil { + return nil, err + } + defer conn.Close() + + js, err := conn.JetStream(natsgo.PublishAsyncMaxPending(256)) + if err != nil { + return nil, err + } + + js.AddStream(&natsgo.StreamConfig{ + Name: vars.MQSpaceName, + Subjects: []string{vars.MQSpaceName}, //jetstream不支持通配符 + Retention: natsgo.WorkQueuePolicy, + MaxBytes: 8, + }) + + return js, nil +} + +func (mq *Nats) Subscribe(topic string, do func([]byte)) (err error) { + _, err = mq.Client.Subscribe(topic, func(m *natsgo.Msg) { + do(m.Data) + m.Ack() + }) + return +} + +func (mq *Nats) Producer(topic string, data []byte) (err error) { + _, err = mq.Client.Publish(topic, data) + return +} diff --git a/queue/pulsar/pulsar.go b/queue/pulsar/pulsar.go new file mode 100644 index 0000000..ea1ef59 --- /dev/null +++ b/queue/pulsar/pulsar.go @@ -0,0 +1,88 @@ +package pulsar + +import ( + "context" + "errors" + + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/vars" + pulsargo "github.com/apache/pulsar-client-go/pulsar" +) + +type Pulsar struct { + Client pulsargo.Client +} + +func NewPulsar(cfg *types.PulsarConf) (*Pulsar, error) { + client, err := pulsargo.NewClient(pulsargo.ClientOptions{ + URL: cfg.Endpoints, //TODO: 更换为接入点地址(控制台集群管理页完整复制) + Authentication: pulsargo.NewAuthenticationToken(cfg.Token), + OperationTimeout: vars.OperationTimeout, + ConnectionTimeout: vars.ConnectionTimeout, + }) + if err != nil { + return nil, err + } + + return &Pulsar{ + Client: client, + }, nil +} + +// push to pulsar server, return messageid. +func (mq *Pulsar) Producer(topic, msg string) (MessageID string, err error) { + if msg == "" { + return "", errors.New("Message is nil.") + } + + producer, err := mq.Client.CreateProducer(pulsargo.ProducerOptions{ + Topic: topic, + CompressionType: pulsargo.ZSTD, + }) + + if err != nil { + return "", err + } + + msgID, err := producer.Send(context.Background(), &pulsargo.ProducerMessage{ + Payload: []byte(msg), + }) + + if err != nil { + return "", err + } + + return msgID.String(), nil +} + +func (mq *Pulsar) Subscribe(topic, subscription string, subType pulsargo.SubscriptionType, do func([]byte)) error { + // we can listen this channel + channel := make(chan pulsargo.ConsumerMessage, 100) + + options := pulsargo.ConsumerOptions{ + Topic: topic, + SubscriptionName: subscription, + Type: subType, + // fill `MessageChannel` field will create a listener + MessageChannel: channel, + } + + consumer, err := mq.Client.Subscribe(options) + if err != nil { + return err + } + + defer consumer.Close() + + // Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where + // the message was received. It's not necessary here since we have 1 single consumer, but the channel could be + // shared across multiple consumers as well + for cm := range channel { + consumer := cm.Consumer + msg := cm.Message + + do(msg.Payload()) + consumer.Ack(msg) + } + return nil +} diff --git a/service/anonumous.go b/service/anonumous.go new file mode 100644 index 0000000..a6ece7e --- /dev/null +++ b/service/anonumous.go @@ -0,0 +1,67 @@ +package service + +import ( + "context" + "strings" + + "git.apinb.com/bsm-sdk/engine/etcd" + "git.apinb.com/bsm-sdk/engine/print" + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/vars" + "github.com/zeromicro/go-zero/zrpc" +) + +func RegisterAnonymous(c *zrpc.RpcServerConf, anonCfg *types.AnonymousConf) { + // etcd is has + if !c.HasEtcd() { + return + } else { + print.Info("[Blocks Register] %s ETCD Endpoints: %s", vars.ServiceKey, c.Etcd.Hosts) + print.Success("[Blocks Register] Service: %s", c.Etcd.Key) + } + + // anonymous slice is zero + if len(anonCfg.Urls) == 0 || anonCfg.Key == "" { + return + } + + // connect etcd + var client *etcd.Etcd + if c.Etcd.HasTLS() { + client = etcd.NewEtcd(c.Etcd.Hosts, &types.EtcdTls{ + Ca: c.Etcd.CACertFile, + Cert: c.Etcd.CertFile, + CertKey: c.Etcd.CertKeyFile, + }) + } else { + client = etcd.NewEtcd(c.Etcd.Hosts, nil) + } + defer client.Client.Close() + + // remove reppeat, clear service all anonymous uri. + anonymous, _, _ := client.Get(anonCfg.Key) + as := strings.Split(anonymous, ",") + as = append(clearService(as), anonCfg.Urls...) + newAnonymous := strings.Join(as, ",") + + // put anonymous to etcd + _, err := client.Client.Put(context.Background(), anonCfg.Key, newAnonymous) + + if err != nil { + print.Fail("[Blocks Register] Anonymous Fail.") + } else { + print.Success("[Blocks Register] Anonymous: %s", newAnonymous) + } + +} + +func clearService(as []string) (out []string) { + for _, v := range as { + if len(v) >= len(vars.ServiceKey) { + if v[0:len(vars.ServiceKey)] != vars.ServiceKey { + out = append(out, v) + } + } + } + return +} diff --git a/service/context.go b/service/context.go new file mode 100644 index 0000000..6a53e4a --- /dev/null +++ b/service/context.go @@ -0,0 +1,50 @@ +package service + +import ( + "context" + "strings" + + "git.apinb.com/bsm-sdk/engine/encipher" + "git.apinb.com/bsm-sdk/engine/exception" + "git.apinb.com/bsm-sdk/engine/types" + "git.apinb.com/bsm-sdk/engine/utils" + "google.golang.org/grpc/metadata" +) + +// 解析Context中MetaData的数据 +type ParseOptions struct { + RoleValue string // 判断角色的值 + MustPrivateAllow bool // 是否只允许私有IP访问 +} + +func ParseMetaCtx(ctx context.Context, opts *ParseOptions) (*types.JwtClaims, error) { + // 解析metada中的信息并验证 + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, exception.ErrAuthNotFound + } + + var Authorizations []string = md.Get("authorization") + if len(Authorizations) == 0 || Authorizations[0] == "" { + return nil, exception.ErrAuthNotFound + } + + claims, err := encipher.ParseTokenAes(Authorizations[0]) + if err != nil { + return nil, err + } + + if opts != nil { + if !strings.Contains(claims.Role, opts.RoleValue) { + return nil, exception.ErrPermissionDenied + } + if opts.MustPrivateAllow { + if utils.IsPublicIP(claims.Client) { + return nil, exception.ErrPermissionDenied + } + } + } + + return claims, nil + +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..ca49c7f --- /dev/null +++ b/service/service.go @@ -0,0 +1,57 @@ +package service + +import ( + "fmt" + "path/filepath" + + "git.apinb.com/bsm-sdk/engine/cmd" + "git.apinb.com/bsm-sdk/engine/configure" + "git.apinb.com/bsm-sdk/engine/env" + "git.apinb.com/bsm-sdk/engine/print" + "git.apinb.com/bsm-sdk/engine/vars" + "github.com/zeromicro/go-zero/core/prometheus" + "github.com/zeromicro/go-zero/core/trace" +) + +func Register(srvKey string, cfg any) { + vars.ServiceKey = srvKey + + print.Info("[Blocks Service] %s Starting...", vars.ServiceKey) + // get os env. + env.NewEnv() + + // open side cmd. + cmd.NewCmd() + + // set config args. + args := map[string]string{ + "ServiceKey": srvKey, + "Workspace": env.MeshEnv.Workspace, + "RuntimeMode": env.MeshEnv.RuntimeMode, + } + + // get config file. + cf := fmt.Sprintf("%s_%s.yaml", srvKey, env.MeshEnv.RuntimeMode) + cf = filepath.Join(env.MeshEnv.Prefix, "etc", cf) + + configure.LoadYamlFile(cf, args, cfg) + + return +} + +func Start(addr string) { + print.Info("[Blocks Service] %s Workspace: %s", vars.ServiceKey, env.MeshEnv.Workspace) + print.Info("[Blocks Service] %s Listen Addr: %s", vars.ServiceKey, addr) + print.Info("[Blocks Service] %s Runtime Mode: %s", vars.ServiceKey, env.MeshEnv.RuntimeMode) +} + +func Done(p *prometheus.Config, t *trace.Config) { + if p != nil && p.Host != "" { + print.Info("[Blocks Service] %s Prometheus Addr: http://%s:%d%s", vars.ServiceKey, p.Host, p.Port, p.Path) + } + if t != nil && t.Endpoint != "" { + print.Info("[Blocks Service] %s Telemetry Endpoint: %s", vars.ServiceKey, t.Endpoint) + } + print.Info("[Blocks Service] %s Service Build: Version %s, Go Version %s By %s", vars.ServiceKey, vars.VERSION, vars.GO_VERSION, vars.BUILD_TIME) + print.Success("[Blocks Service] %s Service Grpc Register & Runing Success . ", vars.ServiceKey) +} diff --git a/tpi/wechat.go b/tpi/wechat.go new file mode 100644 index 0000000..36ffe69 --- /dev/null +++ b/tpi/wechat.go @@ -0,0 +1,65 @@ +package tpi + +import ( + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "errors" +) + +func WeChat_Decrypt(sessionKey, encryptedData, iv string) (string, error) { + aesKey, err := base64.StdEncoding.DecodeString(sessionKey) + if err != nil { + return "", err + } + cipherText, err := base64.StdEncoding.DecodeString(encryptedData) + if err != nil { + return "", err + } + ivBytes, err := base64.StdEncoding.DecodeString(iv) + if err != nil { + return "", err + } + block, err := aes.NewCipher(aesKey) + if err != nil { + return "", err + } + mode := cipher.NewCBCDecrypter(block, ivBytes) + mode.CryptBlocks(cipherText, cipherText) + cipherText, err = WeChat_Pkcs7Unpad(cipherText, block.BlockSize()) + if err != nil { + return "", err + } + + return string(cipherText), nil +} + +// pkcs7Unpad returns slice of the original data without padding +func WeChat_Pkcs7Unpad(data []byte, blockSize int) ([]byte, error) { + var ( + // ErrInvalidBlockSize block size不合法 + ErrInvalidBlockSize = errors.New("invalid block size") + // ErrInvalidPKCS7Data PKCS7数据不合法 + ErrInvalidPKCS7Data = errors.New("invalid PKCS7 data") + // ErrInvalidPKCS7Padding 输入padding失败 + ErrInvalidPKCS7Padding = errors.New("invalid padding on input") + ) + + if blockSize <= 0 { + return nil, ErrInvalidBlockSize + } + if len(data)%blockSize != 0 || len(data) == 0 { + return nil, ErrInvalidPKCS7Data + } + c := data[len(data)-1] + n := int(c) + if n == 0 || n > len(data) { + return nil, ErrInvalidPKCS7Padding + } + for i := 0; i < n; i++ { + if data[len(data)-n+i] != c { + return nil, ErrInvalidPKCS7Padding + } + } + return data[:len(data)-n], nil +} diff --git a/types/config.go b/types/config.go new file mode 100644 index 0000000..48b5b76 --- /dev/null +++ b/types/config.go @@ -0,0 +1,19 @@ +package types + +type AnonymousConf struct { + Key string + Urls []string +} + +type PulsarConf struct { + Endpoints string + Token string + Namespaces string +} + +func (p *PulsarConf) IsHas() bool { + if p != nil && p.Endpoints != "" { + return true + } + return false +} diff --git a/types/db.go b/types/db.go new file mode 100644 index 0000000..1a23044 --- /dev/null +++ b/types/db.go @@ -0,0 +1,65 @@ +package types + +import ( + "time" + + "gorm.io/gorm" +) + +type ( + + // sql options + SqlOptions struct { + MaxIdleConns int + MaxOpenConns int + ConnMaxLifetime time.Duration + + LogStdout bool + Debug bool + } + + // standard ID,Created,Updated,Deleted definition. + Std_IICUDS struct { + ID uint `gorm:"primarykey;"` + Identity string `gorm:"column:identity;type:varchar(36);uniqueIndex;"` // 唯一标识,24位NanoID,36位为UUID + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index";` + Status int8 `gorm:"default:0;index;"` // 状态:默认为0,-1禁止,1为正常 + } + + // standard ID,Identity,Created,Updated,Deleted,Status definition. + Std_ICUD struct { + ID uint `gorm:"primarykey;"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` + } + + // standard ID,Created definition. + Std_IdCreated struct { + ID uint `gorm:"primarykey;"` + CreatedAt time.Time + } + + // standard PassportID,PassportIdentity definition. + Std_Passport struct { + PassportID uint `gorm:"column:passport_id;Index;"` + PassportIdentity string `gorm:"column:passport_identity;type:varchar(36);Index;"` // 用户唯一标识,24位NanoID,36位为UUID + } + + // standard ID definition. + Std_ID struct { + ID uint `gorm:"primarykey;"` + } + + // standard Identity definition. + Std_Identity struct { + Identity string `gorm:"column:identity;type:varchar(36);uniqueIndex;"` // 唯一标识,24位NanoID,36位为UUID + } + + // standard Status definition. + Std_Status struct { + Status int8 `gorm:"default:0;index;"` // 状态:默认为0,-1禁止,1为正常 + } +) diff --git a/types/encipher.go b/types/encipher.go new file mode 100644 index 0000000..66a075d --- /dev/null +++ b/types/encipher.go @@ -0,0 +1,15 @@ +package types + +type CertFileBytes struct { + Private []byte + Public []byte +} + +type JwtClaims struct { + ID uint `json:"id"` + Identity string `json:"identity"` + Extend map[string]string `json:"extend"` + Client string `json:"client"` + Role string `json:"role"` + ExpiresAt int64 `json:"exp"` +} diff --git a/types/env.go b/types/env.go new file mode 100644 index 0000000..156f08b --- /dev/null +++ b/types/env.go @@ -0,0 +1,15 @@ +package types + +const ( + Run_Time_Mode_Dev = iota + Run_Time_Mode_Test + Run_Time_Mode_Pre + Run_Time_Mode_Product +) + +type MeshEnv struct { + Workspace string // MESH workspace,default:bsm + Prefix string // MESH prefix,default:/usr/local/mesh/ + JwtSecretKey string // jwt Secret Key,default: + RuntimeMode string // Runtime Mode String: dev/test/pre/product +} diff --git a/types/etcd.go b/types/etcd.go new file mode 100644 index 0000000..c8be9c7 --- /dev/null +++ b/types/etcd.go @@ -0,0 +1,12 @@ +package types + +type Etcd struct { + Endpoints []string `json:"endpoints"` + Tls EtcdTls `json:",optional"` +} + +type EtcdTls struct { + Ca string `json:"ca,optional"` + Cert string `json:"cert,optional"` + CertKey string `json:"cert_key,optional" ` +} diff --git a/types/service.go b/types/service.go new file mode 100644 index 0000000..74f6e4d --- /dev/null +++ b/types/service.go @@ -0,0 +1,7 @@ +package types + +type ServiceSetting struct { + ConfigureFile string + RouteKey string + AnonymousKey string +} diff --git a/types/sidecar.go b/types/sidecar.go new file mode 100644 index 0000000..5642c9b --- /dev/null +++ b/types/sidecar.go @@ -0,0 +1,79 @@ +package types + +type Ping struct { + CRC string `json:"crc"` +} + +type Node struct { + Host string `json:"Host"` + IPAddress []string `json:"IpAddress"` + Timestamp int64 `json:"Timestamp"` + Runtime *HardwareCollect `json:"Runtime"` +} + +// Hardware Collect . +type HardwareCollect struct { + CPUUsedPercent float64 `json:"CPUUsedPercent"` + DiskFree uint64 `json:"DiskFree"` + DiskTotal uint64 `json:"DiskTotal"` + DiskUsedPercent float64 `json:"DiskUsedPercent"` + MemoryFree uint64 `json:"MemoryFree"` + MemoryTotal uint64 `json:"MemoryTotal"` + MemoryUsedPercent float64 `json:"MemoryUsedPercent"` + NetIOBytesRecv uint64 `json:"NetIOBytesRecv"` + NetIOBytesSent uint64 `json:"NetIOBytesSent"` +} + +// Service Runtime . +type ServiceCollect struct { + ServiceKey string + MemAlloc uint64 `json:"MemAlloc"` + MemTotalAlloc uint64 `json:"MemTotalAlloc"` + MemSys uint64 `json:"MemSys"` + MemHeapAlloc uint64 `json:"MemHeapAlloc"` + CpuNumber int `json:"CpuNumber"` + CpuMaxProcs int `json:"CpuMaxProcs"` + GoroutinesNumber int `json:"GoroutinesNumber"` + CgoCallNumber int64 `json:"CgoCallNumber"` + Timestamp int64 `json:"Timestamp"` +} + +type LoggerMsg struct { + Identity string `json:"identity,omitempty"` + Host string `json:"host,omitempty"` + ServiceKey string `json:"service_key,omitempty"` + Type string `json:"type,omitempty"` + Level string `json:"level,omitempty"` + Method string `json:"method,omitempty"` + Data string `json:"data,omitempty"` + Trace string `json:"trace,omitempty"` + Timestamp int64 `json:"Timestamp"` +} + +type RequestMsg struct { + RequestId string `json:"request_id,omitempty"` + Protocol string `json:"protocol,omitempty"` + Uri string `json:"uri,omitempty"` + Method string `json:"method,omitempty"` + Target string `json:"target,omitempty"` + StartTime int64 `json:"start_time,omitempty"` + Duration int64 `json:"duration,omitempty"` + EndTime int64 `json:"end_time,omitempty"` + Status int32 `json:"status,omitempty"` + Reply string `json:"reply,omitempty"` + UserAgent string `json:"user_agent,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + RequestBody string `json:"request_body,omitempty"` + Timestamp int64 `json:"Timestamp"` +} + +type ServiceMethods struct { + FileName string + ServiceKey string + Methods []string +} + +type SidecarReply struct { + Status int32 `json:"status,omitempty"` + Message string `json:"message,omitempty"` +} diff --git a/utils/array.go b/utils/array.go new file mode 100644 index 0000000..47fbe4d --- /dev/null +++ b/utils/array.go @@ -0,0 +1,28 @@ +package utils + +import "strings" + +func In(target string, array []string) bool { + target = strings.Trim(target, "") + for _, v := range array { + if strings.Trim(v, "") == target { + return true + } + } + return false +} + +func RemoveRepeatSlice(in []string) (out []string) { + _map := make(map[string]bool) + for i := 0; i < len(in); i++ { + if in[i] != "" { + _map[in[i]] = true + } + } + + for key, _ := range _map { + out = append(out, key) + } + + return out +} diff --git a/utils/convert.go b/utils/convert.go new file mode 100644 index 0000000..95b160a --- /dev/null +++ b/utils/convert.go @@ -0,0 +1,109 @@ +package utils + +import ( + "math" + "reflect" + "strconv" + "strings" + "unsafe" +) + +// 字符串转Int +// intStr:数字的字符串 +func String2Int(intStr string) (intNum int) { + intNum, _ = strconv.Atoi(intStr) + return +} + +// 字符串转Int64 +// intStr:数字的字符串 +func String2Int64(intStr string) (int64Num int64) { + intNum, _ := strconv.Atoi(intStr) + int64Num = int64(intNum) + return +} + +// 字符串转Float64 +// floatStr:小数点数字的字符串 +func String2Float64(floatStr string) (floatNum float64) { + floatNum, _ = strconv.ParseFloat(floatStr, 64) + return +} + +// 字符串转Float32 +// floatStr:小数点数字的字符串 +func String2Float32(floatStr string) (floatNum float32) { + floatNum64, _ := strconv.ParseFloat(floatStr, 32) + floatNum = float32(floatNum64) + return +} + +// Int转字符串 +// intNum:数字字符串 +func Int2String(intNum int) (intStr string) { + intStr = strconv.Itoa(intNum) + return +} + +// Int64转字符串 +// intNum:数字字符串 +func Int642String(intNum int64) (int64Str string) { + //10, 代表10进制 + int64Str = strconv.FormatInt(intNum, 10) + return +} + +// Float64转字符串 +// floatNum:float64数字 +// prec:精度位数(不传则默认float数字精度) +func Float64ToString(floatNum float64, prec ...int) (floatStr string) { + if len(prec) > 0 { + floatStr = strconv.FormatFloat(floatNum, 'f', prec[0], 64) + return + } + floatStr = strconv.FormatFloat(floatNum, 'f', -1, 64) + return +} + +// Float32转字符串 +// floatNum:float32数字 +// prec:精度位数(不传则默认float数字精度) +func Float32ToString(floatNum float32, prec ...int) (floatStr string) { + if len(prec) > 0 { + floatStr = strconv.FormatFloat(float64(floatNum), 'f', prec[0], 32) + return + } + floatStr = strconv.FormatFloat(float64(floatNum), 'f', -1, 32) + return +} + +// 二进制转10进制 +func BinaryToDecimal(bit string) (num int) { + fields := strings.Split(bit, "") + lens := len(fields) + var tempF float64 = 0 + for i := 0; i < lens; i++ { + floatNum := String2Float64(fields[i]) + tempF += floatNum * math.Pow(2, float64(lens-i-1)) + } + num = int(tempF) + return +} + +// BytesToString 0 拷贝转换 slice byte 为 string +func BytesToString(b []byte) (s string) { + _bptr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + _sptr := (*reflect.StringHeader)(unsafe.Pointer(&s)) + _sptr.Data = _bptr.Data + _sptr.Len = _bptr.Len + return s +} + +// interface to string +func AnyToString(in any) (s string) { + if in == nil { + return "" + } + + return in.(string) +} diff --git a/utils/crypto.go b/utils/crypto.go new file mode 100644 index 0000000..f30b766 --- /dev/null +++ b/utils/crypto.go @@ -0,0 +1,25 @@ +package utils + +import ( + "crypto/hmac" + "crypto/md5" + "crypto/sha256" + "encoding/hex" + "fmt" +) + +//Md5 . +func Md5(src string) string { + data := []byte(src) + has := md5.Sum(data) + md5str := fmt.Sprintf("%x", has) + return md5str +} + +func Sha256(src, privateKey string) string { + s := []byte(src) + key := []byte(privateKey) + m := hmac.New(sha256.New, key) + m.Write(s) + return hex.EncodeToString(m.Sum(nil)) +} diff --git a/utils/dir.go b/utils/dir.go new file mode 100644 index 0000000..cd96406 --- /dev/null +++ b/utils/dir.go @@ -0,0 +1,35 @@ +package utils + +import ( + "os" +) + +func PathExists(path string) bool { + _, err := os.Stat(path) + if err != nil { + if os.IsExist(err) { + return true + } + return false + } + return true +} + +// 创建文件夹 +func CreateDir(dirName string) bool { + err := os.Mkdir(dirName, 0755) + if err != nil { + return false + } + return true +} + +func GetRunPath() string { + path, _ := os.Executable() + return path +} + +func GetCurrentPath() string { + path, _ := os.Getwd() + return path +} diff --git a/utils/ext.go b/utils/ext.go new file mode 100644 index 0000000..72298cf --- /dev/null +++ b/utils/ext.go @@ -0,0 +1,35 @@ +package utils + +import ( + "strconv" + "strings" +) + +func If(condition bool, trueValue, falseValue interface{}) interface{} { + if condition { + return trueValue + } + return falseValue +} + +//如果首字母是小写字母, 则变换为大写字母 +func FirstToUpper(str string) string { + if str == "" { + return "" + } + + return strings.ToUpper(str[:1]) + strings.ToLower(str[1:]) +} + +func ParseParams(in map[string]string) map[string]interface{} { + out := make(map[string]interface{}) + for k, v := range in { + fv, err := strconv.ParseFloat(v, 64) + if err != nil { + out[k] = fv + } else { + out[k] = v + } + } + return out +} diff --git a/utils/file.go b/utils/file.go new file mode 100644 index 0000000..28926da --- /dev/null +++ b/utils/file.go @@ -0,0 +1,20 @@ +package utils + +import ( + "errors" + "io" + "os" +) + +func StringToFile(path, content string) error { + startF, err := os.Create(path) + if err != nil { + return errors.New("os.Create create file " + path + " error:" + err.Error()) + } + defer startF.Close() + _, err = io.WriteString(startF, content) + if err != nil { + return errors.New("io.WriteString to " + path + " error:" + err.Error()) + } + return nil +} diff --git a/utils/identity.go b/utils/identity.go new file mode 100644 index 0000000..d342eb8 --- /dev/null +++ b/utils/identity.go @@ -0,0 +1,26 @@ +package utils + +import ( + "strconv" + + "github.com/google/uuid" + "github.com/jaevor/go-nanoid" +) + +func NanoID() string { + nanoid, _ := nanoid.CustomASCII("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789", 21) + return nanoid() +} + +func NanoIDInt() (id int64, err error) { + decenaryID, err := nanoid.CustomASCII("0123456789", 20) + if err != nil { + return + } + id, err = strconv.ParseInt(decenaryID(), 10, 64) + return +} + +func UUID() string { + return uuid.NewString() +} diff --git a/utils/json.go b/utils/json.go new file mode 100644 index 0000000..b55cb3e --- /dev/null +++ b/utils/json.go @@ -0,0 +1,13 @@ +package utils + +import "strings" + +func JsonEscape(in string) string { + in = strings.ReplaceAll(in, "\n", "") + in = strings.ReplaceAll(in, "\t", "") + in = strings.ReplaceAll(in, "\r", "") + in = strings.ReplaceAll(in, "\r\n", "") + in = strings.ReplaceAll(in, "\\\"", "\"") + in = strings.ReplaceAll(in, "\\\\", "\\") + return in +} diff --git a/utils/net.go b/utils/net.go new file mode 100644 index 0000000..50f86b6 --- /dev/null +++ b/utils/net.go @@ -0,0 +1,213 @@ +package utils + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "strconv" +) + +func IsPublicIP(ipString string) bool { + ip := net.ParseIP(ipString) + if ip.IsLoopback() || ip.IsLinkLocalMulticast() || ip.IsLinkLocalUnicast() { + return false + } + if ip4 := ip.To4(); ip4 != nil { + switch true { + case ip4[0] == 10: + return false + case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31: + return false + case ip4[0] == 192 && ip4[1] == 168: + return false + default: + return true + } + } + return false +} + +// Get Location IP . +func GetLocationIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "" + } + ip := "" + for _, a := range addrs { + if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + ip = ipnet.IP.String() + break + } + } + } + if ip == "" { + return "" + } + return ip +} + +func LocalIPv4s() ([]string, error) { + + var ips []string + addrs, _ := net.InterfaceAddrs() + + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + locIP := ipnet.IP.To4().String() + if locIP[0:7] != "169.254" { + ips = append(ips, locIP) + } + } + } + } + + return ips, nil +} + +func GetOutBoundIP() (ip string, err error) { + body, err := HttpGet("http://ip.dhcp.cn/?ip") // 获取外网 IP + return string(body), err +} + +func HttpGet(url string) ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + // handle error + return nil, err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + + return body, err +} + +func HttpPost(url string, header map[string]string, data []byte) ([]byte, error) { + var err error + reader := bytes.NewBuffer(data) + request, err := http.NewRequest("POST", url, reader) + + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json;charset=UTF-8") + request.Header.Set("Request-Id", NanoID()) + + for key, val := range header { + request.Header.Set(key, val) + } + + client := http.Client{} + resp, err := client.Do(request) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, errors.New(string(respBytes)) + } + + return respBytes, nil +} + +func HttpRequest(r *http.Request) ([]byte, error) { + var err error + client := http.Client{} + resp, err := client.Do(r) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return respBytes, nil +} + +func DownloadFile(url, saveTo string, fb func(length, downLen int64)) { + var ( + fsize int64 + buf = make([]byte, 32*1024) + written int64 + ) + //创建一个http client + client := new(http.Client) + //get方法获取资源 + resp, err := client.Get(url) + if err != nil { + fmt.Printf("download %s error:%s\n", url, err) + os.Exit(0) + } + //读取服务器返回的文件大小 + fsize, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 32) + if err != nil { + fmt.Println(err) + } + //创建文件 + file, err := os.Create(saveTo) + file.Chmod(0777) + if err != nil { + fmt.Printf("Create %s error:%s\n", saveTo, err) + os.Exit(0) + } + defer file.Close() + if resp.Body == nil { + fmt.Printf("resp %s error:%s\n", url, err) + os.Exit(0) + } + defer resp.Body.Close() + //下面是 io.copyBuffer() 的简化版本 + for { + //读取bytes + nr, er := resp.Body.Read(buf) + if nr > 0 { + //写入bytes + nw, ew := file.Write(buf[0:nr]) + //数据长度大于0 + if nw > 0 { + written += int64(nw) + } + //写入出错 + if ew != nil { + err = ew + break + } + //读取是数据长度不等于写入的数据长度 + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + //没有错误了快使用 callback + + fb(fsize, written) + } + + if err != nil { + fmt.Printf("callback error:%s\n", err) + os.Exit(0) + } +} diff --git a/utils/random.go b/utils/random.go new file mode 100644 index 0000000..a84416a --- /dev/null +++ b/utils/random.go @@ -0,0 +1,42 @@ +package utils + +import ( + "math/rand" + "time" +) + +//随机生成字符串 +func RandomString(l int) string { + str := "0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + bytes := []byte(str) + var result []byte = make([]byte, 0, l) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + return BytesToString(result) +} + +//随机生成纯字符串 +func RandomPureString(l int) string { + str := "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + bytes := []byte(str) + var result []byte = make([]byte, 0, l) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + return BytesToString(result) +} + +//随机生成数字字符串 +func RandomNumber(l int) string { + str := "0123456789" + bytes := []byte(str) + var result []byte + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + return BytesToString(result) +} diff --git a/utils/ticker.go b/utils/ticker.go new file mode 100644 index 0000000..e623669 --- /dev/null +++ b/utils/ticker.go @@ -0,0 +1,39 @@ +package utils + +import ( + "context" + "time" +) + +func SetTimeout(f func(), timeout int) context.CancelFunc { + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx.Done(): + case <-time.After(time.Duration(timeout) * time.Second): + f() + } + }() + return cancelFunc +} + +func SetInterval(what func(), delay time.Duration) chan bool { + ticker := time.NewTicker(delay) + stopIt := make(chan bool) + go func() { + + for { + + select { + case <-stopIt: + return + case <-ticker.C: + what() + } + } + + }() + + // return the bool channel to use it as a stopper + return stopIt +} diff --git a/vars/build.go b/vars/build.go new file mode 100644 index 0000000..6397223 --- /dev/null +++ b/vars/build.go @@ -0,0 +1,9 @@ +package vars + +// build ldflags -x +var ( + VERSION string = "0.0" + BUILD_TIME string = "2006-01-02 15:04:05" //time.Now().Format("2006-01-02 15:04:05") + GO_VERSION string = "unset" // runtime.GOOS + "/" + runtime.GOARCH + " " + runtime.Version() + ServiceKey string = "unset" +) diff --git a/vars/cache.go b/vars/cache.go new file mode 100644 index 0000000..2efae41 --- /dev/null +++ b/vars/cache.go @@ -0,0 +1,11 @@ +package vars + +import "time" + +var ( + // cache def value + MemGcDuration time.Duration = 10 * time.Minute + MemLRUMaxNumber int = 1024 + MemShardings int = 64 + RedisShardings int = 256 +) diff --git a/vars/content.go b/vars/content.go new file mode 100644 index 0000000..cfed91e --- /dev/null +++ b/vars/content.go @@ -0,0 +1,12 @@ +package vars + +type ContentType int32 + +const ( + CONTENT_TYPE_TEXT ContentType = iota + 1 // 1.文本 + CONTENT_TYPE_IMAGE // 2.图片 + CONTENT_TYPE_VIDEO // 3.视频 + CONTENT_TYPE_AUDIO // 4.音频 + CONTENT_TYPE_LINK // 5.连接 + CONTENT_TYPE_LOCATION // 6.定位 +) diff --git a/vars/elastic.go b/vars/elastic.go new file mode 100644 index 0000000..b43d074 --- /dev/null +++ b/vars/elastic.go @@ -0,0 +1,8 @@ +package vars + +import "runtime" + +var ( + ESNumWorkers int = runtime.NumCPU() + ESFlushBytes int = 5e+6 +) diff --git a/vars/jwt.go b/vars/jwt.go new file mode 100644 index 0000000..800e9e9 --- /dev/null +++ b/vars/jwt.go @@ -0,0 +1,8 @@ +package vars + +import "time" + +var ( + // cache def value + JwtExpireDay time.Duration = 1 * 24 * time.Hour +) diff --git a/vars/queue.go b/vars/queue.go new file mode 100644 index 0000000..ffe3559 --- /dev/null +++ b/vars/queue.go @@ -0,0 +1,9 @@ +package vars + +import "time" + +var ( + MQSpaceName = "BLOCKS" + OperationTimeout time.Duration = 30 * time.Second + ConnectionTimeout time.Duration = 30 * time.Second +) diff --git a/vars/service.go b/vars/service.go new file mode 100644 index 0000000..d61420f --- /dev/null +++ b/vars/service.go @@ -0,0 +1,5 @@ +package vars + +var ( + ServiceMethods []string +) diff --git a/vars/sql.go b/vars/sql.go new file mode 100644 index 0000000..3fa0bb6 --- /dev/null +++ b/vars/sql.go @@ -0,0 +1,17 @@ +package vars + +import "time" + +var ( + // sql options def val + SqlOptionMaxIdleConns int = 64 + SqlOptionMaxOpenConns int = 64 + SqlOptionConnMaxLifetime time.Duration = 60 * time.Second + SqlOptionLogStdout bool = true + SqlOptionDebug bool = true + + // sql log def val + SqlLogStdout bool = true + SqlLogDir = "logs" + SqlLogFileExt = "log" +) diff --git a/vars/ticker.go b/vars/ticker.go new file mode 100644 index 0000000..2e67bc6 --- /dev/null +++ b/vars/ticker.go @@ -0,0 +1,7 @@ +package vars + +import "time" + +var ( + ServiceRuntimeDuration time.Duration = 1 * time.Minute +) diff --git a/vars/time.go b/vars/time.go new file mode 100644 index 0000000..bc6ca73 --- /dev/null +++ b/vars/time.go @@ -0,0 +1,7 @@ +package vars + +var ( + YYYYMMDD string = "20060102" + YYYY_MM_DD string = "2006-01-02" + YYYY_MM_DD_HH_MM_SS string = "2006-01-02 15:04:05" +)