Skip to content

Commit

Permalink
Refactor TzKT indexes (#875)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Nov 10, 2023
1 parent 8539c28 commit 8f3687a
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 237 deletions.
7 changes: 3 additions & 4 deletions src/dipdup/indexes/evm_subsquid_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def _process_queue(self) -> None:
break

for message_level, level_logs in logs_by_level.items():
await self._process_level_events(tuple(level_logs), self.topics, message_level)
await self._process_level_events(tuple(level_logs), message_level)

def get_sync_level(self) -> int:
"""Get level index needs to be synchronized to depending on its subscription status"""
Expand Down Expand Up @@ -166,14 +166,14 @@ async def _synchronize(self, sync_level: int) -> None:
raise FrameworkException(f'Block {level} not found')
timestamp = int(block['timestamp'], 16)
parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamp) for log in level_logs)
await self._process_level_events(parsed_level_logs, self.topics, sync_level)
await self._process_level_events(parsed_level_logs, sync_level)

else:
sync_level = min(sync_level, subsquid_sync_level)
fetcher = self._create_fetcher(first_level, sync_level)

async for _level, events in fetcher.fetch_by_level():
await self._process_level_events(events, self.topics, sync_level)
await self._process_level_events(events, sync_level)

await self._exit_sync_state(sync_level)

Expand Down Expand Up @@ -201,7 +201,6 @@ def _create_fetcher(self, first_level: int, last_level: int) -> EventLogFetcher:
async def _process_level_events(
self,
events: tuple[SubsquidEventData | EvmNodeLogData, ...],
topics: dict[str, dict[str, str]],
sync_level: int,
) -> None:
if not events:
Expand Down
85 changes: 85 additions & 0 deletions src/dipdup/indexes/tezos_tzkt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import abstractmethod
from collections import deque
from contextlib import ExitStack
from typing import Any
from typing import Generic

from dipdup.datasources.tezos_tzkt import TzktDatasource
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.index import IndexConfigT
from dipdup.index import IndexQueueItemT
from dipdup.models.tezos_tzkt import TzktMessageType
from dipdup.models.tezos_tzkt import TzktRollbackMessage
from dipdup.prometheus import Metrics


class TzktIndex(
    Generic[IndexConfigT, IndexQueueItemT],
    Index[Any, Any, TzktDatasource],
    message_type=TzktMessageType,
):
    """Shared base for TzKT-backed indexes.

    Owns the realtime-queue draining and per-level batch processing common to
    all TzKT index kinds; subclasses supply only the matching logic and the
    handler invocation via the two abstract hooks below.
    """

    async def _process_queue(self) -> None:
        """Drain the realtime WebSocket queue one message at a time."""
        if self._queue:
            self._logger.debug('Processing websocket queue')
        while self._queue:
            item = self._queue.popleft()

            # Chain reorg: rewind the index before consuming newer levels.
            if isinstance(item, TzktRollbackMessage):
                await self._tzkt_rollback(item.from_level, item.to_level)
                continue

            item_level = item[0].level
            if item_level <= self.state.level:
                self._logger.debug('Skipping outdated message: %s <= %s', item_level, self.state.level)
                continue

            with ExitStack() as stack:
                if Metrics.enabled:
                    stack.enter_context(Metrics.measure_level_realtime_duration())
                await self._process_level_data(item, item_level)

    async def _process_level_data(
        self,
        level_data: IndexQueueItemT,
        sync_level: int,
    ) -> None:
        """Match one level's worth of data against handlers and run them in a single transaction."""
        if not level_data:
            return

        batch_level = level_data[0].level
        index_level = self.state.level
        if batch_level <= index_level:
            raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

        self._logger.debug('Processing data of level %s', batch_level)
        matched_handlers = self._match_level_data(self._config.handlers, level_data)

        if Metrics.enabled:
            Metrics.set_index_handlers_matched(len(matched_handlers))

        # NOTE: We still need to bump index level but don't care if it will be done in existing transaction
        if not matched_handlers:
            await self._update_state(level=batch_level)
            return

        async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
            for handler_config, matched_data in matched_handlers:
                await self._call_matched_handler(handler_config, matched_data)
            await self._update_state(level=batch_level)

    @abstractmethod
    def _match_level_data(
        self,
        handlers: Any,
        level_data: Any,
    ) -> deque[Any]:
        """Return matched (handler_config, data) pairs for one level of data."""

    @abstractmethod
    async def _call_matched_handler(
        self,
        handler_config: Any,
        level_data: Any,
    ) -> None:
        """Invoke a single matched handler callback."""
