// Package service 提供微服务管理功能 // 包括服务启动、注册、网关代理等核心功能 package service import ( "context" "log" "net" "os" "strconv" "strings" "net/http" gwRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "git.apinb.com/bsm-sdk/core/conf" "git.apinb.com/bsm-sdk/core/env" "git.apinb.com/bsm-sdk/core/printer" "git.apinb.com/bsm-sdk/core/vars" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" ) type ( // RegisterFn 定义服务注册函数类型 RegisterFn func(*grpc.Server) // Service 微服务实例 Service struct { GrpcSrv *grpc.Server // gRPC服务器实例 Opts *Options // 服务配置选项 } // Options 服务配置选项 Options struct { Addr string // 服务监听地址 EtcdClient *clientv3.Client // Etcd客户端 MsConf *conf.MicroServiceConf // 微服务配置 GatewayConf *conf.GatewayConf // 网关配置 GatewayCtx context.Context // 网关上下文 GatewayMux *gwRuntime.ServeMux // 网关多路复用器 } ) // New 创建新的服务实例 // srv: gRPC服务器实例 // opts: 服务配置选项 func New(srv *grpc.Server, opts *Options) *Service { return &Service{GrpcSrv: srv, Opts: opts} } // Addr 将IP和端口格式化为host:port格式 // ip: IP地址 // port: 端口号 func Addr(ip string, port int) string { return net.JoinHostPort(ip, strconv.Itoa(port)) } // Start 启动服务 // 包括etcd注册、gRPC服务启动、HTTP网关启动 func (s *Service) Start() { printer.Info("[BSM - %s] Service Starting ...", vars.ServiceKey) // 注册到etcd if s.Opts.MsConf != nil && s.Opts.MsConf.Enable { if s.Opts.EtcdClient == nil { printer.Error("[BSM Register] Etcd Client is nil.") os.Exit(1) } printer.Info("[BSM - %s] Registering Service to Etcd ...", vars.ServiceKey) // 获取gRPC方法用于网关/路由发现 methods := FoundGrpcMethods(s.GrpcSrv) // 设置路由键 routerKey := vars.ServiceRootPrefix + "Router/" + env.Runtime.Workspace + "/" + strings.ToLower(vars.ServiceKey) + "/" + s.Opts.Addr // 使用租约注册到etcd register, err := RegisterService(s.Opts.EtcdClient, routerKey, methods, vars.ServiceLease) if err != nil { log.Panicf("[ERROR] %s Service Register:%s \n", vars.ServiceKey, err.Error()) } anonKey := vars.ServiceRootPrefix + "Router/" + env.Runtime.Workspace + "/" register.SetAnonymous(anonKey, s.Opts.MsConf.Anonymous) // 服务注册租约监听 go register.ListenLeaseRespChan() } // 启动gRPC服务 tcpListen, err := net.Listen("tcp", s.Opts.Addr) if err != nil { panic(err) } go func() { if err := s.GrpcSrv.Serve(tcpListen); err != nil { panic(err) } }() printer.Success("[BSM - %s] Grpc %s Runing Success !", vars.ServiceKey, s.Opts.Addr) // 启动HTTP网关 if s.Opts.GatewayConf != nil && s.Opts.GatewayConf.Enable { addr := Addr("0.0.0.0", s.Opts.GatewayConf.Port) go s.Gateway(s.Opts.Addr, addr) printer.Success("[BSM - %s] Http %s Runing Success!", vars.ServiceKey, addr) } // 阻塞主线程 select {} } // Gateway 启动HTTP网关服务 // grpcAddr: gRPC服务地址 // httpAddr: HTTP服务地址 func (s *Service) Gateway(grpcAddr string, httpAddr string) { // 定义上下文 _, cancel := context.WithCancel(s.Opts.GatewayCtx) defer cancel() // 启动HTTP服务,不因HTTP启动失败而导致panic if err := http.ListenAndServe(httpAddr, s.Opts.GatewayMux); err != nil { printer.Error("[BSM - %s] Http Serve Error: %v", vars.ServiceKey, err) } } // Use 执行初始化函数 // initFunc: 初始化函数 func (s *Service) Use(initFunc func() error) { err := (initFunc)() if err != nil { printer.Error(err.Error()) panic(err) } } // Stop 优雅停止服务 func (s *Service) Stop() { s.GrpcSrv.GracefulStop() } // FoundGrpcMethods 发现gRPC服务中的所有方法 // s: gRPC服务器实例 // 返回: 方法名列表 func FoundGrpcMethods(s *grpc.Server) []string { var mothods []string for key, srv := range s.GetServiceInfo() { srvName := strings.Split(key, ".")[1] for _, mn := range srv.Methods { mothods = append(mothods, srvName+"."+mn.Name) } } return mothods }