From a060da5ec663bf142b1788aa7c31d589cb522959 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 22 Jan 2025 14:19:52 +0800 Subject: [PATCH 1/4] monolithic --- pkg/service/a_test.go | 56 ++++++++++++++++++++++++++++++++++ pkg/service/registry.go | 67 +++++++++++++++++++++++++++++++++++++++++ pkg/service/service.go | 14 +++++++++ 3 files changed, 137 insertions(+) create mode 100644 pkg/service/a_test.go create mode 100644 pkg/service/registry.go create mode 100644 pkg/service/service.go diff --git a/pkg/service/a_test.go b/pkg/service/a_test.go new file mode 100644 index 0000000000..fbbddaee82 --- /dev/null +++ b/pkg/service/a_test.go @@ -0,0 +1,56 @@ +package service + +import ( + "crypto/tls" + "net" + "testing" + "time" +) + +func TestName1(t *testing.T) { + + tls.Client(&testConn{}, &tls.Config{}).Handshake() + + time.Sleep(time.Hour) +} + +type testConn struct { +} + +func (testConn) Read(b []byte) (n int, err error) { + panic("implement me") +} + +func (testConn) Write(b []byte) (n int, err error) { + panic("implement me") +} + +func (testConn) Close() error { + //TODO implement me + panic("implement me") +} + +func (testConn) LocalAddr() net.Addr { + //TODO implement me + panic("implement me") +} + +func (testConn) RemoteAddr() net.Addr { + //TODO implement me + panic("implement me") +} + +func (testConn) SetDeadline(t time.Time) error { + //TODO implement me + panic("implement me") +} + +func (testConn) SetReadDeadline(t time.Time) error { + //TODO implement me + panic("implement me") +} + +func (testConn) SetWriteDeadline(t time.Time) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/service/registry.go b/pkg/service/registry.go new file mode 100644 index 0000000000..38a4575ace --- /dev/null +++ b/pkg/service/registry.go @@ -0,0 +1,67 @@ +package service + +import ( + "context" + "fmt" + + "google.golang.org/grpc" +) + +type Conn interface { + GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) //1 + GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) //2 + GetSelfConnTarget() string //3 + AddOption(opts ...grpc.DialOption) //4 + CloseConn(conn *grpc.ClientConn) //5 + // do not use this method for call rpc +} +type SvcDiscoveryRegistry interface { + Conn + Register(serviceName, host string, port int, opts ...grpc.DialOption) error //6 + UnRegister() error //7 + Close() + GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) // +} + +var _ SvcDiscoveryRegistry = (*DiscoveryRegistry)(nil) + +type DiscoveryRegistry struct { +} + +func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { + fmt.Println("RegisterService", desc, impl) +} + +func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { + //TODO implement me + panic("implement me") +} + +func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { + //TODO implement me + panic("implement me") +} + +func (x *DiscoveryRegistry) GetSelfConnTarget() string { + return "" +} + +func (x *DiscoveryRegistry) AddOption(opts ...grpc.DialOption) {} + +func (x *DiscoveryRegistry) CloseConn(conn *grpc.ClientConn) { + _ = conn.Close() +} + +func (x *DiscoveryRegistry) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + return nil +} + +func (x *DiscoveryRegistry) UnRegister() error { + return nil +} + +func (x *DiscoveryRegistry) Close() {} + +func (x *DiscoveryRegistry) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + return "", nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go new file mode 100644 index 0000000000..183aa4b9cc --- /dev/null +++ b/pkg/service/service.go @@ -0,0 +1,14 @@ +package service + +import ( + "fmt" + + "google.golang.org/grpc" +) + +type GrpcServer struct { +} + +func (x *GrpcServer) RegisterService(desc *grpc.ServiceDesc, impl any) { + fmt.Println("RegisterService", desc, impl) +} From f60272a1d24743dc40cb378e013fda1877b4679b Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 22 Jan 2025 17:02:35 +0800 Subject: [PATCH 2/4] fix: DeleteDoc crash --- go.mod | 4 + internal/msggateway/ws_server.go | 12 ++- internal/push/onlinepusher.go | 2 +- internal/rpc/auth/auth.go | 2 +- pkg/service/a_test.go | 134 ++++++++++++++++++++++++------- pkg/service/registry.go | 97 ++++++++-------------- 6 files changed, 152 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index fd188e9782..fc0b411a0a 100644 --- a/go.mod +++ b/go.mod @@ -219,3 +219,7 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace ( + github.com/openimsdk/tools => /Users/chao/Desktop/code/tools +) \ No newline at end of file diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 24dd823f6f..0731074c06 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C if err != nil { return err } + if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) { + return nil + } + wg := errgroup.Group{} wg.SetLimit(concurrentRequest) // Online push user online message to other node for _, v := range conns { v := v - log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) - if v.Target() == ws.disCov.GetSelfConnTarget() { - log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn") + if ws.disCov.IsSelfNode(v) { + log.ZDebug(ctx, "Filter out this node") continue } @@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C PlatformID: int32(client.PlatformID), Token: client.token, }) if err != nil { - log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err) } return nil }) diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 23e68339c5..8bec6e60b2 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -166,7 +166,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg } } log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) + var usersConns = make(map[grpc.ClientConnInterface][]string) for host, userIds := range usersHost { tconn, _ := k.disCov.GetConn(ctx, host) usersConns[tconn] = userIds diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3e096aa646..2e64c365ce 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) + log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID) client := msggateway.NewMsgGatewayClient(v) kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} _, err := client.KickUserOffline(ctx, kickReq) diff --git a/pkg/service/a_test.go b/pkg/service/a_test.go index fbbddaee82..34b6214569 100644 --- a/pkg/service/a_test.go +++ b/pkg/service/a_test.go @@ -1,56 +1,134 @@ package service import ( - "crypto/tls" - "net" + "context" + "fmt" + "sync" "testing" - "time" + + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/user" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +var ( + _ user.UnimplementedUserServer + _ group.UnimplementedGroupServer ) func TestName1(t *testing.T) { + cc := newStandaloneConn() + user.RegisterUserServer(cc.Registry(), &user.UnimplementedUserServer{}) + group.RegisterGroupServer(cc.Registry(), &group.UnimplementedGroupServer{}) + ctx := context.Background() + resp, err := user.NewUserClient(cc).GetUserStatus(ctx, &user.GetUserStatusReq{UserID: "imAdmin", UserIDs: []string{"10000", "20000"}}) + if err != nil { + t.Error(err) + return + } + t.Log(resp) +} - tls.Client(&testConn{}, &tls.Config{}).Handshake() +func newStandaloneConn() *standaloneConn { + return &standaloneConn{ + registry: newStandaloneRegistry(), + serializer: NewProtoSerializer(), + } +} - time.Sleep(time.Hour) +type standaloneConn struct { + registry *standaloneRegistry + serializer Serializer } -type testConn struct { +func (x *standaloneConn) Registry() grpc.ServiceRegistrar { + return x.registry } -func (testConn) Read(b []byte) (n int, err error) { - panic("implement me") +func (x *standaloneConn) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error { + handler := x.registry.getMethod(method) + if handler == nil { + return fmt.Errorf("service %s not found", method) + } + resp, err := handler(ctx, args, nil) + if err != nil { + return err + } + tmp, err := x.serializer.Marshal(resp) + if err != nil { + return err + } + return x.serializer.Unmarshal(tmp, reply) } -func (testConn) Write(b []byte) (n int, err error) { - panic("implement me") +func (x *standaloneConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, status.Errorf(codes.Unimplemented, "method stream not implemented") } -func (testConn) Close() error { - //TODO implement me - panic("implement me") +type serverHandler func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) + +func newStandaloneRegistry() *standaloneRegistry { + return &standaloneRegistry{ + methods: make(map[string]serverHandler), + serializer: NewProtoSerializer(), + } } -func (testConn) LocalAddr() net.Addr { - //TODO implement me - panic("implement me") +type standaloneRegistry struct { + lock sync.RWMutex + methods map[string]serverHandler + serializer Serializer } -func (testConn) RemoteAddr() net.Addr { - //TODO implement me - panic("implement me") +func (x *standaloneConn) emptyDec(req any) error { + return nil } -func (testConn) SetDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +func (x *standaloneRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { + x.lock.Lock() + defer x.lock.Unlock() + for i := range desc.Methods { + method := desc.Methods[i] + name := fmt.Sprintf("/%s/%s", desc.ServiceName, method.MethodName) + if _, ok := x.methods[name]; ok { + panic(fmt.Errorf("service %s already registered, method %s", desc.ServiceName, method.MethodName)) + } + x.methods[name] = func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) { + return method.Handler(impl, ctx, func(in any) error { + tmp, err := x.serializer.Marshal(req) + if err != nil { + return err + } + return x.serializer.Unmarshal(tmp, in) + }, interceptor) + } + } } -func (testConn) SetReadDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +func (x *standaloneRegistry) getMethod(name string) serverHandler { + x.lock.RLock() + defer x.lock.RUnlock() + return x.methods[name] +} + +type Serializer interface { + Marshal(any) ([]byte, error) + Unmarshal([]byte, any) error +} + +func NewProtoSerializer() Serializer { + return protoSerializer{} +} + +type protoSerializer struct{} + +func (protoSerializer) Marshal(in any) ([]byte, error) { + return proto.Marshal(in.(proto.Message)) } -func (testConn) SetWriteDeadline(t time.Time) error { - //TODO implement me - panic("implement me") +func (protoSerializer) Unmarshal(b []byte, out any) error { + return proto.Unmarshal(b, out.(proto.Message)) } diff --git a/pkg/service/registry.go b/pkg/service/registry.go index 38a4575ace..a847a4d816 100644 --- a/pkg/service/registry.go +++ b/pkg/service/registry.go @@ -1,67 +1,34 @@ package service -import ( - "context" - "fmt" - - "google.golang.org/grpc" -) - -type Conn interface { - GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) //1 - GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) //2 - GetSelfConnTarget() string //3 - AddOption(opts ...grpc.DialOption) //4 - CloseConn(conn *grpc.ClientConn) //5 - // do not use this method for call rpc -} -type SvcDiscoveryRegistry interface { - Conn - Register(serviceName, host string, port int, opts ...grpc.DialOption) error //6 - UnRegister() error //7 - Close() - GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) // -} - -var _ SvcDiscoveryRegistry = (*DiscoveryRegistry)(nil) - -type DiscoveryRegistry struct { -} - -func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { - fmt.Println("RegisterService", desc, impl) -} - -func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { - //TODO implement me - panic("implement me") -} - -func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { - //TODO implement me - panic("implement me") -} - -func (x *DiscoveryRegistry) GetSelfConnTarget() string { - return "" -} - -func (x *DiscoveryRegistry) AddOption(opts ...grpc.DialOption) {} - -func (x *DiscoveryRegistry) CloseConn(conn *grpc.ClientConn) { - _ = conn.Close() -} - -func (x *DiscoveryRegistry) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - return nil -} - -func (x *DiscoveryRegistry) UnRegister() error { - return nil -} - -func (x *DiscoveryRegistry) Close() {} - -func (x *DiscoveryRegistry) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { - return "", nil -} +// +//import ( +// "context" +// "fmt" +// "sync" +// +// "google.golang.org/grpc" +//) +// +//type DiscoveryRegistry struct { +// lock sync.RWMutex +// services map[string]grpc.ClientConnInterface +//} +// +//func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { +// fmt.Println("RegisterService", desc, impl) +//} +// +//func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { +// //TODO implement me +// panic("implement me") +//} +// +//func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { +// //TODO implement me +// panic("implement me") +//} +// +//func (x *DiscoveryRegistry) IsSelfNode(cc grpc.ClientConnInterface) bool { +// +// return false +//} From c2b035839d774cbe082495bc88b6e4bc30fe5353 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 23 Jan 2025 18:26:40 +0800 Subject: [PATCH 3/4] fix: monolithic --- cmd/main.go | 209 +++++++ internal/msggateway/hub_server.go | 2 +- internal/msggateway/init.go | 6 +- internal/msggateway/ws_server.go | 6 +- internal/push/onlinepusher.go | 7 +- internal/push/push.go | 4 +- internal/push/push_handler.go | 2 +- internal/rpc/auth/auth.go | 4 +- internal/rpc/conversation/conversation.go | 5 +- internal/rpc/group/group.go | 5 +- internal/rpc/msg/server.go | 7 +- internal/rpc/relation/friend.go | 5 +- internal/rpc/third/third.go | 5 +- internal/rpc/user/user.go | 6 +- pkg/common/config/config.go | 644 +++++++++++----------- pkg/common/config/load_config.go | 2 +- pkg/common/startrpc/start.go | 2 +- pkg/service/a_test.go | 134 ----- pkg/service/registry.go | 34 -- pkg/service/service.go | 14 - 20 files changed, 568 insertions(+), 535 deletions(-) create mode 100644 cmd/main.go delete mode 100644 pkg/service/a_test.go delete mode 100644 pkg/service/registry.go delete mode 100644 pkg/service/service.go diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000000..870c5dd40a --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,209 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "os" + "path" + "path/filepath" + "reflect" + "runtime" + "strings" + + "github.com/mitchellh/mapstructure" + "github.com/openimsdk/open-im-server/v3/internal/api" + "github.com/openimsdk/open-im-server/v3/internal/msggateway" + "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" + "github.com/openimsdk/open-im-server/v3/internal/push" + "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" + "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" + "github.com/openimsdk/open-im-server/v3/internal/rpc/group" + "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" + "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" + "github.com/openimsdk/open-im-server/v3/internal/rpc/third" + "github.com/openimsdk/open-im-server/v3/internal/rpc/user" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/standalone" + "github.com/openimsdk/tools/utils/datautil" + "github.com/spf13/viper" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" +) + +func main() { + var configPath string + flag.StringVar(&configPath, "c", "/Users/chao/Desktop/code/open-im-server/config", "config path") + flag.Parse() + cmd := newCmds(configPath) + putCmd1(cmd, auth.Start) + putCmd1(cmd, conversation.Start) + putCmd1(cmd, relation.Start) + putCmd1(cmd, group.Start) + putCmd1(cmd, msg.Start) + putCmd1(cmd, third.Start) + putCmd1(cmd, user.Start) + putCmd1(cmd, push.Start) + putCmd2(cmd, msggateway.Start) + putCmd2(cmd, msgtransfer.Start) + putCmd2(cmd, api.Start) + ctx := context.Background() + if err := cmd.run(ctx); err != nil { + fmt.Println(err) + } + fmt.Println("success") +} + +func getTypePath(typ reflect.Type) string { + return path.Join(typ.PkgPath(), typ.Name()) +} + +func newCmds(confPath string) *cmds { + return &cmds{confPath: confPath} +} + +type cmds struct { + confPath string + cmds []cmdName + conf map[string][]byte +} + +func (x *cmds) readConfig() error { + skip := []string{ + config.DiscoveryConfigFilename, + } + if x.conf == nil { + x.conf = make(map[string][]byte) + } + vof := reflect.ValueOf(&config.AllConfig{}).Elem() + num := vof.NumField() + for i := 0; i < num; i++ { + field := vof.Field(i) + for ptr := true; ptr; { + if field.Kind() == reflect.Ptr { + field = field.Elem() + } else { + ptr = false + } + } + itemConf := field.Addr().Interface() + name := itemConf.(interface{ GetConfigFileName() string }).GetConfigFileName() + if datautil.Contain(name, skip...) { + x.conf[getTypePath(field.Type())] = nil + continue + } + data, err := os.ReadFile(filepath.Join(x.confPath, name)) + if err != nil { + return err + } + x.conf[getTypePath(field.Type())] = data + } + val := config.Discovery{Enable: discovery.Standalone} + var buf bytes.Buffer + if err := yaml.NewEncoder(&buf).Encode(&val); err != nil { + return err + } + x.conf[getTypePath(reflect.TypeOf(val))] = buf.Bytes() + return nil +} + +func (x *cmds) parseConf(conf any) error { + vof := reflect.ValueOf(conf) + for { + if vof.Kind() == reflect.Ptr { + vof = vof.Elem() + } else { + break + } + } + tof := vof.Type() + numField := vof.NumField() + for i := 0; i < numField; i++ { + typeField := tof.Field(i) + if !typeField.IsExported() { + continue + } + field := vof.Field(i) + pkt := getTypePath(field.Type()) + confData, ok := x.conf[pkt] + if !ok { + if typeField.Name == "FcmConfigPath" && field.Kind() == reflect.String { + field.SetString(x.confPath) + continue + } + return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name) + } + if confData == nil { + continue + } + val := field.Addr().Interface() + v := viper.New() + v.SetConfigType("yaml") + if err := v.ReadConfig(bytes.NewReader(confData)); err != nil { + return err + } + fn := func(conf *mapstructure.DecoderConfig) { + conf.TagName = config.StructTagName + } + if err := v.Unmarshal(val, fn); err != nil { + return err + } + } + return nil +} + +func (x *cmds) add(name string, fn func(ctx context.Context) error) { + x.cmds = append(x.cmds, cmdName{Name: name, Func: fn}) +} + +func (x *cmds) run(ctx context.Context) error { + if x.conf == nil { + if err := x.readConfig(); err != nil { + return err + } + } + for _, cmd := range x.cmds { + fmt.Println("start", cmd.Name) + if err := cmd.Func(ctx); err != nil { + fmt.Println("start failed", cmd.Name, err) + return err + } + fmt.Println("start ok", cmd.Name) + } + return nil +} + +type cmdName struct { + Name string + Func func(ctx context.Context) error +} + +func getFuncPacketName(fn any) string { + name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) + if index := strings.Index(name, "."); index >= 0 { + name = name[:index] + } + return name +} + +func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) { + cmd.add(getFuncPacketName(fn), func(ctx context.Context) error { + var conf C + if err := cmd.parseConf(&conf); err != nil { + return err + } + return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar()) + }) +} + +func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C) error) { + cmd.add(getFuncPacketName(fn), func(ctx context.Context) error { + var conf C + if err := cmd.parseConf(&conf); err != nil { + return err + } + return fn(ctx, 0, &conf) + }) +} diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 887a90d7a7..7e84488809 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -35,7 +35,7 @@ import ( "google.golang.org/grpc" ) -func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.Conn, server grpc.ServiceRegistrar) error { userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 6614b96bdb..f11bc3fafc 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -34,14 +34,14 @@ type Config struct { WebhooksConfig config.Webhooks Discovery config.Discovery - RuntimeEnv string + runtimeEnv string } // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - conf.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() + conf.runtimeEnv = runtimeenv.PrintRuntimeEnvironment() - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.RuntimeEnv, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.runtimeEnv, "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 0731074c06..d62a6c6dc5 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -35,7 +35,7 @@ type LongConnServer interface { GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s any) error - SetDiscoveryRegistry(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) error + SetDiscoveryRegistry(ctx context.Context, client discovery.Conn, config *Config) error KickUserConn(client *Client) error UnRegister(c *Client) SetKickHandlerInfo(i *kickHandler) @@ -60,7 +60,7 @@ type WsServer struct { handshakeTimeout time.Duration writeBufferSize int validate *validator.Validate - disCov discovery.SvcDiscoveryRegistry + disCov discovery.Conn Compressor //Encoder MessageHandler @@ -75,7 +75,7 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.SvcDiscoveryRegistry, config *Config) error { +func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.Conn, config *Config) error { userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User) if err != nil { return err diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 8bec6e60b2..d027661210 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -41,8 +41,7 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * return nil } -func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { - +func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher { if config.runTimeEnv == conf.KUBERNETES { return NewDefaultAllNode(disCov, config) } @@ -56,11 +55,11 @@ func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) Onli } type DefaultAllNode struct { - disCov discovery.SvcDiscoveryRegistry + disCov discovery.Conn config *Config } -func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *DefaultAllNode { +func NewDefaultAllNode(disCov discovery.Conn, config *Config) *DefaultAllNode { return &DefaultAllNode{disCov: disCov, config: config} } diff --git a/internal/push/push.go b/internal/push/push.go index b7c1ec4272..748463d652 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -17,7 +17,7 @@ import ( type pushServer struct { pbpush.UnimplementedPushMsgServiceServer database controller.PushDatabase - disCov discovery.SvcDiscoveryRegistry + disCov discovery.Conn offlinePusher offlinepush.OfflinePusher pushCh *ConsumerHandler offlinePushCh *OfflinePushConsumerHandler @@ -45,7 +45,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context, return &pbpush.DelUserPushTokenResp{}, nil } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 80d14499f4..dad9b53b7f 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -50,7 +50,7 @@ type ConsumerHandler struct { } func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, - client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) { + client discovery.Conn) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2e64c365ce..3287d91128 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -43,7 +43,7 @@ import ( type authServer struct { pbauth.UnimplementedAuthServer authDatabase controller.AuthDatabase - RegisterCenter discovery.SvcDiscoveryRegistry + RegisterCenter discovery.Conn config *Config userClient *rpcli.UserClient } @@ -55,7 +55,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 53364ff866..c7958f0213 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -16,10 +16,11 @@ package conversation import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sort" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -65,7 +66,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index bea0e1af4d..39555d47f0 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,13 +17,14 @@ package group import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/big" "math/rand" "strconv" "strings" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -76,7 +77,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 7ccda6bd57..a85b88b0b2 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -56,8 +57,8 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { msg.UnimplementedMsgServer - RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + RegisterCenter discovery.Conn // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. StreamMsgDatabase controller.StreamMsgDatabase UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. @@ -76,7 +77,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 79db149705..464275f751 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/tools/mq/memamq" @@ -47,7 +48,7 @@ type friendServer struct { db controller.FriendDatabase blackDatabase controller.BlackDatabase notificationSender *FriendNotificationSender - RegisterCenter discovery.SvcDiscoveryRegistry + RegisterCenter discovery.Conn config *Config webhookClient *webhook.Client queue *memamq.MemoryQueue @@ -66,7 +67,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 0b8ca25a85..38da8c3ccf 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,9 +17,10 @@ package third import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -60,7 +61,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 15e93a9887..656f0d168d 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -45,7 +45,7 @@ import ( pbuser "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" - registry "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "google.golang.org/grpc" @@ -57,7 +57,7 @@ type userServer struct { db controller.UserDatabase friendNotificationSender *relation.FriendNotificationSender userNotificationSender *UserNotificationSender - RegisterCenter registry.SvcDiscoveryRegistry + RegisterCenter discovery.Conn config *Config webhookClient *webhook.Client groupClient *rpcli.GroupClient @@ -76,7 +76,7 @@ type Config struct { Discovery config.Discovery } -func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 1b9121b7a3..2315522f35 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -28,378 +28,380 @@ import ( "github.com/openimsdk/tools/s3/oss" ) +const StructTagName = "yaml" + type CacheConfig struct { - Topic string `mapstructure:"topic"` - SlotNum int `mapstructure:"slotNum"` - SlotSize int `mapstructure:"slotSize"` - SuccessExpire int `mapstructure:"successExpire"` - FailedExpire int `mapstructure:"failedExpire"` + Topic string `yaml:"topic"` + SlotNum int `yaml:"slotNum"` + SlotSize int `yaml:"slotSize"` + SuccessExpire int `yaml:"successExpire"` + FailedExpire int `yaml:"failedExpire"` } type LocalCache struct { - User CacheConfig `mapstructure:"user"` - Group CacheConfig `mapstructure:"group"` - Friend CacheConfig `mapstructure:"friend"` - Conversation CacheConfig `mapstructure:"conversation"` + User CacheConfig `yaml:"user"` + Group CacheConfig `yaml:"group"` + Friend CacheConfig `yaml:"friend"` + Conversation CacheConfig `yaml:"conversation"` } type Log struct { - StorageLocation string `mapstructure:"storageLocation"` - RotationTime uint `mapstructure:"rotationTime"` - RemainRotationCount uint `mapstructure:"remainRotationCount"` - RemainLogLevel int `mapstructure:"remainLogLevel"` - IsStdout bool `mapstructure:"isStdout"` - IsJson bool `mapstructure:"isJson"` - IsSimplify bool `mapstructure:"isSimplify"` - WithStack bool `mapstructure:"withStack"` + StorageLocation string `yaml:"storageLocation"` + RotationTime uint `yaml:"rotationTime"` + RemainRotationCount uint `yaml:"remainRotationCount"` + RemainLogLevel int `yaml:"remainLogLevel"` + IsStdout bool `yaml:"isStdout"` + IsJson bool `yaml:"isJson"` + IsSimplify bool `yaml:"isSimplify"` + WithStack bool `yaml:"withStack"` } type Minio struct { - Bucket string `mapstructure:"bucket"` - AccessKeyID string `mapstructure:"accessKeyID"` - SecretAccessKey string `mapstructure:"secretAccessKey"` - SessionToken string `mapstructure:"sessionToken"` - InternalAddress string `mapstructure:"internalAddress"` - ExternalAddress string `mapstructure:"externalAddress"` - PublicRead bool `mapstructure:"publicRead"` + Bucket string `yaml:"bucket"` + AccessKeyID string `yaml:"accessKeyID"` + SecretAccessKey string `yaml:"secretAccessKey"` + SessionToken string `yaml:"sessionToken"` + InternalAddress string `yaml:"internalAddress"` + ExternalAddress string `yaml:"externalAddress"` + PublicRead bool `yaml:"publicRead"` } type Mongo struct { - URI string `mapstructure:"uri"` - Address []string `mapstructure:"address"` - Database string `mapstructure:"database"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - AuthSource string `mapstructure:"authSource"` - MaxPoolSize int `mapstructure:"maxPoolSize"` - MaxRetry int `mapstructure:"maxRetry"` + URI string `yaml:"uri"` + Address []string `yaml:"address"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + AuthSource string `yaml:"authSource"` + MaxPoolSize int `yaml:"maxPoolSize"` + MaxRetry int `yaml:"maxRetry"` } type Kafka struct { - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - ProducerAck string `mapstructure:"producerAck"` - CompressType string `mapstructure:"compressType"` - Address []string `mapstructure:"address"` - ToRedisTopic string `mapstructure:"toRedisTopic"` - ToMongoTopic string `mapstructure:"toMongoTopic"` - ToPushTopic string `mapstructure:"toPushTopic"` - ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"` - ToRedisGroupID string `mapstructure:"toRedisGroupID"` - ToMongoGroupID string `mapstructure:"toMongoGroupID"` - ToPushGroupID string `mapstructure:"toPushGroupID"` - ToOfflineGroupID string `mapstructure:"toOfflinePushGroupID"` - - Tls TLSConfig `mapstructure:"tls"` + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Address []string `yaml:"address"` + ToRedisTopic string `yaml:"toRedisTopic"` + ToMongoTopic string `yaml:"toMongoTopic"` + ToPushTopic string `yaml:"toPushTopic"` + ToOfflinePushTopic string `yaml:"toOfflinePushTopic"` + ToRedisGroupID string `yaml:"toRedisGroupID"` + ToMongoGroupID string `yaml:"toMongoGroupID"` + ToPushGroupID string `yaml:"toPushGroupID"` + ToOfflineGroupID string `yaml:"toOfflinePushGroupID"` + + Tls TLSConfig `yaml:"tls"` } type TLSConfig struct { - EnableTLS bool `mapstructure:"enableTLS"` - CACrt string `mapstructure:"caCrt"` - ClientCrt string `mapstructure:"clientCrt"` - ClientKey string `mapstructure:"clientKey"` - ClientKeyPwd string `mapstructure:"clientKeyPwd"` - InsecureSkipVerify bool `mapstructure:"insecureSkipVerify"` + EnableTLS bool `yaml:"enableTLS"` + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` } type API struct { Api struct { - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` - CompressionLevel int `mapstructure:"compressionLevel"` - } `mapstructure:"api"` + ListenIP string `yaml:"listenIP"` + Ports []int `yaml:"ports"` + CompressionLevel int `yaml:"compressionLevel"` + } `yaml:"api"` Prometheus struct { - Enable bool `mapstructure:"enable"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - GrafanaURL string `mapstructure:"grafanaURL"` - } `mapstructure:"prometheus"` + Enable bool `yaml:"enable"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + GrafanaURL string `yaml:"grafanaURL"` + } `yaml:"prometheus"` } type CronTask struct { - CronExecuteTime string `mapstructure:"cronExecuteTime"` - RetainChatRecords int `mapstructure:"retainChatRecords"` - FileExpireTime int `mapstructure:"fileExpireTime"` - DeleteObjectType []string `mapstructure:"deleteObjectType"` + CronExecuteTime string `yaml:"cronExecuteTime"` + RetainChatRecords int `yaml:"retainChatRecords"` + FileExpireTime int `yaml:"fileExpireTime"` + DeleteObjectType []string `yaml:"deleteObjectType"` } type OfflinePushConfig struct { - Enable bool `mapstructure:"enable"` - Title string `mapstructure:"title"` - Desc string `mapstructure:"desc"` - Ext string `mapstructure:"ext"` + Enable bool `yaml:"enable"` + Title string `yaml:"title"` + Desc string `yaml:"desc"` + Ext string `yaml:"ext"` } type NotificationConfig struct { - IsSendMsg bool `mapstructure:"isSendMsg"` - ReliabilityLevel int `mapstructure:"reliabilityLevel"` - UnreadCount bool `mapstructure:"unreadCount"` - OfflinePush OfflinePushConfig `mapstructure:"offlinePush"` + IsSendMsg bool `yaml:"isSendMsg"` + ReliabilityLevel int `yaml:"reliabilityLevel"` + UnreadCount bool `yaml:"unreadCount"` + OfflinePush OfflinePushConfig `yaml:"offlinePush"` } type Notification struct { - GroupCreated NotificationConfig `mapstructure:"groupCreated"` - GroupInfoSet NotificationConfig `mapstructure:"groupInfoSet"` - JoinGroupApplication NotificationConfig `mapstructure:"joinGroupApplication"` - MemberQuit NotificationConfig `mapstructure:"memberQuit"` - GroupApplicationAccepted NotificationConfig `mapstructure:"groupApplicationAccepted"` - GroupApplicationRejected NotificationConfig `mapstructure:"groupApplicationRejected"` - GroupOwnerTransferred NotificationConfig `mapstructure:"groupOwnerTransferred"` - MemberKicked NotificationConfig `mapstructure:"memberKicked"` - MemberInvited NotificationConfig `mapstructure:"memberInvited"` - MemberEnter NotificationConfig `mapstructure:"memberEnter"` - GroupDismissed NotificationConfig `mapstructure:"groupDismissed"` - GroupMuted NotificationConfig `mapstructure:"groupMuted"` - GroupCancelMuted NotificationConfig `mapstructure:"groupCancelMuted"` - GroupMemberMuted NotificationConfig `mapstructure:"groupMemberMuted"` - GroupMemberCancelMuted NotificationConfig `mapstructure:"groupMemberCancelMuted"` - GroupMemberInfoSet NotificationConfig `mapstructure:"groupMemberInfoSet"` + GroupCreated NotificationConfig `yaml:"groupCreated"` + GroupInfoSet NotificationConfig `yaml:"groupInfoSet"` + JoinGroupApplication NotificationConfig `yaml:"joinGroupApplication"` + MemberQuit NotificationConfig `yaml:"memberQuit"` + GroupApplicationAccepted NotificationConfig `yaml:"groupApplicationAccepted"` + GroupApplicationRejected NotificationConfig `yaml:"groupApplicationRejected"` + GroupOwnerTransferred NotificationConfig `yaml:"groupOwnerTransferred"` + MemberKicked NotificationConfig `yaml:"memberKicked"` + MemberInvited NotificationConfig `yaml:"memberInvited"` + MemberEnter NotificationConfig `yaml:"memberEnter"` + GroupDismissed NotificationConfig `yaml:"groupDismissed"` + GroupMuted NotificationConfig `yaml:"groupMuted"` + GroupCancelMuted NotificationConfig `yaml:"groupCancelMuted"` + GroupMemberMuted NotificationConfig `yaml:"groupMemberMuted"` + GroupMemberCancelMuted NotificationConfig `yaml:"groupMemberCancelMuted"` + GroupMemberInfoSet NotificationConfig `yaml:"groupMemberInfoSet"` GroupMemberSetToAdmin NotificationConfig `yaml:"groupMemberSetToAdmin"` GroupMemberSetToOrdinary NotificationConfig `yaml:"groupMemberSetToOrdinaryUser"` - GroupInfoSetAnnouncement NotificationConfig `mapstructure:"groupInfoSetAnnouncement"` - GroupInfoSetName NotificationConfig `mapstructure:"groupInfoSetName"` - FriendApplicationAdded NotificationConfig `mapstructure:"friendApplicationAdded"` - FriendApplicationApproved NotificationConfig `mapstructure:"friendApplicationApproved"` - FriendApplicationRejected NotificationConfig `mapstructure:"friendApplicationRejected"` - FriendAdded NotificationConfig `mapstructure:"friendAdded"` - FriendDeleted NotificationConfig `mapstructure:"friendDeleted"` - FriendRemarkSet NotificationConfig `mapstructure:"friendRemarkSet"` - BlackAdded NotificationConfig `mapstructure:"blackAdded"` - BlackDeleted NotificationConfig `mapstructure:"blackDeleted"` - FriendInfoUpdated NotificationConfig `mapstructure:"friendInfoUpdated"` - UserInfoUpdated NotificationConfig `mapstructure:"userInfoUpdated"` - UserStatusChanged NotificationConfig `mapstructure:"userStatusChanged"` - ConversationChanged NotificationConfig `mapstructure:"conversationChanged"` - ConversationSetPrivate NotificationConfig `mapstructure:"conversationSetPrivate"` + GroupInfoSetAnnouncement NotificationConfig `yaml:"groupInfoSetAnnouncement"` + GroupInfoSetName NotificationConfig `yaml:"groupInfoSetName"` + FriendApplicationAdded NotificationConfig `yaml:"friendApplicationAdded"` + FriendApplicationApproved NotificationConfig `yaml:"friendApplicationApproved"` + FriendApplicationRejected NotificationConfig `yaml:"friendApplicationRejected"` + FriendAdded NotificationConfig `yaml:"friendAdded"` + FriendDeleted NotificationConfig `yaml:"friendDeleted"` + FriendRemarkSet NotificationConfig `yaml:"friendRemarkSet"` + BlackAdded NotificationConfig `yaml:"blackAdded"` + BlackDeleted NotificationConfig `yaml:"blackDeleted"` + FriendInfoUpdated NotificationConfig `yaml:"friendInfoUpdated"` + UserInfoUpdated NotificationConfig `yaml:"userInfoUpdated"` + UserStatusChanged NotificationConfig `yaml:"userStatusChanged"` + ConversationChanged NotificationConfig `yaml:"conversationChanged"` + ConversationSetPrivate NotificationConfig `yaml:"conversationSetPrivate"` } type Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` + Enable bool `yaml:"enable"` + Ports []int `yaml:"ports"` } type MsgGateway struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - ListenIP string `mapstructure:"listenIP"` + RegisterIP string `yaml:"registerIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + ListenIP string `yaml:"listenIP"` LongConnSvr struct { - Ports []int `mapstructure:"ports"` - WebsocketMaxConnNum int `mapstructure:"websocketMaxConnNum"` - WebsocketMaxMsgLen int `mapstructure:"websocketMaxMsgLen"` - WebsocketTimeout int `mapstructure:"websocketTimeout"` - } `mapstructure:"longConnSvr"` + Ports []int `yaml:"ports"` + WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` + WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` + WebsocketTimeout int `yaml:"websocketTimeout"` + } `yaml:"longConnSvr"` } type MsgTransfer struct { Prometheus struct { - Enable bool `mapstructure:"enable"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Enable bool `yaml:"enable"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"prometheus"` } type Push struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` - Enable string `mapstructure:"enable"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"` + Enable string `yaml:"enable"` GeTui struct { - PushUrl string `mapstructure:"pushUrl"` - MasterSecret string `mapstructure:"masterSecret"` - AppKey string `mapstructure:"appKey"` - Intent string `mapstructure:"intent"` - ChannelID string `mapstructure:"channelID"` - ChannelName string `mapstructure:"channelName"` - } `mapstructure:"geTui"` + PushUrl string `yaml:"pushUrl"` + MasterSecret string `yaml:"masterSecret"` + AppKey string `yaml:"appKey"` + Intent string `yaml:"intent"` + ChannelID string `yaml:"channelID"` + ChannelName string `yaml:"channelName"` + } `yaml:"geTui"` FCM struct { - FilePath string `mapstructure:"filePath"` - AuthURL string `mapstructure:"authURL"` - } `mapstructure:"fcm"` + FilePath string `yaml:"filePath"` + AuthURL string `yaml:"authURL"` + } `yaml:"fcm"` JPush struct { - AppKey string `mapstructure:"appKey"` - MasterSecret string `mapstructure:"masterSecret"` - PushURL string `mapstructure:"pushURL"` - PushIntent string `mapstructure:"pushIntent"` - } `mapstructure:"jpush"` + AppKey string `yaml:"appKey"` + MasterSecret string `yaml:"masterSecret"` + PushURL string `yaml:"pushURL"` + PushIntent string `yaml:"pushIntent"` + } `yaml:"jpush"` IOSPush struct { - PushSound string `mapstructure:"pushSound"` - BadgeCount bool `mapstructure:"badgeCount"` - Production bool `mapstructure:"production"` - } `mapstructure:"iosPush"` - FullUserCache bool `mapstructure:"fullUserCache"` + PushSound string `yaml:"pushSound"` + BadgeCount bool `yaml:"badgeCount"` + Production bool `yaml:"production"` + } `yaml:"iosPush"` + FullUserCache bool `yaml:"fullUserCache"` } type Auth struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` TokenPolicy struct { - Expire int64 `mapstructure:"expire"` - } `mapstructure:"tokenPolicy"` + Expire int64 `yaml:"expire"` + } `yaml:"tokenPolicy"` } type Conversation struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` } type Friend struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` } type Group struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + EnableHistoryForNewMembers bool `yaml:"enableHistoryForNewMembers"` } type Msg struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - FriendVerify bool `mapstructure:"friendVerify"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` + FriendVerify bool `yaml:"friendVerify"` } type Third struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` Object struct { - Enable string `mapstructure:"enable"` - Cos Cos `mapstructure:"cos"` - Oss Oss `mapstructure:"oss"` - Kodo Kodo `mapstructure:"kodo"` - Aws Aws `mapstructure:"aws"` - } `mapstructure:"object"` + Enable string `yaml:"enable"` + Cos Cos `yaml:"cos"` + Oss Oss `yaml:"oss"` + Kodo Kodo `yaml:"kodo"` + Aws Aws `yaml:"aws"` + } `yaml:"object"` } type Cos struct { - BucketURL string `mapstructure:"bucketURL"` - SecretID string `mapstructure:"secretID"` - SecretKey string `mapstructure:"secretKey"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` + BucketURL string `yaml:"bucketURL"` + SecretID string `yaml:"secretID"` + SecretKey string `yaml:"secretKey"` + SessionToken string `yaml:"sessionToken"` + PublicRead bool `yaml:"publicRead"` } type Oss struct { - Endpoint string `mapstructure:"endpoint"` - Bucket string `mapstructure:"bucket"` - BucketURL string `mapstructure:"bucketURL"` - AccessKeyID string `mapstructure:"accessKeyID"` - AccessKeySecret string `mapstructure:"accessKeySecret"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + BucketURL string `yaml:"bucketURL"` + AccessKeyID string `yaml:"accessKeyID"` + AccessKeySecret string `yaml:"accessKeySecret"` + SessionToken string `yaml:"sessionToken"` + PublicRead bool `yaml:"publicRead"` } type Kodo struct { - Endpoint string `mapstructure:"endpoint"` - Bucket string `mapstructure:"bucket"` - BucketURL string `mapstructure:"bucketURL"` - AccessKeyID string `mapstructure:"accessKeyID"` - AccessKeySecret string `mapstructure:"accessKeySecret"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + BucketURL string `yaml:"bucketURL"` + AccessKeyID string `yaml:"accessKeyID"` + AccessKeySecret string `yaml:"accessKeySecret"` + SessionToken string `yaml:"sessionToken"` + PublicRead bool `yaml:"publicRead"` } type Aws struct { - Region string `mapstructure:"region"` - Bucket string `mapstructure:"bucket"` - AccessKeyID string `mapstructure:"accessKeyID"` - SecretAccessKey string `mapstructure:"secretAccessKey"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` + Region string `yaml:"region"` + Bucket string `yaml:"bucket"` + AccessKeyID string `yaml:"accessKeyID"` + SecretAccessKey string `yaml:"secretAccessKey"` + SessionToken string `yaml:"sessionToken"` + PublicRead bool `yaml:"publicRead"` } type User struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - AutoSetPorts bool `mapstructure:"autoSetPorts"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + AutoSetPorts bool `yaml:"autoSetPorts"` + Ports []int `yaml:"ports"` + } `yaml:"rpc"` + Prometheus Prometheus `yaml:"prometheus"` } type Redis struct { - Address []string `mapstructure:"address"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - ClusterMode bool `mapstructure:"clusterMode"` - DB int `mapstructure:"storage"` - MaxRetry int `mapstructure:"maxRetry"` - PoolSize int `mapstructure:"poolSize"` + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + ClusterMode bool `yaml:"clusterMode"` + DB int `yaml:"storage"` + MaxRetry int `yaml:"maxRetry"` + PoolSize int `yaml:"poolSize"` } type BeforeConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - FailedContinue bool `mapstructure:"failedContinue"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + FailedContinue bool `yaml:"failedContinue"` + AllowedTypes []string `yaml:"allowedTypes"` + DeniedTypes []string `yaml:"deniedTypes"` } type AfterConfig struct { - Enable bool `mapstructure:"enable"` - Timeout int `mapstructure:"timeout"` - AttentionIds []string `mapstructure:"attentionIds"` - AllowedTypes []string `mapstructure:"allowedTypes"` - DeniedTypes []string `mapstructure:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + AttentionIds []string `yaml:"attentionIds"` + AllowedTypes []string `yaml:"allowedTypes"` + DeniedTypes []string `yaml:"deniedTypes"` } type Share struct { - Secret string `mapstructure:"secret"` - IMAdminUserID []string `mapstructure:"imAdminUserID"` - MultiLogin MultiLogin `mapstructure:"multiLogin"` + Secret string `yaml:"secret"` + IMAdminUserID []string `yaml:"imAdminUserID"` + MultiLogin MultiLogin `yaml:"multiLogin"` } type MultiLogin struct { - Policy int `mapstructure:"policy"` - MaxNumOneEnd int `mapstructure:"maxNumOneEnd"` + Policy int `yaml:"policy"` + MaxNumOneEnd int `yaml:"maxNumOneEnd"` } type RpcService struct { - User string `mapstructure:"user"` - Friend string `mapstructure:"friend"` - Msg string `mapstructure:"msg"` - Push string `mapstructure:"push"` - MessageGateway string `mapstructure:"messageGateway"` - Group string `mapstructure:"group"` - Auth string `mapstructure:"auth"` - Conversation string `mapstructure:"conversation"` - Third string `mapstructure:"third"` + User string `yaml:"user"` + Friend string `yaml:"friend"` + Msg string `yaml:"msg"` + Push string `yaml:"push"` + MessageGateway string `yaml:"messageGateway"` + Group string `yaml:"group"` + Auth string `yaml:"auth"` + Conversation string `yaml:"conversation"` + Third string `yaml:"third"` } func (r *RpcService) GetServiceNames() []string { @@ -418,80 +420,80 @@ func (r *RpcService) GetServiceNames() []string { // FullConfig stores all configurations for before and after events type Webhooks struct { - URL string `mapstructure:"url"` - BeforeSendSingleMsg BeforeConfig `mapstructure:"beforeSendSingleMsg"` - BeforeUpdateUserInfoEx BeforeConfig `mapstructure:"beforeUpdateUserInfoEx"` - AfterUpdateUserInfoEx AfterConfig `mapstructure:"afterUpdateUserInfoEx"` - AfterSendSingleMsg AfterConfig `mapstructure:"afterSendSingleMsg"` - BeforeSendGroupMsg BeforeConfig `mapstructure:"beforeSendGroupMsg"` - BeforeMsgModify BeforeConfig `mapstructure:"beforeMsgModify"` - AfterSendGroupMsg AfterConfig `mapstructure:"afterSendGroupMsg"` - AfterUserOnline AfterConfig `mapstructure:"afterUserOnline"` - AfterUserOffline AfterConfig `mapstructure:"afterUserOffline"` - AfterUserKickOff AfterConfig `mapstructure:"afterUserKickOff"` - BeforeOfflinePush BeforeConfig `mapstructure:"beforeOfflinePush"` - BeforeOnlinePush BeforeConfig `mapstructure:"beforeOnlinePush"` - BeforeGroupOnlinePush BeforeConfig `mapstructure:"beforeGroupOnlinePush"` - BeforeAddFriend BeforeConfig `mapstructure:"beforeAddFriend"` - BeforeUpdateUserInfo BeforeConfig `mapstructure:"beforeUpdateUserInfo"` - AfterUpdateUserInfo AfterConfig `mapstructure:"afterUpdateUserInfo"` - BeforeCreateGroup BeforeConfig `mapstructure:"beforeCreateGroup"` - AfterCreateGroup AfterConfig `mapstructure:"afterCreateGroup"` - BeforeMemberJoinGroup BeforeConfig `mapstructure:"beforeMemberJoinGroup"` - BeforeSetGroupMemberInfo BeforeConfig `mapstructure:"beforeSetGroupMemberInfo"` - AfterSetGroupMemberInfo AfterConfig `mapstructure:"afterSetGroupMemberInfo"` - AfterQuitGroup AfterConfig `mapstructure:"afterQuitGroup"` - AfterKickGroupMember AfterConfig `mapstructure:"afterKickGroupMember"` - AfterDismissGroup AfterConfig `mapstructure:"afterDismissGroup"` - BeforeApplyJoinGroup BeforeConfig `mapstructure:"beforeApplyJoinGroup"` - AfterGroupMsgRead AfterConfig `mapstructure:"afterGroupMsgRead"` - AfterSingleMsgRead AfterConfig `mapstructure:"afterSingleMsgRead"` - BeforeUserRegister BeforeConfig `mapstructure:"beforeUserRegister"` - AfterUserRegister AfterConfig `mapstructure:"afterUserRegister"` - AfterTransferGroupOwner AfterConfig `mapstructure:"afterTransferGroupOwner"` - BeforeSetFriendRemark BeforeConfig `mapstructure:"beforeSetFriendRemark"` - AfterSetFriendRemark AfterConfig `mapstructure:"afterSetFriendRemark"` - AfterGroupMsgRevoke AfterConfig `mapstructure:"afterGroupMsgRevoke"` - AfterJoinGroup AfterConfig `mapstructure:"afterJoinGroup"` - BeforeInviteUserToGroup BeforeConfig `mapstructure:"beforeInviteUserToGroup"` - AfterSetGroupInfo AfterConfig `mapstructure:"afterSetGroupInfo"` - BeforeSetGroupInfo BeforeConfig `mapstructure:"beforeSetGroupInfo"` - AfterSetGroupInfoEx AfterConfig `mapstructure:"afterSetGroupInfoEx"` - BeforeSetGroupInfoEx BeforeConfig `mapstructure:"beforeSetGroupInfoEx"` - AfterRevokeMsg AfterConfig `mapstructure:"afterRevokeMsg"` - BeforeAddBlack BeforeConfig `mapstructure:"beforeAddBlack"` - AfterAddFriend AfterConfig `mapstructure:"afterAddFriend"` - BeforeAddFriendAgree BeforeConfig `mapstructure:"beforeAddFriendAgree"` - AfterAddFriendAgree AfterConfig `mapstructure:"afterAddFriendAgree"` - AfterDeleteFriend AfterConfig `mapstructure:"afterDeleteFriend"` - BeforeImportFriends BeforeConfig `mapstructure:"beforeImportFriends"` - AfterImportFriends AfterConfig `mapstructure:"afterImportFriends"` - AfterRemoveBlack AfterConfig `mapstructure:"afterRemoveBlack"` + URL string `yaml:"url"` + BeforeSendSingleMsg BeforeConfig `yaml:"beforeSendSingleMsg"` + BeforeUpdateUserInfoEx BeforeConfig `yaml:"beforeUpdateUserInfoEx"` + AfterUpdateUserInfoEx AfterConfig `yaml:"afterUpdateUserInfoEx"` + AfterSendSingleMsg AfterConfig `yaml:"afterSendSingleMsg"` + BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` + BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` + AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` + AfterUserOnline AfterConfig `yaml:"afterUserOnline"` + AfterUserOffline AfterConfig `yaml:"afterUserOffline"` + AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` + BeforeOfflinePush BeforeConfig `yaml:"beforeOfflinePush"` + BeforeOnlinePush BeforeConfig `yaml:"beforeOnlinePush"` + BeforeGroupOnlinePush BeforeConfig `yaml:"beforeGroupOnlinePush"` + BeforeAddFriend BeforeConfig `yaml:"beforeAddFriend"` + BeforeUpdateUserInfo BeforeConfig `yaml:"beforeUpdateUserInfo"` + AfterUpdateUserInfo AfterConfig `yaml:"afterUpdateUserInfo"` + BeforeCreateGroup BeforeConfig `yaml:"beforeCreateGroup"` + AfterCreateGroup AfterConfig `yaml:"afterCreateGroup"` + BeforeMemberJoinGroup BeforeConfig `yaml:"beforeMemberJoinGroup"` + BeforeSetGroupMemberInfo BeforeConfig `yaml:"beforeSetGroupMemberInfo"` + AfterSetGroupMemberInfo AfterConfig `yaml:"afterSetGroupMemberInfo"` + AfterQuitGroup AfterConfig `yaml:"afterQuitGroup"` + AfterKickGroupMember AfterConfig `yaml:"afterKickGroupMember"` + AfterDismissGroup AfterConfig `yaml:"afterDismissGroup"` + BeforeApplyJoinGroup BeforeConfig `yaml:"beforeApplyJoinGroup"` + AfterGroupMsgRead AfterConfig `yaml:"afterGroupMsgRead"` + AfterSingleMsgRead AfterConfig `yaml:"afterSingleMsgRead"` + BeforeUserRegister BeforeConfig `yaml:"beforeUserRegister"` + AfterUserRegister AfterConfig `yaml:"afterUserRegister"` + AfterTransferGroupOwner AfterConfig `yaml:"afterTransferGroupOwner"` + BeforeSetFriendRemark BeforeConfig `yaml:"beforeSetFriendRemark"` + AfterSetFriendRemark AfterConfig `yaml:"afterSetFriendRemark"` + AfterGroupMsgRevoke AfterConfig `yaml:"afterGroupMsgRevoke"` + AfterJoinGroup AfterConfig `yaml:"afterJoinGroup"` + BeforeInviteUserToGroup BeforeConfig `yaml:"beforeInviteUserToGroup"` + AfterSetGroupInfo AfterConfig `yaml:"afterSetGroupInfo"` + BeforeSetGroupInfo BeforeConfig `yaml:"beforeSetGroupInfo"` + AfterSetGroupInfoEx AfterConfig `yaml:"afterSetGroupInfoEx"` + BeforeSetGroupInfoEx BeforeConfig `yaml:"beforeSetGroupInfoEx"` + AfterRevokeMsg AfterConfig `yaml:"afterRevokeMsg"` + BeforeAddBlack BeforeConfig `yaml:"beforeAddBlack"` + AfterAddFriend AfterConfig `yaml:"afterAddFriend"` + BeforeAddFriendAgree BeforeConfig `yaml:"beforeAddFriendAgree"` + AfterAddFriendAgree AfterConfig `yaml:"afterAddFriendAgree"` + AfterDeleteFriend AfterConfig `yaml:"afterDeleteFriend"` + BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"` + AfterImportFriends AfterConfig `yaml:"afterImportFriends"` + AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"` } type ZooKeeper struct { - Schema string `mapstructure:"schema"` - Address []string `mapstructure:"address"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` + Schema string `yaml:"schema"` + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` } type Discovery struct { - Enable string `mapstructure:"enable"` - Etcd Etcd `mapstructure:"etcd"` - Kubernetes Kubernetes `mapstructure:"kubernetes"` - RpcService RpcService `mapstructure:"rpcService"` + Enable string `yaml:"enable"` + Etcd Etcd `yaml:"etcd"` + Kubernetes Kubernetes `yaml:"kubernetes"` + RpcService RpcService `yaml:"rpcService"` } type Kubernetes struct { - Namespace string `mapstructure:"namespace"` + Namespace string `yaml:"namespace"` } type Etcd struct { - RootDirectory string `mapstructure:"rootDirectory"` - Address []string `mapstructure:"address"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` + RootDirectory string `yaml:"rootDirectory"` + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` } func (m *Mongo) Build() *mongoutil.Config { @@ -783,7 +785,7 @@ func (a *AllConfig) GetConfigNames() []string { } } -var ( +const ( FileName = "config.yaml" DiscoveryConfigFilename = "discovery.yml" KafkaConfigFileName = "kafka.yml" diff --git a/pkg/common/config/load_config.go b/pkg/common/config/load_config.go index aa87211f9b..76c724b2a8 100644 --- a/pkg/common/config/load_config.go +++ b/pkg/common/config/load_config.go @@ -35,7 +35,7 @@ func loadConfig(path string, envPrefix string, config any) error { } if err := v.Unmarshal(config, func(config *mapstructure.DecoderConfig) { - config.TagName = "mapstructure" + config.TagName = StructTagName }); err != nil { return errs.WrapMsg(err, "failed to unmarshal config", "path", path, "envPrefix", envPrefix) } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 27aabca950..f41a419708 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -50,7 +50,7 @@ import ( func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, - rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, + rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, options ...grpc.ServerOption) error { watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) diff --git a/pkg/service/a_test.go b/pkg/service/a_test.go deleted file mode 100644 index 34b6214569..0000000000 --- a/pkg/service/a_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package service - -import ( - "context" - "fmt" - "sync" - "testing" - - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/user" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" -) - -var ( - _ user.UnimplementedUserServer - _ group.UnimplementedGroupServer -) - -func TestName1(t *testing.T) { - cc := newStandaloneConn() - user.RegisterUserServer(cc.Registry(), &user.UnimplementedUserServer{}) - group.RegisterGroupServer(cc.Registry(), &group.UnimplementedGroupServer{}) - ctx := context.Background() - resp, err := user.NewUserClient(cc).GetUserStatus(ctx, &user.GetUserStatusReq{UserID: "imAdmin", UserIDs: []string{"10000", "20000"}}) - if err != nil { - t.Error(err) - return - } - t.Log(resp) -} - -func newStandaloneConn() *standaloneConn { - return &standaloneConn{ - registry: newStandaloneRegistry(), - serializer: NewProtoSerializer(), - } -} - -type standaloneConn struct { - registry *standaloneRegistry - serializer Serializer -} - -func (x *standaloneConn) Registry() grpc.ServiceRegistrar { - return x.registry -} - -func (x *standaloneConn) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error { - handler := x.registry.getMethod(method) - if handler == nil { - return fmt.Errorf("service %s not found", method) - } - resp, err := handler(ctx, args, nil) - if err != nil { - return err - } - tmp, err := x.serializer.Marshal(resp) - if err != nil { - return err - } - return x.serializer.Unmarshal(tmp, reply) -} - -func (x *standaloneConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, status.Errorf(codes.Unimplemented, "method stream not implemented") -} - -type serverHandler func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) - -func newStandaloneRegistry() *standaloneRegistry { - return &standaloneRegistry{ - methods: make(map[string]serverHandler), - serializer: NewProtoSerializer(), - } -} - -type standaloneRegistry struct { - lock sync.RWMutex - methods map[string]serverHandler - serializer Serializer -} - -func (x *standaloneConn) emptyDec(req any) error { - return nil -} - -func (x *standaloneRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { - x.lock.Lock() - defer x.lock.Unlock() - for i := range desc.Methods { - method := desc.Methods[i] - name := fmt.Sprintf("/%s/%s", desc.ServiceName, method.MethodName) - if _, ok := x.methods[name]; ok { - panic(fmt.Errorf("service %s already registered, method %s", desc.ServiceName, method.MethodName)) - } - x.methods[name] = func(ctx context.Context, req any, interceptor grpc.UnaryServerInterceptor) (any, error) { - return method.Handler(impl, ctx, func(in any) error { - tmp, err := x.serializer.Marshal(req) - if err != nil { - return err - } - return x.serializer.Unmarshal(tmp, in) - }, interceptor) - } - } -} - -func (x *standaloneRegistry) getMethod(name string) serverHandler { - x.lock.RLock() - defer x.lock.RUnlock() - return x.methods[name] -} - -type Serializer interface { - Marshal(any) ([]byte, error) - Unmarshal([]byte, any) error -} - -func NewProtoSerializer() Serializer { - return protoSerializer{} -} - -type protoSerializer struct{} - -func (protoSerializer) Marshal(in any) ([]byte, error) { - return proto.Marshal(in.(proto.Message)) -} - -func (protoSerializer) Unmarshal(b []byte, out any) error { - return proto.Unmarshal(b, out.(proto.Message)) -} diff --git a/pkg/service/registry.go b/pkg/service/registry.go deleted file mode 100644 index a847a4d816..0000000000 --- a/pkg/service/registry.go +++ /dev/null @@ -1,34 +0,0 @@ -package service - -// -//import ( -// "context" -// "fmt" -// "sync" -// -// "google.golang.org/grpc" -//) -// -//type DiscoveryRegistry struct { -// lock sync.RWMutex -// services map[string]grpc.ClientConnInterface -//} -// -//func (x *DiscoveryRegistry) RegisterService(desc *grpc.ServiceDesc, impl any) { -// fmt.Println("RegisterService", desc, impl) -//} -// -//func (x *DiscoveryRegistry) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { -// //TODO implement me -// panic("implement me") -//} -// -//func (x *DiscoveryRegistry) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { -// //TODO implement me -// panic("implement me") -//} -// -//func (x *DiscoveryRegistry) IsSelfNode(cc grpc.ClientConnInterface) bool { -// -// return false -//} diff --git a/pkg/service/service.go b/pkg/service/service.go deleted file mode 100644 index 183aa4b9cc..0000000000 --- a/pkg/service/service.go +++ /dev/null @@ -1,14 +0,0 @@ -package service - -import ( - "fmt" - - "google.golang.org/grpc" -) - -type GrpcServer struct { -} - -func (x *GrpcServer) RegisterService(desc *grpc.ServiceDesc, impl any) { - fmt.Println("RegisterService", desc, impl) -} From edc2bcc2f135359d9a50339ae149c52b89482295 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 24 Jan 2025 14:13:55 +0800 Subject: [PATCH 4/4] fix: monolithic --- cmd/main.go | 2 +- internal/msgtransfer/init.go | 9 ++++----- pkg/common/config/constant.go | 1 + pkg/common/discovery/discoveryregister.go | 3 +++ pkg/common/startrpc/start.go | 12 ++++++++---- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 870c5dd40a..75636ec1a2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -100,7 +100,7 @@ func (x *cmds) readConfig() error { } x.conf[getTypePath(field.Type())] = data } - val := config.Discovery{Enable: discovery.Standalone} + val := config.Discovery{Enable: config.Standalone} var buf bytes.Buffer if err := yaml.NewEncoder(&buf).Encode(&val); err != nil { return err diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 96e6bbde0e..85f2080476 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -176,11 +176,10 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco return listener, port, nil } - if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { - return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() - } - - if config.MsgTransfer.Prometheus.Enable { + if config.Discovery.Enable != conf.Standalone && config.MsgTransfer.Prometheus.Enable { + if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } var ( listener net.Listener prometheusPort int diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go index f3fcc67ef0..3a83faaa37 100644 --- a/pkg/common/config/constant.go +++ b/pkg/common/config/constant.go @@ -21,6 +21,7 @@ const ( DeploymentType = "DEPLOYMENT_TYPE" KUBERNETES = "kubernetes" ETCD = "etcd" + Standalone = "standalone" ) const ( diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index 1b64c3e78a..97b5c4988b 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/standalone" "google.golang.org/grpc" "github.com/openimsdk/tools/discovery/kubernetes" @@ -38,6 +39,8 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchN } switch discovery.Enable { + case config.Standalone: + return standalone.GetSvcDiscoveryRegistry(), nil case config.ETCD: return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index f41a419708..41e1edc1e9 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -53,6 +53,14 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf rpcFn func(ctx context.Context, config T, client discovery.Conn, server grpc.ServiceRegistrar) error, options ...grpc.ServerOption) error { + if notification != nil { + conf.InitNotification(notification) + } + + if discovery.Enable == conf.Standalone { + return nil + } + watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) var ( rpcTcpAddr string @@ -61,10 +69,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf prometheusPort int ) - if notification != nil { - conf.InitNotification(notification) - } - registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { return err