From 1755cb3c9c5b473b53c5c1a9a716d0b8ca2be543 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Fri, 24 Jan 2025 16:13:12 +0800 Subject: [PATCH] fix: the abnormal message has no sending time, causing the SDK to be abnormal (#3087) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time --- pkg/common/storage/database/mgo/msg.go | 132 ++++++++++++++++++++++++- pkg/common/storage/model/msg.go | 11 ++- 2 files changed, 139 insertions(+), 4 deletions(-) diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index c440d44420..83fefbfe61 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -1091,22 +1091,148 @@ func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []i return msgDocModel[0].Msg, nil } +//func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { +// if len(seqs) == 0 { +// return nil, nil +// } +// result := make([]*model.MsgInfoModel, 0, len(seqs)) +// for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) { +// res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex)) +// if err != nil { +// return nil, err +// } +// for i, re := range res { +// if re == nil || re.Msg == nil { +// continue +// } +// result = append(result, res[i]) +// } +// } +// return result, nil +//} + +func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit int64) (int64, int64, error) { + if limit == 0 { + return 0, 0, nil + } + pipeline := []bson.M{ + { + "$match": bson.M{ + "doc_id": docID, + }, + }, + { + "$project": bson.M{ + "_id": 0, + "doc_id": 0, + }, + }, + { + "$unwind": "$msgs", + }, + { + "$project": bson.M{ + //"_id": 0, + //"doc_id": 0, + "msgs.msg.send_time": 1, + "msgs.msg.seq": 1, + }, + }, + } + if limit > 0 { + pipeline = append(pipeline, bson.M{"$limit": limit}) + } + type Result struct { + Msgs *model.MsgInfoModel `bson:"msgs"` + } + res, err := mongoutil.Aggregate[Result](ctx, m.coll, pipeline) + if err != nil { + return 0, 0, err + } + for i := len(res) - 1; i > 0; i-- { + v := res[i] + if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 { + return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil + } + } + return 0, 0, nil +} + +func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string, seq int64) (int64, int64, error) { + first := true + for i := m.model.GetDocIndex(seq); i >= 0; i-- { + limit := int64(-1) + if first { + first = false + limit = m.model.GetMsgIndex(seq) + } + docID := m.model.BuildDocIDByIndex(conversationID, i) + msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit) + if err != nil { + return 0, 0, err + } + if msgSendTime > 0 { + return msgSeq, msgSendTime, nil + } + } + return 0, 0, nil +} + func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { if len(seqs) == 0 { return nil, nil } + var abnormalSeq []int64 result := make([]*model.MsgInfoModel, 0, len(seqs)) - for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) { - res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex)) + for docID, docSeqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) { + res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(docSeqs, m.model.GetMsgIndex)) if err != nil { return nil, err } + if len(res) == 0 { + abnormalSeq = append(abnormalSeq, docSeqs...) + continue + } for i, re := range res { - if re == nil || re.Msg == nil { + if re == nil || re.Msg == nil || re.Msg.SendTime == 0 { + abnormalSeq = append(abnormalSeq, docSeqs[i]) continue } result = append(result, res[i]) } } + if len(abnormalSeq) > 0 { + datautil.Sort(abnormalSeq, false) + sendTime := make(map[int64]int64) + var ( + lastSeq int64 + lastSendTime int64 + ) + for _, seq := range abnormalSeq { + if lastSendTime > 0 && lastSeq <= seq { + sendTime[seq] = lastSendTime + continue + } + msgSeq, msgSendTime, err := m.findBeforeSendTime(ctx, conversationID, seq) + if err != nil { + return nil, err + } + if msgSendTime <= 0 { + break + } + sendTime[seq] = msgSendTime + lastSeq = msgSeq + lastSendTime = msgSendTime + } + for _, seq := range abnormalSeq { + result = append(result, &model.MsgInfoModel{ + Msg: &model.MsgDataModel{ + Seq: seq, + Status: constant.MsgStatusHasDeleted, + SendTime: sendTime[seq], + }, + }) + } + } return result, nil } diff --git a/pkg/common/storage/model/msg.go b/pkg/common/storage/model/msg.go index 69113032da..6cf63bfcda 100644 --- a/pkg/common/storage/model/msg.go +++ b/pkg/common/storage/model/msg.go @@ -15,9 +15,10 @@ package model import ( + "strconv" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" - "strconv" ) const ( @@ -108,6 +109,10 @@ func (m *MsgDocModel) IsFull() bool { return m.Msg[len(m.Msg)-1].Msg != nil } +func (m *MsgDocModel) GetDocIndex(seq int64) int64 { + return (seq - 1) / singleGocMsgNum +} + func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string { seqSuffix := (seq - 1) / singleGocMsgNum return m.indexGen(conversationID, seqSuffix) @@ -135,6 +140,10 @@ func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) } +func (*MsgDocModel) BuildDocIDByIndex(conversationID string, index int64) string { + return conversationID + ":" + strconv.FormatInt(index, 10) +} + func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { for _, v := range seqs { msgModel := new(sdkws.MsgData)