Skip to content

Commit

Permalink
Merge pull request #3126 from openimsdk/cherry-pick-274a9be
Browse files Browse the repository at this point in the history
fix: the abnormal message has no sending time, causing the SDK to be abnormal [Created by @withchao from #3087]
  • Loading branch information
withchao authored Feb 8, 2025
2 parents 9b672f7 + 1755cb3 commit 8edb3ff
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 4 deletions.
132 changes: 129 additions & 3 deletions pkg/common/storage/database/mgo/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,22 +1091,148 @@ func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []i
return msgDocModel[0].Msg, nil
}

//func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
// if len(seqs) == 0 {
// return nil, nil
// }
// result := make([]*model.MsgInfoModel, 0, len(seqs))
// for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
// res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
// if err != nil {
// return nil, err
// }
// for i, re := range res {
// if re == nil || re.Msg == nil {
// continue
// }
// result = append(result, res[i])
// }
// }
// return result, nil
//}

func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit int64) (int64, int64, error) {
if limit == 0 {
return 0, 0, nil
}
pipeline := []bson.M{
{
"$match": bson.M{
"doc_id": docID,
},
},
{
"$project": bson.M{
"_id": 0,
"doc_id": 0,
},
},
{
"$unwind": "$msgs",
},
{
"$project": bson.M{
//"_id": 0,
//"doc_id": 0,
"msgs.msg.send_time": 1,
"msgs.msg.seq": 1,
},
},
}
if limit > 0 {
pipeline = append(pipeline, bson.M{"$limit": limit})
}
type Result struct {
Msgs *model.MsgInfoModel `bson:"msgs"`
}
res, err := mongoutil.Aggregate[Result](ctx, m.coll, pipeline)
if err != nil {
return 0, 0, err
}
for i := len(res) - 1; i > 0; i-- {
v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
}
}
return 0, 0, nil
}

func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string, seq int64) (int64, int64, error) {
first := true
for i := m.model.GetDocIndex(seq); i >= 0; i-- {
limit := int64(-1)
if first {
first = false
limit = m.model.GetMsgIndex(seq)
}
docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
if err != nil {
return 0, 0, err
}
if msgSendTime > 0 {
return msgSeq, msgSendTime, nil
}
}
return 0, 0, nil
}

func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
var abnormalSeq []int64
result := make([]*model.MsgInfoModel, 0, len(seqs))
for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
for docID, docSeqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(docSeqs, m.model.GetMsgIndex))
if err != nil {
return nil, err
}
if len(res) == 0 {
abnormalSeq = append(abnormalSeq, docSeqs...)
continue
}
for i, re := range res {
if re == nil || re.Msg == nil {
if re == nil || re.Msg == nil || re.Msg.SendTime == 0 {
abnormalSeq = append(abnormalSeq, docSeqs[i])
continue
}
result = append(result, res[i])
}
}
if len(abnormalSeq) > 0 {
datautil.Sort(abnormalSeq, false)
sendTime := make(map[int64]int64)
var (
lastSeq int64
lastSendTime int64
)
for _, seq := range abnormalSeq {
if lastSendTime > 0 && lastSeq <= seq {
sendTime[seq] = lastSendTime
continue
}
msgSeq, msgSendTime, err := m.findBeforeSendTime(ctx, conversationID, seq)
if err != nil {
return nil, err
}
if msgSendTime <= 0 {
break
}
sendTime[seq] = msgSendTime
lastSeq = msgSeq
lastSendTime = msgSendTime
}
for _, seq := range abnormalSeq {
result = append(result, &model.MsgInfoModel{
Msg: &model.MsgDataModel{
Seq: seq,
Status: constant.MsgStatusHasDeleted,
SendTime: sendTime[seq],
},
})
}
}
return result, nil
}
11 changes: 10 additions & 1 deletion pkg/common/storage/model/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package model

import (
"strconv"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"strconv"
)

const (
Expand Down Expand Up @@ -108,6 +109,10 @@ func (m *MsgDocModel) IsFull() bool {
return m.Msg[len(m.Msg)-1].Msg != nil
}

func (m *MsgDocModel) GetDocIndex(seq int64) int64 {
return (seq - 1) / singleGocMsgNum
}

func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
seqSuffix := (seq - 1) / singleGocMsgNum
return m.indexGen(conversationID, seqSuffix)
Expand Down Expand Up @@ -135,6 +140,10 @@ func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
}

func (*MsgDocModel) BuildDocIDByIndex(conversationID string, index int64) string {
return conversationID + ":" + strconv.FormatInt(index, 10)
}

func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msgModel := new(sdkws.MsgData)
Expand Down

0 comments on commit 8edb3ff

Please sign in to comment.