Skip to content

Commit

Permalink
Merge pull request #3032 from openimsdk/cherry-pick-a1b5f05
Browse files Browse the repository at this point in the history
deps: Merge  #3026 #3029 PRs into pre-release-v3.8.3
  • Loading branch information
withchao authored Jan 4, 2025
2 parents a395c82 + 34c6fe5 commit 3b3ce0d
Show file tree
Hide file tree
Showing 18 changed files with 517 additions and 28 deletions.
7 changes: 7 additions & 0 deletions config/openim-rpc-third.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ object:
accessKeySecret:
sessionToken:
publicRead: false
aws:
region: ap-southeast-2
bucket: testdemo832234
accessKeyID:
secretAccessKey:
sessionToken:
publicRead: false
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.69
github.com/openimsdk/tools v0.0.50-alpha.62
github.com/openimsdk/protocol v0.0.72-alpha.70
github.com/openimsdk/tools v0.0.50-alpha.63
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc=
github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ=
github.com/openimsdk/tools v0.0.50-alpha.62/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY=
github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
Expand Down
2 changes: 2 additions & 0 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (c *Client) handleMessage(message []byte) error {
resp, messageErr = c.longConnServer.GetSeqMessage(ctx, binaryReq)
case WSGetConvMaxReadSeq:
resp, messageErr = c.longConnServer.GetConversationsHasReadAndMaxSeq(ctx, binaryReq)
case WsPullConvLastMessage:
resp, messageErr = c.longConnServer.GetLastMessage(ctx, binaryReq)
case WsLogoutMsg:
resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
case WsSetBackgroundStatus:
Expand Down
1 change: 1 addition & 0 deletions internal/msggateway/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
WSSendSignalMsg = 1004
WSPullMsg = 1005
WSGetConvMaxReadSeq = 1006
WsPullConvLastMessage = 1007
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003
Expand Down
9 changes: 0 additions & 9 deletions internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func NewServer(longConnServer LongConnServer, conf *Config, ready func(srv *Serv
return s
}

func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
panic("implement me")
}

func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("only app manager")
Expand Down Expand Up @@ -126,11 +122,6 @@ func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUs
return &resp, nil
}

func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
// todo implement
return nil, nil
}

func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
clients, ok := s.LongConnServer.GetUserAllCons(userID)
if !ok {
Expand Down
13 changes: 13 additions & 0 deletions internal/msggateway/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type MessageHandler interface {
GetSeqMessage(ctx context.Context, data *Req) ([]byte, error)
UserLogout(ctx context.Context, data *Req) ([]byte, error)
SetUserDeviceBackground(ctx context.Context, data *Req) ([]byte, bool, error)
GetLastMessage(ctx context.Context, data *Req) ([]byte, error)
}

var _ MessageHandler = (*GrpcHandler)(nil)
Expand Down Expand Up @@ -266,3 +267,15 @@ func (g *GrpcHandler) SetUserDeviceBackground(ctx context.Context, data *Req) ([
}
return nil, req.IsBackground, nil
}

func (g *GrpcHandler) GetLastMessage(ctx context.Context, data *Req) ([]byte, error) {
var req msg.GetLastMessageReq
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
}
resp, err := g.msgClient.GetLastMessage(ctx, &req)
if err != nil {
return nil, err
}
return proto.Marshal(resp)
}
7 changes: 5 additions & 2 deletions internal/push/offlinepush/dummy/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/tools/log"
"sync/atomic"
)

func NewClient() *Dummy {
return &Dummy{}
}

type Dummy struct {
v atomic.Bool
}

func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
log.ZDebug(ctx, "dummy push")
log.ZWarn(ctx, "Dummy push", nil, "ps", "The offline push is not configured. To configure it, please go to config/openim-push.yml.")
if d.v.CompareAndSwap(false, true) {
log.ZWarn(ctx, "dummy push", nil, "ps", "the offline push is not configured. to configure it, please go to config/openim-push.yml")
}
return nil
}
8 changes: 8 additions & 0 deletions internal/rpc/msg/sync_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,11 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
func (m *msgServer) GetServerTime(ctx context.Context, _ *msg.GetServerTimeReq) (*msg.GetServerTimeResp, error) {
return &msg.GetServerTimeResp{ServerTime: timeutil.GetCurrentTimestampByMill()}, nil
}

func (m *msgServer) GetLastMessage(ctx context.Context, req *msg.GetLastMessageReq) (*msg.GetLastMessageResp, error) {
msgs, err := m.MsgDatabase.GetLastMessage(ctx, req.ConversationIDs, req.UserID)
if err != nil {
return nil, err
}
return &msg.GetLastMessageResp{Msgs: msgs}, nil
}
141 changes: 133 additions & 8 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"github.com/openimsdk/tools/s3/aws"
"strings"
"time"