67 changes: 10 additions & 57 deletions src/dipdup/indexes/tezos_tzkt_big_maps/index.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from collections import deque
from contextlib import ExitStack
from datetime import datetime
from typing import Any

from dipdup.config.tezos_tzkt_big_maps import TzktBigMapsHandlerConfig
from dipdup.config.tezos_tzkt_big_maps import TzktBigMapsIndexConfig
from dipdup.datasources.tezos_tzkt import TzktDatasource
from dipdup.exceptions import ConfigInitializationException
from dipdup.exceptions import ConfigurationError
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.indexes.tezos_tzkt import TzktIndex
from dipdup.indexes.tezos_tzkt_big_maps.fetcher import BigMapFetcher
from dipdup.indexes.tezos_tzkt_big_maps.fetcher import get_big_map_pairs
from dipdup.indexes.tezos_tzkt_big_maps.matcher import match_big_maps
Expand All @@ -24,33 +23,13 @@


class TzktBigMapsIndex(
Index[TzktBigMapsIndexConfig, BigMapQueueItem, TzktDatasource],
TzktIndex[TzktBigMapsIndexConfig, BigMapQueueItem],
message_type=TzktMessageType.big_map,
):
    def push_big_maps(self, big_maps: BigMapQueueItem) -> None:
        """Push a batch of realtime big map diffs onto the index queue for later processing."""
        self.push_realtime_message(big_maps)

async def _process_queue(self) -> None:
"""Process WebSocket queue"""
if self._queue:
self._logger.debug('Processing websocket queue')
while self._queue:
message = self._queue.popleft()
if isinstance(message, TzktRollbackMessage):
await self._tzkt_rollback(message.from_level, message.to_level)
continue

message_level = message[0].level
if message_level <= self.state.level:
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
continue

with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_level_realtime_duration())
await self._process_level_big_maps(message, message_level)

async def _synchronize(self, sync_level: int) -> None:
"""Fetch operations via Fetcher and pass to message callback"""
index_level = await self._enter_sync_state(sync_level)
Expand Down Expand Up @@ -82,7 +61,7 @@ async def _synchronize_full(self, index_level: int, sync_level: int) -> None:
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, sync_level - level)
stack.enter_context(Metrics.measure_level_sync_duration())
await self._process_level_big_maps(big_maps, sync_level)
await self._process_level_data(big_maps, sync_level)

async def _synchronize_level(self, head_level: int) -> None:
# NOTE: Checking late because feature flags could be modified after loading config
Expand All @@ -98,7 +77,7 @@ async def _synchronize_level(self, head_level: int) -> None:
if contract_big_map['path'] == path:
big_map_ids.add((int(contract_big_map['ptr']), address, path))

# NOTE: Do not use `_process_level_big_maps` here; we want to maintain transaction manually.
# NOTE: Do not use `_process_level_data` here; we want to maintain transaction manually.
async with self._ctx.transactions.in_transaction(head_level, head_level, self.name):
for big_map_id, address, path in big_map_ids:
async for big_map_keys in self._datasource.iter_big_map(big_map_id, head_level):
Expand All @@ -124,37 +103,8 @@ async def _synchronize_level(self, head_level: int) -> None:

await self._update_state(level=head_level)

async def _process_level_big_maps(
self,
big_maps: tuple[TzktBigMapData, ...],
sync_level: int,
) -> None:
if not big_maps:
return

batch_level = big_maps[0].level
index_level = self.state.level
if batch_level <= index_level:
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

self._logger.debug('Processing big map diffs of level %s', batch_level)
matched_handlers = match_big_maps(self._ctx.package, self._config.handlers, big_maps)

if Metrics.enabled:
Metrics.set_index_handlers_matched(len(matched_handlers))

# NOTE: We still need to bump index level but don't care if it will be done in existing transaction
if not matched_handlers:
await self._update_state(level=batch_level)
return

async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
for handler_config, big_map_diff in matched_handlers:
await self._call_matched_handler(handler_config, big_map_diff)
await self._update_state(level=batch_level)

async def _call_matched_handler(
self, handler_config: TzktBigMapsHandlerConfig, big_map_diff: TzktBigMapDiff[Any, Any]
self, handler_config: TzktBigMapsHandlerConfig, level_data: TzktBigMapDiff[Any, Any]
) -> None:
if not handler_config.parent:
raise ConfigInitializationException
Expand All @@ -165,5 +115,8 @@ async def _call_matched_handler(
self.datasource,
# NOTE: missing `operation_id` field in API to identify operation
None,
big_map_diff,
level_data,
)

    def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
        # Delegate to the module-level matcher; yields (handler_config, big_map_diff) pairs.
        return match_big_maps(self._ctx.package, handlers, level_data)
