From c7bc6dc43891aa0a45244fd0e87e55da9115259e Mon Sep 17 00:00:00 2001 From: charizer Date: Fri, 8 Oct 2021 10:31:14 +0800 Subject: [PATCH] update (#179) Co-authored-by: chenzr --- syncaggregate/sync/syncMng.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/syncaggregate/sync/syncMng.go b/syncaggregate/sync/syncMng.go index fbcc1e48..070e9217 100644 --- a/syncaggregate/sync/syncMng.go +++ b/syncaggregate/sync/syncMng.go @@ -59,6 +59,7 @@ type SyncMng struct { cache service.Cache complexCache *common.ComplexCache syncOffset sync.Map //traceId, UserRoomOffset + s2d sync.Map // last del sendtodevice offset //repos onlineRepo *repos.OnlineUserRepo userTimeLine *repos.UserTimeLineRepo @@ -607,12 +608,16 @@ func (sm *SyncMng) addSendToDevice(req *request, response *syncapitypes.Response } maxPos = feedUp - - err = sm.db.DeleteStdMessage(req.ctx, req.marks.stdRecv, req.device.UserID, req.device.ID) - if err != nil { - log.Errorf("addSendToDevice: delete history std message error traceid:%s user:%s dev:%s err:%v", req.traceId, req.device.UserID, req.device.ID, err) + lastOffset, ok := sm.s2d.Load(req.device.UserID + "|" + req.device.ID) + if ok && lastOffset.(int64) >= req.marks.stdRecv { + // has delete to lastest, do nothing + }else{ + err = sm.db.DeleteStdMessage(req.ctx, req.marks.stdRecv, req.device.UserID, req.device.ID) + if err != nil { + log.Errorf("addSendToDevice: delete history std message error traceid:%s user:%s dev:%s err:%v", req.traceId, req.device.UserID, req.device.ID, err) + } + sm.s2d.Store(req.device.UserID + "|" + req.device.ID, req.marks.stdRecv) } - for _, stream := range feeds { response.ToDevice.StdEvent = append(response.ToDevice.StdEvent, *stream.DataStream) } @@ -626,8 +631,8 @@ func (sm *SyncMng) addSendToDevice(req *request, response *syncapitypes.Response } req.marks.stdProcess = maxPos - //deviceBytes, _ := json.Marshal(response.ToDevice) - //log.Errorf("SyncMng addSendToDevice response user:%s, dev:%s, events:%s", req.device.UserID, req.device.ID, string(deviceBytes)) + deviceBytes, _ := json.Marshal(response.ToDevice) + log.Errorf("SyncMng addSendToDevice response user:%s, dev:%s, events:%s", req.device.UserID, req.device.ID, string(deviceBytes)) } func (sm *SyncMng) addAccountData(req *request, response *syncapitypes.Response) *syncapitypes.Response {