Expand Down Expand Up @@ -290,14 +291,7 @@ type Third struct {
Cos Cos `mapstructure:"cos"`
Oss Oss `mapstructure:"oss"`
Kodo Kodo `mapstructure:"kodo"`
Aws struct {
Endpoint string `mapstructure:"endpoint"`
Region string `mapstructure:"region"`
Bucket string `mapstructure:"bucket"`
AccessKeyID string `mapstructure:"accessKeyID"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
PublicRead bool `mapstructure:"publicRead"`
} `mapstructure:"aws"`
Aws Aws `mapstructure:"aws"`
} `mapstructure:"object"`
}
type Cos struct {
Expand Down Expand Up @@ -327,6 +321,15 @@ type Kodo struct {
PublicRead bool `mapstructure:"publicRead"`
}

type Aws struct {
Endpoint string `mapstructure:"endpoint"`
Region string `mapstructure:"region"`
Bucket string `mapstructure:"bucket"`
AccessKeyID string `mapstructure:"accessKeyID"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
SessionToken string `mapstructure:"sessionToken"`
}

type User struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
Expand Down Expand Up @@ -567,6 +570,16 @@ func (o *Kodo) Build() *kodo.Config {
}
}

func (o *Aws) Build() *aws.Config {
return &aws.Config{
Region: o.Region,
Bucket: o.Bucket,
AccessKeyID: o.AccessKeyID,
SecretAccessKey: o.SecretAccessKey,
SessionToken: o.SessionToken,
}
}

func (l *CacheConfig) Failed() time.Duration {
return time.Second * time.Duration(l.FailedExpire)
}
Expand All @@ -578,3 +591,115 @@ func (l *CacheConfig) Success() time.Duration {
func (l *CacheConfig) Enable() bool {
return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0
}

var (
DiscoveryConfigFilename = "discovery.yml"
KafkaConfigFileName = "kafka.yml"
LocalCacheConfigFileName = "local-cache.yml"
LogConfigFileName = "log.yml"
MinioConfigFileName = "minio.yml"
MongodbConfigFileName = "mongodb.yml"
OpenIMAPICfgFileName = "openim-api.yml"
OpenIMCronTaskCfgFileName = "openim-crontask.yml"
OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml"
OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml"
OpenIMPushCfgFileName = "openim-push.yml"
OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml"
OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml"
OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml"
OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml"
OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
RedisConfigFileName = "redis.yml"
ShareFileName = "share.yml"
WebhooksConfigFileName = "webhooks.yml"
)

func (d *Discovery) GetConfigFileName() string {
return DiscoveryConfigFilename
}

func (k *Kafka) GetConfigFileName() string {
return KafkaConfigFileName
}

func (lc *LocalCache) GetConfigFileName() string {
return LocalCacheConfigFileName
}

func (l *Log) GetConfigFileName() string {
return LogConfigFileName
}

func (m *Minio) GetConfigFileName() string {
return MinioConfigFileName
}

func (m *Mongo) GetConfigFileName() string {
return MongodbConfigFileName
}

func (n *Notification) GetConfigFileName() string {
return NotificationFileName
}

func (a *API) GetConfigFileName() string {
return OpenIMAPICfgFileName
}

func (ct *CronTask) GetConfigFileName() string {
return OpenIMCronTaskCfgFileName
}

func (mg *MsgGateway) GetConfigFileName() string {
return OpenIMMsgGatewayCfgFileName
}

func (mt *MsgTransfer) GetConfigFileName() string {
return OpenIMMsgTransferCfgFileName
}

func (p *Push) GetConfigFileName() string {
return OpenIMPushCfgFileName
}

func (a *Auth) GetConfigFileName() string {
return OpenIMRPCAuthCfgFileName
}

func (c *Conversation) GetConfigFileName() string {
return OpenIMRPCConversationCfgFileName
}

func (f *Friend) GetConfigFileName() string {
return OpenIMRPCFriendCfgFileName
}

func (g *Group) GetConfigFileName() string {
return OpenIMRPCGroupCfgFileName
}

func (m *Msg) GetConfigFileName() string {
return OpenIMRPCMsgCfgFileName
}

func (t *Third) GetConfigFileName() string {
return OpenIMRPCThirdCfgFileName
}

func (u *User) GetConfigFileName() string {
return OpenIMRPCUserCfgFileName
}

func (r *Redis) GetConfigFileName() string {
return RedisConfigFileName
}

func (s *Share) GetConfigFileName() string {
return ShareFileName
}

func (w *Webhooks) GetConfigFileName() string {
return WebhooksConfigFileName
}
25 changes: 24 additions & 1 deletion pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type CommonMsgDatabase interface {
DeleteDoc(ctx context.Context, docID string) error

GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)

GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error)
}

func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
Expand Down Expand Up @@ -811,8 +813,29 @@ func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationI
if v, ok := seqMsgs[seq]; ok {
res = append(res, convert.MsgDB2Pb(v.Msg))
} else {
res = append(res, &sdkws.MsgData{Seq: seq})
res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted})
}
}
return res, nil
}

func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) {
res := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs {
if _, ok := res[conversationID]; ok {
continue
}
msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID)
if err != nil {
if errs.Unwrap(err) == mongo.ErrNoDocuments {
continue
}
return nil, err
}
tmp := []*model.MsgInfoModel{msg}
db.handlerDeleteAndRevoked(ctx, userID, tmp)
db.handlerQuote(ctx, userID, conversationID, tmp)
res[conversationID] = convert.MsgDB2Pb(msg.Msg)
}
return res, nil
}
Loading

0 comments on commit 3b3ce0d

Please sign in to comment.