From 8f3687a31fa7fbf027efd15846baf17db9356fee Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Fri, 10 Nov 2023 20:37:19 -0300 Subject: [PATCH] Refactor TzKT indexes (#875) --- .../indexes/evm_subsquid_events/index.py | 7 +- src/dipdup/indexes/tezos_tzkt.py | 85 +++++++++++++++++++ .../indexes/tezos_tzkt_big_maps/index.py | 67 +++------------ src/dipdup/indexes/tezos_tzkt_events/index.py | 70 +++------------ src/dipdup/indexes/tezos_tzkt_head/index.py | 18 ++-- .../indexes/tezos_tzkt_operations/index.py | 20 +++-- .../tezos_tzkt_token_balances/index.py | 60 ++----------- .../tezos_tzkt_token_transfers/index.py | 60 ++----------- 8 files changed, 150 insertions(+), 237 deletions(-) create mode 100644 src/dipdup/indexes/tezos_tzkt.py diff --git a/src/dipdup/indexes/evm_subsquid_events/index.py b/src/dipdup/indexes/evm_subsquid_events/index.py index 47b59990b..79fa4bf6a 100644 --- a/src/dipdup/indexes/evm_subsquid_events/index.py +++ b/src/dipdup/indexes/evm_subsquid_events/index.py @@ -99,7 +99,7 @@ async def _process_queue(self) -> None: break for message_level, level_logs in logs_by_level.items(): - await self._process_level_events(tuple(level_logs), self.topics, message_level) + await self._process_level_events(tuple(level_logs), message_level) def get_sync_level(self) -> int: """Get level index needs to be synchronized to depending on its subscription status""" @@ -166,14 +166,14 @@ async def _synchronize(self, sync_level: int) -> None: raise FrameworkException(f'Block {level} not found') timestamp = int(block['timestamp'], 16) parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamp) for log in level_logs) - await self._process_level_events(parsed_level_logs, self.topics, sync_level) + await self._process_level_events(parsed_level_logs, sync_level) else: sync_level = min(sync_level, subsquid_sync_level) fetcher = self._create_fetcher(first_level, sync_level) async for _level, events in fetcher.fetch_by_level(): - await self._process_level_events(events, self.topics, sync_level) + await self._process_level_events(events, sync_level) await self._exit_sync_state(sync_level) @@ -201,7 +201,6 @@ def _create_fetcher(self, first_level: int, last_level: int) -> EventLogFetcher: async def _process_level_events( self, events: tuple[SubsquidEventData | EvmNodeLogData, ...], - topics: dict[str, dict[str, str]], sync_level: int, ) -> None: if not events: diff --git a/src/dipdup/indexes/tezos_tzkt.py b/src/dipdup/indexes/tezos_tzkt.py new file mode 100644 index 000000000..305ec5e36 --- /dev/null +++ b/src/dipdup/indexes/tezos_tzkt.py @@ -0,0 +1,85 @@ +from abc import abstractmethod +from collections import deque +from contextlib import ExitStack +from typing import Any +from typing import Generic + +from dipdup.datasources.tezos_tzkt import TzktDatasource +from dipdup.exceptions import FrameworkException +from dipdup.index import Index +from dipdup.index import IndexConfigT +from dipdup.index import IndexQueueItemT +from dipdup.models.tezos_tzkt import TzktMessageType +from dipdup.models.tezos_tzkt import TzktRollbackMessage +from dipdup.prometheus import Metrics + + +class TzktIndex( + Generic[IndexConfigT, IndexQueueItemT], + Index[Any, Any, TzktDatasource], + message_type=TzktMessageType, +): + async def _process_queue(self) -> None: + """Process WebSocket queue""" + if self._queue: + self._logger.debug('Processing websocket queue') + while self._queue: + message = self._queue.popleft() + if isinstance(message, TzktRollbackMessage): + await self._tzkt_rollback(message.from_level, 
message.to_level) + continue + + message_level = message[0].level + if message_level <= self.state.level: + self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level) + continue + + with ExitStack() as stack: + if Metrics.enabled: + stack.enter_context(Metrics.measure_level_realtime_duration()) + await self._process_level_data(message, message_level) + + async def _process_level_data( + self, + level_data: IndexQueueItemT, + sync_level: int, + ) -> None: + if not level_data: + return + + batch_level = level_data[0].level + index_level = self.state.level + if batch_level <= index_level: + raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}') + + self._logger.debug('Processing data of level %s', batch_level) + matched_handlers = self._match_level_data(self._config.handlers, level_data) + + if Metrics.enabled: + Metrics.set_index_handlers_matched(len(matched_handlers)) + + # NOTE: We still need to bump index level but don't care if it will be done in existing transaction + if not matched_handlers: + await self._update_state(level=batch_level) + return + + async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): + for handler_config, data in matched_handlers: + await self._call_matched_handler(handler_config, data) + await self._update_state(level=batch_level) + + @abstractmethod + def _match_level_data( + self, + handlers: Any, + level_data: Any, + ) -> deque[Any]: + ... + + @abstractmethod + async def _call_matched_handler( + self, + handler_config: Any, + level_data: Any, + ) -> None: + ... diff --git a/src/dipdup/indexes/tezos_tzkt_big_maps/index.py b/src/dipdup/indexes/tezos_tzkt_big_maps/index.py index e88f133ef..d5b64ab6f 100644 --- a/src/dipdup/indexes/tezos_tzkt_big_maps/index.py +++ b/src/dipdup/indexes/tezos_tzkt_big_maps/index.py @@ -1,14 +1,13 @@ +from collections import deque from contextlib import ExitStack from datetime import datetime from typing import Any from dipdup.config.tezos_tzkt_big_maps import TzktBigMapsHandlerConfig from dipdup.config.tezos_tzkt_big_maps import TzktBigMapsIndexConfig -from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException from dipdup.exceptions import ConfigurationError -from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex from dipdup.indexes.tezos_tzkt_big_maps.fetcher import BigMapFetcher from dipdup.indexes.tezos_tzkt_big_maps.fetcher import get_big_map_pairs from dipdup.indexes.tezos_tzkt_big_maps.matcher import match_big_maps @@ -24,33 +23,13 @@ class TzktBigMapsIndex( - Index[TzktBigMapsIndexConfig, BigMapQueueItem, TzktDatasource], + TzktIndex[TzktBigMapsIndexConfig, BigMapQueueItem], message_type=TzktMessageType.big_map, ): def push_big_maps(self, big_maps: BigMapQueueItem) -> None: """Push big map diffs to queue""" self.push_realtime_message(big_maps) - async def _process_queue(self) -> None: - """Process WebSocket queue""" - if self._queue: - self._logger.debug('Processing websocket queue') - while self._queue: - message = self._queue.popleft() - if isinstance(message, TzktRollbackMessage): - await self._tzkt_rollback(message.from_level, message.to_level) - continue - - message_level = message[0].level - if message_level <= self.state.level: - self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level) - continue - - with ExitStack() as stack: - if Metrics.enabled: - 
stack.enter_context(Metrics.measure_level_realtime_duration()) - await self._process_level_big_maps(message, message_level) - async def _synchronize(self, sync_level: int) -> None: """Fetch operations via Fetcher and pass to message callback""" index_level = await self._enter_sync_state(sync_level) @@ -82,7 +61,7 @@ async def _synchronize_full(self, index_level: int, sync_level: int) -> None: if Metrics.enabled: Metrics.set_levels_to_sync(self._config.name, sync_level - level) stack.enter_context(Metrics.measure_level_sync_duration()) - await self._process_level_big_maps(big_maps, sync_level) + await self._process_level_data(big_maps, sync_level) async def _synchronize_level(self, head_level: int) -> None: # NOTE: Checking late because feature flags could be modified after loading config @@ -98,7 +77,7 @@ async def _synchronize_level(self, head_level: int) -> None: if contract_big_map['path'] == path: big_map_ids.add((int(contract_big_map['ptr']), address, path)) - # NOTE: Do not use `_process_level_big_maps` here; we want to maintain transaction manually. + # NOTE: Do not use `_process_level_data` here; we want to maintain transaction manually. async with self._ctx.transactions.in_transaction(head_level, head_level, self.name): for big_map_id, address, path in big_map_ids: async for big_map_keys in self._datasource.iter_big_map(big_map_id, head_level): @@ -124,37 +103,8 @@ async def _synchronize_level(self, head_level: int) -> None: await self._update_state(level=head_level) - async def _process_level_big_maps( - self, - big_maps: tuple[TzktBigMapData, ...], - sync_level: int, - ) -> None: - if not big_maps: - return - - batch_level = big_maps[0].level - index_level = self.state.level - if batch_level <= index_level: - raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}') - - self._logger.debug('Processing big map diffs of level %s', batch_level) - matched_handlers = match_big_maps(self._ctx.package, self._config.handlers, big_maps) - - if Metrics.enabled: - Metrics.set_index_handlers_matched(len(matched_handlers)) - - # NOTE: We still need to bump index level but don't care if it will be done in existing transaction - if not matched_handlers: - await self._update_state(level=batch_level) - return - - async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): - for handler_config, big_map_diff in matched_handlers: - await self._call_matched_handler(handler_config, big_map_diff) - await self._update_state(level=batch_level) - async def _call_matched_handler( - self, handler_config: TzktBigMapsHandlerConfig, big_map_diff: TzktBigMapDiff[Any, Any] + self, handler_config: TzktBigMapsHandlerConfig, level_data: TzktBigMapDiff[Any, Any] ) -> None: if not handler_config.parent: raise ConfigInitializationException @@ -165,5 +115,8 @@ async def _call_matched_handler( self.datasource, # NOTE: missing `operation_id` field in API to identify operation None, - big_map_diff, + level_data, ) + + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + return match_big_maps(self._ctx.package, handlers, level_data) diff --git a/src/dipdup/indexes/tezos_tzkt_events/index.py b/src/dipdup/indexes/tezos_tzkt_events/index.py index 13ba25351..fb9089ea8 100644 --- a/src/dipdup/indexes/tezos_tzkt_events/index.py +++ b/src/dipdup/indexes/tezos_tzkt_events/index.py @@ -1,13 +1,13 @@ +from collections import deque from contextlib import ExitStack from typing import Any from dipdup.config.tezos_tzkt_events import 
TzktEventsHandlerConfig from dipdup.config.tezos_tzkt_events import TzktEventsHandlerConfigU from dipdup.config.tezos_tzkt_events import TzktEventsIndexConfig -from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex from dipdup.indexes.tezos_tzkt_events.fetcher import EventFetcher from dipdup.indexes.tezos_tzkt_events.matcher import match_events from dipdup.models.tezos_tzkt import TzktEvent @@ -21,32 +21,12 @@ class TzktEventsIndex( - Index[TzktEventsIndexConfig, EventQueueItem, TzktDatasource], + TzktIndex[TzktEventsIndexConfig, EventQueueItem], message_type=TzktMessageType.event, ): def push_events(self, events: EventQueueItem) -> None: self.push_realtime_message(events) - async def _process_queue(self) -> None: - """Process WebSocket queue""" - if self._queue: - self._logger.debug('Processing websocket queue') - while self._queue: - message = self._queue.popleft() - if isinstance(message, TzktRollbackMessage): - await self._tzkt_rollback(message.from_level, message.to_level) - continue - - message_level = message[0].level - if message_level <= self.state.level: - self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level) - continue - - with ExitStack() as stack: - if Metrics.enabled: - stack.enter_context(Metrics.measure_level_realtime_duration()) - await self._process_level_events(message, message_level) - def _create_fetcher(self, first_level: int, last_level: int) -> EventFetcher: event_addresses = self._get_event_addresses() event_tags = self._get_event_tags() @@ -73,44 +53,15 @@ async def _synchronize(self, sync_level: int) -> None: if Metrics.enabled: Metrics.set_levels_to_sync(self._config.name, sync_level - level) stack.enter_context(Metrics.measure_level_sync_duration()) - await self._process_level_events(events, sync_level) + await self._process_level_data(events, sync_level) await self._exit_sync_state(sync_level) - async def _process_level_events( - self, - events: tuple[TzktEventData, ...], - sync_level: int, - ) -> None: - if not events: - return - - batch_level = events[0].level - index_level = self.state.level - if batch_level <= index_level: - raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}') - - self._logger.debug('Processing contract events of level %s', batch_level) - matched_handlers = match_events(self._ctx.package, self._config.handlers, events) - - if Metrics.enabled: - Metrics.set_index_handlers_matched(len(matched_handlers)) - - # NOTE: We still need to bump index level but don't care if it will be done in existing transaction - if not matched_handlers: - await self._update_state(level=batch_level) - return - - async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): - for handler_config, event in matched_handlers: - await self._call_matched_handler(handler_config, event) - await self._update_state(level=batch_level) - async def _call_matched_handler( - self, handler_config: TzktEventsHandlerConfigU, event: TzktEvent[Any] | TzktUnknownEvent + self, handler_config: TzktEventsHandlerConfigU, level_data: TzktEvent[Any] | TzktUnknownEvent ) -> None: - if isinstance(handler_config, TzktEventsHandlerConfig) != isinstance(event, TzktEvent): - raise FrameworkException(f'Invalid handler config and event types: {handler_config}, {event}') + if isinstance(handler_config, 
TzktEventsHandlerConfig) != isinstance(level_data, TzktEvent): + raise FrameworkException(f'Invalid handler config and event types: {handler_config}, {level_data}') if not handler_config.parent: raise ConfigInitializationException @@ -119,8 +70,8 @@ async def _call_matched_handler( handler_config.callback, handler_config.parent.name, self.datasource, - str(event.data.transaction_id), - event, + str(level_data.data.transaction_id), + level_data, ) def _get_event_addresses(self) -> set[str]: @@ -137,3 +88,6 @@ def _get_event_tags(self) -> set[str]: if isinstance(handler_config, TzktEventsHandlerConfig): paths.add(handler_config.tag) return paths + + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + return match_events(self._ctx.package, handlers, level_data) diff --git a/src/dipdup/indexes/tezos_tzkt_head/index.py b/src/dipdup/indexes/tezos_tzkt_head/index.py index d54bea86c..2cec529b3 100644 --- a/src/dipdup/indexes/tezos_tzkt_head/index.py +++ b/src/dipdup/indexes/tezos_tzkt_head/index.py @@ -1,9 +1,11 @@ +from collections import deque +from typing import Any + from dipdup.config.tezos_tzkt_head import HeadHandlerConfig from dipdup.config.tezos_tzkt_head import TzktHeadIndexConfig -from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex from dipdup.models import IndexStatus from dipdup.models.tezos_tzkt import TzktHeadBlockData from dipdup.models.tezos_tzkt import TzktMessageType @@ -13,7 +15,7 @@ class TzktHeadIndex( - Index[TzktHeadIndexConfig, HeadQueueItem, TzktDatasource], + TzktIndex[TzktHeadIndexConfig, HeadQueueItem], message_type=TzktMessageType.head, ): def push_head(self, events: HeadQueueItem) -> None: @@ -23,6 +25,7 @@ async def _synchronize(self, sync_level: int) -> None: self._logger.info('Setting index level to %s and moving on', sync_level) await self._update_state(status=IndexStatus.realtime, level=sync_level) + # FIXME: Use method from TzktIndex async def _process_queue(self) -> None: while self._queue: message = self._queue.popleft() @@ -47,7 +50,7 @@ async def _process_queue(self) -> None: await self._call_matched_handler(self._config.handler_config, message) await self._update_state(level=batch_level) - async def _call_matched_handler(self, handler_config: HeadHandlerConfig, head: TzktHeadBlockData) -> None: + async def _call_matched_handler(self, handler_config: HeadHandlerConfig, level_data: TzktHeadBlockData) -> None: if not handler_config.parent: raise ConfigInitializationException @@ -55,6 +58,9 @@ async def _call_matched_handler(self, handler_config: HeadHandlerConfig, head: T handler_config.callback, handler_config.parent.name, self.datasource, - head.hash, - head, + level_data.hash, + level_data, ) + + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + raise NotImplementedError diff --git a/src/dipdup/indexes/tezos_tzkt_operations/index.py b/src/dipdup/indexes/tezos_tzkt_operations/index.py index 0bd74e2a1..8967fabed 100644 --- a/src/dipdup/indexes/tezos_tzkt_operations/index.py +++ b/src/dipdup/indexes/tezos_tzkt_operations/index.py @@ -1,10 +1,12 @@ import logging from collections import defaultdict from collections import deque +from collections.abc import Coroutine from collections.abc import Iterable from collections.abc import Iterator from collections.abc import Sequence from contextlib import ExitStack +from 
typing import Any from dipdup.config.tezos_tzkt_operations import OperationsHandlerOriginationPatternConfig as OriginationPatternConfig from dipdup.config.tezos_tzkt_operations import OperationsHandlerTransactionPatternConfig as TransactionPatternConfig @@ -17,7 +19,7 @@ from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex from dipdup.indexes.tezos_tzkt_operations.fetcher import OperationFetcher from dipdup.indexes.tezos_tzkt_operations.fetcher import OperationUnfilteredFetcher from dipdup.indexes.tezos_tzkt_operations.matcher import MatchedOperationsT @@ -143,7 +145,7 @@ def extract_operation_subgroups( class TzktOperationsIndex( - Index[TzktOperationsIndexConfigU, OperationQueueItem, TzktDatasource], + TzktIndex[TzktOperationsIndexConfigU, OperationQueueItem], message_type=TzktMessageType.operation, ): def __init__( @@ -175,6 +177,7 @@ async def get_filters(self) -> tuple[set[str | None], set[str], set[int]]: return self._entrypoint_filter, self._address_filter, self._code_hash_filter + # FIXME: Use method from TzktIndex async def _process_queue(self) -> None: """Process WebSocket queue""" self._logger.debug('Processing %s realtime messages from queue', len(self._queue)) @@ -253,6 +256,7 @@ async def _synchronize(self, sync_level: int) -> None: await self._exit_sync_state(sync_level) + # FIXME: Use method from TzktIndex async def _process_level_operations( self, operation_subgroups: tuple[OperationSubgroup, ...], @@ -300,15 +304,15 @@ async def _process_level_operations( async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): for operation_subgroup, handler_config, args in matched_handlers: - await self._call_matched_handler(handler_config, operation_subgroup, args) + await self._call_matched_handler(handler_config, (operation_subgroup, args)) await self._update_state(level=batch_level) async def _call_matched_handler( self, handler_config: TzktOperationsHandlerConfigU, - operation_subgroup: OperationSubgroup, - args: Sequence[OperationsHandlerArgumentU], + level_data: tuple[OperationSubgroup, Sequence[OperationsHandlerArgumentU]], ) -> None: + operation_subgroup, args = level_data if not handler_config.parent: raise ConfigInitializationException @@ -319,3 +323,9 @@ async def _call_matched_handler( operation_subgroup.hash + ': {}', *args, ) + + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + raise NotImplementedError + + def _process_level_data(self, level_data: OperationQueueItem, sync_level: int) -> Coroutine[Any, Any, None]: + raise NotImplementedError diff --git a/src/dipdup/indexes/tezos_tzkt_token_balances/index.py b/src/dipdup/indexes/tezos_tzkt_token_balances/index.py index 710adf028..75cd27403 100644 --- a/src/dipdup/indexes/tezos_tzkt_token_balances/index.py +++ b/src/dipdup/indexes/tezos_tzkt_token_balances/index.py @@ -1,22 +1,20 @@ -from contextlib import ExitStack +from collections import deque +from typing import Any from dipdup.config.tezos_tzkt_token_balances import TzktTokenBalancesHandlerConfig from dipdup.config.tezos_tzkt_token_balances import TzktTokenBalancesIndexConfig -from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException -from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex 
from dipdup.indexes.tezos_tzkt_token_balances.matcher import match_token_balances from dipdup.models.tezos_tzkt import TzktMessageType from dipdup.models.tezos_tzkt import TzktRollbackMessage from dipdup.models.tezos_tzkt import TzktTokenBalanceData -from dipdup.prometheus import Metrics TokenBalanceQueueItem = tuple[TzktTokenBalanceData, ...] | TzktRollbackMessage class TzktTokenBalancesIndex( - Index[TzktTokenBalancesIndexConfig, TokenBalanceQueueItem, TzktDatasource], + TzktIndex[TzktTokenBalancesIndexConfig, TokenBalanceQueueItem], message_type=TzktMessageType.token_balance, ): def push_token_balances(self, token_balances: TokenBalanceQueueItem) -> None: @@ -49,35 +47,6 @@ async def _synchronize_actual(self, head_level: int) -> None: await self._update_state(level=head_level) - async def _process_level_token_balances( - self, - token_balances: tuple[TzktTokenBalanceData, ...], - sync_level: int, - ) -> None: - if not token_balances: - return - - batch_level = token_balances[0].level - index_level = self.state.level - if batch_level <= index_level: - raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}') - - self._logger.debug('Processing token balances of level %s', batch_level) - matched_handlers = match_token_balances(self._config.handlers, token_balances) - - if Metrics.enabled: - Metrics.set_index_handlers_matched(len(matched_handlers)) - - # NOTE: We still need to bump index level but don't care if it will be done in existing transaction - if not matched_handlers: - await self._update_state(level=batch_level) - return - - async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): - for handler_config, token_balance in matched_handlers: - await self._call_matched_handler(handler_config, token_balance) - await self._update_state(level=batch_level) - async def _call_matched_handler( self, handler_config: TzktTokenBalancesHandlerConfig, token_balance: TzktTokenBalanceData ) -> None: @@ -93,22 +62,5 @@ async def _call_matched_handler( token_balance, ) - async def _process_queue(self) -> None: - """Process WebSocket queue""" - if self._queue: - self._logger.debug('Processing websocket queue') - while self._queue: - message = self._queue.popleft() - if isinstance(message, TzktRollbackMessage): - await self._tzkt_rollback(message.from_level, message.to_level) - continue - - message_level = message[0].level - if message_level <= self.state.level: - self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level) - continue - - with ExitStack() as stack: - if Metrics.enabled: - stack.enter_context(Metrics.measure_level_realtime_duration()) - await self._process_level_token_balances(message, message_level) + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + return match_token_balances(handlers, level_data) diff --git a/src/dipdup/indexes/tezos_tzkt_token_transfers/index.py b/src/dipdup/indexes/tezos_tzkt_token_transfers/index.py index 5ca0e0b17..c300e6a28 100644 --- a/src/dipdup/indexes/tezos_tzkt_token_transfers/index.py +++ b/src/dipdup/indexes/tezos_tzkt_token_transfers/index.py @@ -1,11 +1,11 @@ +from collections import deque from contextlib import ExitStack +from typing import Any from dipdup.config.tezos_tzkt_token_transfers import TzktTokenTransfersHandlerConfig from dipdup.config.tezos_tzkt_token_transfers import TzktTokenTransfersIndexConfig -from dipdup.datasources.tezos_tzkt import TzktDatasource from dipdup.exceptions import ConfigInitializationException 
-from dipdup.exceptions import FrameworkException -from dipdup.index import Index +from dipdup.indexes.tezos_tzkt import TzktIndex from dipdup.indexes.tezos_tzkt_token_transfers.fetcher import TokenTransferFetcher from dipdup.indexes.tezos_tzkt_token_transfers.matcher import match_token_transfers from dipdup.models.tezos_tzkt import TzktMessageType @@ -17,7 +17,7 @@ class TzktTokenTransfersIndex( - Index[TzktTokenTransfersIndexConfig, TokenTransferQueueItem, TzktDatasource], + TzktIndex[TzktTokenTransfersIndexConfig, TokenTransferQueueItem], message_type=TzktMessageType.token_transfer, ): def push_token_transfers(self, token_transfers: TokenTransferQueueItem) -> None: @@ -63,39 +63,10 @@ async def _synchronize(self, sync_level: int) -> None: if Metrics.enabled: Metrics.set_levels_to_sync(self._config.name, sync_level - level) stack.enter_context(Metrics.measure_level_sync_duration()) - await self._process_level_token_transfers(token_transfers, sync_level) + await self._process_level_data(token_transfers, sync_level) await self._exit_sync_state(sync_level) - async def _process_level_token_transfers( - self, - token_transfers: tuple[TzktTokenTransferData, ...], - sync_level: int, - ) -> None: - if not token_transfers: - return - - batch_level = token_transfers[0].level - index_level = self.state.level - if batch_level <= index_level: - raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}') - - self._logger.debug('Processing token transfers of level %s', batch_level) - matched_handlers = match_token_transfers(self._config.handlers, token_transfers) - - if Metrics.enabled: - Metrics.set_index_handlers_matched(len(matched_handlers)) - - # NOTE: We still need to bump index level but don't care if it will be done in existing transaction - if not matched_handlers: - await self._update_state(level=batch_level) - return - - async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name): - for handler_config, token_transfer in matched_handlers: - await self._call_matched_handler(handler_config, token_transfer) - await self._update_state(level=batch_level) - async def _call_matched_handler( self, handler_config: TzktTokenTransfersHandlerConfig, token_transfer: TzktTokenTransferData ) -> None: @@ -111,22 +82,5 @@ async def _call_matched_handler( token_transfer, ) - async def _process_queue(self) -> None: - """Process WebSocket queue""" - if self._queue: - self._logger.debug('Processing websocket queue') - while self._queue: - message = self._queue.popleft() - if isinstance(message, TzktRollbackMessage): - await self._tzkt_rollback(message.from_level, message.to_level) - continue - - message_level = message[0].level - if message_level <= self.state.level: - self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level) - continue - - with ExitStack() as stack: - if Metrics.enabled: - stack.enter_context(Metrics.measure_level_realtime_duration()) - await self._process_level_token_transfers(message, message_level) + def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]: + return match_token_transfers(handlers, level_data)
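
Note (not part of the patch): the diff above extracts the per-index `_process_queue` / `_process_level_*` boilerplate into a template-method base class. `TzktIndex` now owns queue draining, stale-message checks, metrics, and transaction/state bookkeeping, while each concrete index only supplies matching (`_match_level_data`) and handler dispatch (`_call_matched_handler`). Below is a minimal sketch of that subclass contract; `TzktFooIndex`, `TzktFooIndexConfig`, `FooQueueItem`, and `match_foo` are hypothetical names used purely for illustration and are not identifiers from the dipdup codebase.

    from collections import deque
    from typing import Any

    from dipdup.indexes.tezos_tzkt import TzktIndex
    from dipdup.models.tezos_tzkt import TzktMessageType


    def match_foo(package: Any, handlers: Any, level_data: Any) -> deque[Any]:
        # Hypothetical matcher stub: pair every handler with every item at this level.
        # Real indexes delegate to their own matcher (match_events, match_big_maps, ...).
        return deque((handler, item) for handler in handlers for item in level_data)


    class TzktFooIndex(
        # Hypothetical config and queue-item types; real indexes parametrize with their own.
        TzktIndex['TzktFooIndexConfig', 'FooQueueItem'],
        message_type=TzktMessageType.event,  # whichever TzKT message type this index consumes
    ):
        def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
            # Match raw level data against handler configs; the base class calls this
            # from _process_level_data for both sync batches and realtime messages.
            return match_foo(self._ctx.package, handlers, level_data)

        async def _call_matched_handler(self, handler_config: Any, level_data: Any) -> None:
            # Fire the user callback; the argument shape mirrors the calls kept in the diff.
            await self._ctx.fire_handler(
                handler_config.callback,
                handler_config.parent.name,
                self.datasource,
                None,  # unique ID of the matched item, if the API provides one
                level_data,
            )

The operations and head indexes still carry their own `_process_queue` (flagged with FIXME in the diff), and `TzktOperationsIndex` overrides `_process_level_data` to raise `NotImplementedError`, so only the flat per-level indexes (big maps, events, token balances, token transfers) route through the shared `TzktIndex._process_level_data` path.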