From 76c993057ea2c0a41f6f409c6dc4e51530b4c5eb Mon Sep 17 00:00:00 2001 From: Artem Date: Sun, 4 Feb 2024 17:01:41 +0100 Subject: [PATCH] Fix: resync on missed blocks --- cmd/mempool/tzkt/tzkt.go | 82 ++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/cmd/mempool/tzkt/tzkt.go b/cmd/mempool/tzkt/tzkt.go index 5f90038..8966937 100644 --- a/cmd/mempool/tzkt/tzkt.go +++ b/cmd/mempool/tzkt/tzkt.go @@ -63,16 +63,33 @@ func (tzkt *TzKT) Connect(ctx context.Context) error { case <-ctx.Done(): return case msg := <-tzkt.client.Listen(): - switch msg.Channel { - case events.ChannelOperations: - if err := tzkt.handleOperationMessage(msg); err != nil { - log.Err(err).Msg("handleOperationMessage") + switch msg.Type { + case events.MessageTypeData: + switch msg.Channel { + case events.ChannelOperations: + if err := tzkt.handleOperationMessage(msg); err != nil { + log.Err(err).Msg("handleOperationMessage") + } + case events.ChannelBlocks: + if err := tzkt.handleBlockMessage(msg); err != nil { + log.Err(err).Msg("handleBlockMessage") + } } - case events.ChannelBlocks: - if err := tzkt.handleBlockMessage(msg); err != nil { - log.Err(err).Msg("handleBlockMessage") + case events.MessageTypeState: + if msg.Channel != events.ChannelBlocks { + continue } + + if tzkt.state < msg.State { + // if blocks was missed in some reason we should index missed blocks + log.Warn().Uint64("old_state", tzkt.state).Uint64("new_level", msg.State).Msg("detect missed blocks. resync...") + + tzkt.Sync(ctx, msg.State) + } + tzkt.state = msg.State + case events.MessageTypeReorg, events.MessageTypeSubscribed: } + } } }) @@ -104,49 +121,32 @@ func (tzkt *TzKT) Blocks() <-chan BlockMessage { } func (tzkt *TzKT) handleBlockMessage(msg events.Message) error { - switch msg.Type { - case events.MessageTypeData: - if msg.Body == nil { - return nil - } - blocks := msg.Body.([]data.Block) - for i := range blocks { - tzkt.blocks <- BlockMessage{ - Hash: blocks[i].Hash, - Level: blocks[i].Level, - Type: msg.Type, - Timestamp: blocks[i].Timestamp.UTC(), - } - } - case events.MessageTypeState, events.MessageTypeReorg, events.MessageTypeSubscribed: + if msg.Body == nil { + return nil + } + blocks := msg.Body.([]data.Block) + for i := range blocks { tzkt.blocks <- BlockMessage{ - Level: msg.State, - Type: msg.Type, + Hash: blocks[i].Hash, + Level: blocks[i].Level, + Type: msg.Type, + Timestamp: blocks[i].Timestamp.UTC(), } - default: - return errors.Wrapf(ErrUnknownMessageType, "%d", msg.Type) + tzkt.state = blocks[i].Level } return nil } func (tzkt *TzKT) handleOperationMessage(msg events.Message) error { - switch msg.Type { - case events.MessageTypeData: - if msg.Body == nil { - return nil - } - operations, ok := msg.Body.([]any) - if !ok { - return nil - } - return tzkt.handleUpdateMessage(operations) - case events.MessageTypeState, events.MessageTypeReorg: - default: - return errors.Wrapf(ErrUnknownMessageType, "%d", msg.Type) + if msg.Body == nil { + return nil } - - return nil + operations, ok := msg.Body.([]any) + if !ok { + return nil + } + return tzkt.handleUpdateMessage(operations) } func (tzkt *TzKT) handleUpdateMessage(operations []any) error {