Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: the abnormal message has no sending time, causing the SDK to be abnormal #3086

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package main

import (
"bytes"
"context"
"flag"
"fmt"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/internal/msggateway"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
"github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/standalone"
"github.com/openimsdk/tools/utils/datautil"
"github.com/spf13/viper"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"
)

func main() {
var configPath string
flag.StringVar(&configPath, "c", "/Users/chao/Desktop/code/open-im-server/config", "config path")
flag.Parse()
cmd := newCmds(configPath)
putCmd1(cmd, auth.Start)
putCmd1(cmd, conversation.Start)
putCmd1(cmd, relation.Start)
putCmd1(cmd, group.Start)
putCmd1(cmd, msg.Start)
putCmd1(cmd, third.Start)
putCmd1(cmd, user.Start)
putCmd1(cmd, push.Start)
putCmd2(cmd, msggateway.Start)
putCmd2(cmd, msgtransfer.Start)
putCmd2(cmd, api.Start)
ctx := context.Background()
if err := cmd.run(ctx); err != nil {
fmt.Println(err)
}
fmt.Println("success")
}

func getTypePath(typ reflect.Type) string {
return path.Join(typ.PkgPath(), typ.Name())
}

func newCmds(confPath string) *cmds {
return &cmds{confPath: confPath}
}

type cmds struct {
confPath string
cmds []cmdName
conf map[string][]byte
}

func (x *cmds) readConfig() error {
skip := []string{
config.DiscoveryConfigFilename,
}
if x.conf == nil {
x.conf = make(map[string][]byte)
}
vof := reflect.ValueOf(&config.AllConfig{}).Elem()
num := vof.NumField()
for i := 0; i < num; i++ {
field := vof.Field(i)
for ptr := true; ptr; {
if field.Kind() == reflect.Ptr {
field = field.Elem()
} else {
ptr = false
}
}
itemConf := field.Addr().Interface()
name := itemConf.(interface{ GetConfigFileName() string }).GetConfigFileName()
if datautil.Contain(name, skip...) {
x.conf[getTypePath(field.Type())] = nil
continue
}
data, err := os.ReadFile(filepath.Join(x.confPath, name))
if err != nil {
return err
}
x.conf[getTypePath(field.Type())] = data
}
val := config.Discovery{Enable: config.Standalone}
var buf bytes.Buffer
if err := yaml.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
x.conf[getTypePath(reflect.TypeOf(val))] = buf.Bytes()
return nil
}

func (x *cmds) parseConf(conf any) error {
vof := reflect.ValueOf(conf)
for {
if vof.Kind() == reflect.Ptr {
vof = vof.Elem()
} else {
break
}
}
tof := vof.Type()
numField := vof.NumField()
for i := 0; i < numField; i++ {
typeField := tof.Field(i)
if !typeField.IsExported() {
continue
}
field := vof.Field(i)
pkt := getTypePath(field.Type())
confData, ok := x.conf[pkt]
if !ok {
if typeField.Name == "FcmConfigPath" && field.Kind() == reflect.String {
field.SetString(x.confPath)
continue
}
return fmt.Errorf("config field %s %s not found", vof.Type().Name(), typeField.Name)
}
if confData == nil {
continue
}
val := field.Addr().Interface()
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewReader(confData)); err != nil {
return err
}
fn := func(conf *mapstructure.DecoderConfig) {
conf.TagName = config.StructTagName
}
if err := v.Unmarshal(val, fn); err != nil {
return err
}
}
return nil
}

func (x *cmds) add(name string, fn func(ctx context.Context) error) {
x.cmds = append(x.cmds, cmdName{Name: name, Func: fn})
}

func (x *cmds) run(ctx context.Context) error {
if x.conf == nil {
if err := x.readConfig(); err != nil {
return err
}
}
for _, cmd := range x.cmds {
fmt.Println("start", cmd.Name)
if err := cmd.Func(ctx); err != nil {
fmt.Println("start failed", cmd.Name, err)
return err
}
fmt.Println("start ok", cmd.Name)
}
return nil
}

type cmdName struct {
Name string
Func func(ctx context.Context) error
}

func getFuncPacketName(fn any) string {
name := path.Base(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
if index := strings.Index(name, "."); index >= 0 {
name = name[:index]
}
return name
}

func putCmd1[C any](cmd *cmds, fn func(ctx context.Context, config *C, client discovery.Conn, server grpc.ServiceRegistrar) error) {
cmd.add(getFuncPacketName(fn), func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
}
return fn(ctx, &conf, standalone.GetDiscoveryConn(), standalone.GetServiceRegistrar())
})
}