70 changes: 12 additions & 58 deletions src/dipdup/indexes/tezos_tzkt_events/index.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from collections import deque
from contextlib import ExitStack
from typing import Any

from dipdup.config.tezos_tzkt_events import TzktEventsHandlerConfig
from dipdup.config.tezos_tzkt_events import TzktEventsHandlerConfigU
from dipdup.config.tezos_tzkt_events import TzktEventsIndexConfig
from dipdup.datasources.tezos_tzkt import TzktDatasource
from dipdup.exceptions import ConfigInitializationException
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.indexes.tezos_tzkt import TzktIndex
from dipdup.indexes.tezos_tzkt_events.fetcher import EventFetcher
from dipdup.indexes.tezos_tzkt_events.matcher import match_events
from dipdup.models.tezos_tzkt import TzktEvent
Expand All @@ -21,32 +21,12 @@


class TzktEventsIndex(
Index[TzktEventsIndexConfig, EventQueueItem, TzktDatasource],
TzktIndex[TzktEventsIndexConfig, EventQueueItem],
message_type=TzktMessageType.event,
):
    def push_events(self, events: EventQueueItem) -> None:
        """Push a batch of realtime contract events onto the index queue for later processing."""
        self.push_realtime_message(events)

async def _process_queue(self) -> None:
"""Process WebSocket queue"""
if self._queue:
self._logger.debug('Processing websocket queue')
while self._queue:
message = self._queue.popleft()
if isinstance(message, TzktRollbackMessage):
await self._tzkt_rollback(message.from_level, message.to_level)
continue

message_level = message[0].level
if message_level <= self.state.level:
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
continue

with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_level_realtime_duration())
await self._process_level_events(message, message_level)

def _create_fetcher(self, first_level: int, last_level: int) -> EventFetcher:
event_addresses = self._get_event_addresses()
event_tags = self._get_event_tags()
Expand All @@ -73,44 +53,15 @@ async def _synchronize(self, sync_level: int) -> None:
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, sync_level - level)
stack.enter_context(Metrics.measure_level_sync_duration())
await self._process_level_events(events, sync_level)
await self._process_level_data(events, sync_level)

await self._exit_sync_state(sync_level)

async def _process_level_events(
self,
events: tuple[TzktEventData, ...],
sync_level: int,
) -> None:
if not events:
return

batch_level = events[0].level
index_level = self.state.level
if batch_level <= index_level:
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

self._logger.debug('Processing contract events of level %s', batch_level)
matched_handlers = match_events(self._ctx.package, self._config.handlers, events)

if Metrics.enabled:
Metrics.set_index_handlers_matched(len(matched_handlers))

# NOTE: We still need to bump index level but don't care if it will be done in existing transaction
if not matched_handlers:
await self._update_state(level=batch_level)
return

async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
for handler_config, event in matched_handlers:
await self._call_matched_handler(handler_config, event)
await self._update_state(level=batch_level)

async def _call_matched_handler(
self, handler_config: TzktEventsHandlerConfigU, event: TzktEvent[Any] | TzktUnknownEvent
self, handler_config: TzktEventsHandlerConfigU, level_data: TzktEvent[Any] | TzktUnknownEvent
) -> None:
if isinstance(handler_config, TzktEventsHandlerConfig) != isinstance(event, TzktEvent):
raise FrameworkException(f'Invalid handler config and event types: {handler_config}, {event}')
if isinstance(handler_config, TzktEventsHandlerConfig) != isinstance(level_data, TzktEvent):
raise FrameworkException(f'Invalid handler config and event types: {handler_config}, {level_data}')

if not handler_config.parent:
raise ConfigInitializationException
Expand All @@ -119,8 +70,8 @@ async def _call_matched_handler(
handler_config.callback,
handler_config.parent.name,
self.datasource,
str(event.data.transaction_id),
event,
str(level_data.data.transaction_id),
level_data,
)

def _get_event_addresses(self) -> set[str]:
Expand All @@ -137,3 +88,6 @@ def _get_event_tags(self) -> set[str]:
if isinstance(handler_config, TzktEventsHandlerConfig):
paths.add(handler_config.tag)
return paths

    def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
        # Delegate to the module-level matcher; yields (handler_config, event) pairs.
        return match_events(self._ctx.package, handlers, level_data)
Loading

0 comments on commit 8f3687a

Please sign in to comment.