func putCmd2[C any](cmd *cmds, fn func(ctx context.Context, index int, config *C) error) {
cmd.add(getFuncPacketName(fn), func(ctx context.Context) error {
var conf C
if err := cmd.parseConf(&conf); err != nil {
return err
}
return fn(ctx, 0, &conf)
})
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,7 @@ require (
golang.org/x/crypto v0.27.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

replace (
github.com/openimsdk/tools => /Users/chao/Desktop/code/tools
)
2 changes: 1 addition & 1 deletion internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"google.golang.org/grpc"
)

func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.Conn, server grpc.ServiceRegistrar) error {
userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ type Config struct {
WebhooksConfig config.Webhooks
Discovery config.Discovery

RuntimeEnv string
runtimeEnv string
}

// Start run ws server.
func Start(ctx context.Context, index int, conf *Config) error {
conf.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()
conf.runtimeEnv = runtimeenv.PrintRuntimeEnvironment()

log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.RuntimeEnv,
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.runtimeEnv,
"rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
Expand Down
18 changes: 11 additions & 7 deletions internal/msggateway/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type LongConnServer interface {
GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error
SetDiscoveryRegistry(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) error
SetDiscoveryRegistry(ctx context.Context, client discovery.Conn, config *Config) error
KickUserConn(client *Client) error
UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
Expand All @@ -60,7 +60,7 @@ type WsServer struct {
handshakeTimeout time.Duration
writeBufferSize int
validate *validator.Validate
disCov discovery.SvcDiscoveryRegistry
disCov discovery.Conn
Compressor
//Encoder
MessageHandler
Expand All @@ -75,7 +75,7 @@ type kickHandler struct {
newClient *Client
}

func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.SvcDiscoveryRegistry, config *Config) error {
func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.Conn, config *Config) error {
userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil {
return err
Expand Down Expand Up @@ -223,15 +223,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
if err != nil {
return err
}
if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) {
return nil
}

wg := errgroup.Group{}
wg.SetLimit(concurrentRequest)

// Online push user online message to other node
for _, v := range conns {
v := v
log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
if v.Target() == ws.disCov.GetSelfConnTarget() {
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn")
if ws.disCov.IsSelfNode(v) {
log.ZDebug(ctx, "Filter out this node")
continue
}

Expand All @@ -242,7 +246,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
PlatformID: int32(client.PlatformID), Token: client.token,
})
if err != nil {
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err)
}
return nil
})
Expand Down
9 changes: 4 additions & 5 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,10 @@ func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDisco
return listener, port, nil
}

if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}

if config.MsgTransfer.Prometheus.Enable {
if config.Discovery.Enable != conf.Standalone && config.MsgTransfer.Prometheus.Enable {
if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
var (
listener net.Listener
prometheusPort int
Expand Down
9 changes: 4 additions & 5 deletions internal/push/onlinepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
return nil
}

func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {

func NewOnlinePusher(disCov discovery.Conn, config *Config) OnlinePusher {
if config.runTimeEnv == conf.KUBERNETES {
return NewDefaultAllNode(disCov, config)
}
Expand All @@ -56,11 +55,11 @@ func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) Onli
}

type DefaultAllNode struct {
disCov discovery.SvcDiscoveryRegistry
disCov discovery.Conn
config *Config
}

func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *DefaultAllNode {
func NewDefaultAllNode(disCov discovery.Conn, config *Config) *DefaultAllNode {
return &DefaultAllNode{disCov: disCov, config: config}
}

Expand Down Expand Up @@ -166,7 +165,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg
}
}
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
var usersConns = make(map[*grpc.ClientConn][]string)
var usersConns = make(map[grpc.ClientConnInterface][]string)
for host, userIds := range usersHost {
tconn, _ := k.disCov.GetConn(ctx, host)
usersConns[tconn] = userIds
Expand Down
4 changes: 2 additions & 2 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type pushServer struct {
pbpush.UnimplementedPushMsgServiceServer
database controller.PushDatabase
disCov discovery.SvcDiscoveryRegistry
disCov discovery.Conn
offlinePusher offlinepush.OfflinePusher
pushCh *ConsumerHandler
offlinePushCh *OfflinePushConsumerHandler
Expand Down Expand Up @@ -45,7 +45,7 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
return &pbpush.DelUserPushTokenResp{}, nil
}

func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
func Start(ctx context.Context, config *Config, client discovery.Conn, server grpc.ServiceRegistrar) error {
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()

rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
Expand Down
2 changes: 1 addition & 1 deletion internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type ConsumerHandler struct {
}

func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) {
client discovery.Conn) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
var err error
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
Expand Down
Loading
Loading