Skip to content

Commit

Permalink
others
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Oct 20, 2023
1 parent 3d3edbe commit d73a941
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 102 deletions.
51 changes: 1 addition & 50 deletions src/dipdup/indexes/tezos_tzkt_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,6 @@ class TzktEventsIndex(
def push_events(self, events: EventQueueItem) -> None:
self.push_realtime_message(events)

async def _process_queue(self) -> None:
"""Process WebSocket queue"""
if self._queue:
self._logger.debug('Processing websocket queue')
while self._queue:
message = self._queue.popleft()
if isinstance(message, TzktRollbackMessage):
await self._tzkt_rollback(message.from_level, message.to_level)
continue

message_level = message[0].level
if message_level <= self.state.level:
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
continue

with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_level_realtime_duration())
await self._process_level_events(message, message_level)

def _create_fetcher(self, first_level: int, last_level: int) -> EventFetcher:
event_addresses = self._get_event_addresses()
event_tags = self._get_event_tags()
Expand All @@ -73,39 +53,10 @@ async def _synchronize(self, sync_level: int) -> None:
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, sync_level - level)
stack.enter_context(Metrics.measure_level_sync_duration())
await self._process_level_events(events, sync_level)
await self._process_level_data(events, sync_level)

await self._exit_sync_state(sync_level)

async def _process_level_events(
self,
events: tuple[TzktEventData, ...],
sync_level: int,
) -> None:
if not events:
return

batch_level = events[0].level
index_level = self.state.level
if batch_level <= index_level:
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

self._logger.debug('Processing contract events of level %s', batch_level)
matched_handlers = match_events(self._ctx.package, self._config.handlers, events)

if Metrics.enabled:
Metrics.set_index_handlers_matched(len(matched_handlers))

# NOTE: We still need to bump index level but don't care if it will be done in existing transaction
if not matched_handlers:
await self._update_state(level=batch_level)
return

async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
for handler_config, event in matched_handlers:
await self._call_matched_handler(handler_config, event)
await self._update_state(level=batch_level)

async def _call_matched_handler(
self, handler_config: TzktEventsHandlerConfigU, level_data: TzktEvent[Any] | TzktUnknownEvent
) -> None:
Expand Down
1 change: 1 addition & 0 deletions src/dipdup/indexes/tezos_tzkt_head/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def _synchronize(self, sync_level: int) -> None:
self._logger.info('Setting index level to %s and moving on', sync_level)
await self._update_state(status=IndexStatus.realtime, level=sync_level)

# FIXME: Use method from TzktIndex
async def _process_queue(self) -> None:
while self._queue:
message = self._queue.popleft()
Expand Down
6 changes: 6 additions & 0 deletions src/dipdup/indexes/tezos_tzkt_operations/index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from collections import defaultdict
from collections import deque
from collections.abc import Coroutine
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Sequence
Expand Down Expand Up @@ -176,6 +177,7 @@ async def get_filters(self) -> tuple[set[str | None], set[str], set[int]]:

return self._entrypoint_filter, self._address_filter, self._code_hash_filter

# FIXME: Use method from TzktIndex
async def _process_queue(self) -> None:
"""Process WebSocket queue"""
self._logger.debug('Processing %s realtime messages from queue', len(self._queue))
Expand Down Expand Up @@ -254,6 +256,7 @@ async def _synchronize(self, sync_level: int) -> None:

await self._exit_sync_state(sync_level)

# FIXME: Use method from TzktIndex
async def _process_level_operations(
self,
operation_subgroups: tuple[OperationSubgroup, ...],
Expand Down Expand Up @@ -323,3 +326,6 @@ async def _call_matched_handler(

def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
raise NotImplementedError

def _process_level_data(self, level_data: OperationQueueItem, sync_level: int) -> Coroutine[Any, Any, None]:
raise NotImplementedError
52 changes: 0 additions & 52 deletions src/dipdup/indexes/tezos_tzkt_token_balances/index.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from collections import deque
from contextlib import ExitStack
from typing import Any

from dipdup.config.tezos_tzkt_token_balances import TzktTokenBalancesHandlerConfig
from dipdup.config.tezos_tzkt_token_balances import TzktTokenBalancesIndexConfig
from dipdup.exceptions import ConfigInitializationException
from dipdup.exceptions import FrameworkException
from dipdup.indexes.tezos_tzkt import TzktIndex
from dipdup.indexes.tezos_tzkt_token_balances.matcher import match_token_balances
from dipdup.models.tezos_tzkt import TzktMessageType
from dipdup.models.tezos_tzkt import TzktRollbackMessage
from dipdup.models.tezos_tzkt import TzktTokenBalanceData
from dipdup.prometheus import Metrics

TokenBalanceQueueItem = tuple[TzktTokenBalanceData, ...] | TzktRollbackMessage

Expand Down Expand Up @@ -50,35 +47,6 @@ async def _synchronize_actual(self, head_level: int) -> None:

await self._update_state(level=head_level)

async def _process_level_token_balances(
self,
token_balances: tuple[TzktTokenBalanceData, ...],
sync_level: int,
) -> None:
if not token_balances:
return

batch_level = token_balances[0].level
index_level = self.state.level
if batch_level <= index_level:
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')

self._logger.debug('Processing token balances of level %s', batch_level)
matched_handlers = match_token_balances(self._config.handlers, token_balances)

if Metrics.enabled:
Metrics.set_index_handlers_matched(len(matched_handlers))

# NOTE: We still need to bump index level but don't care if it will be done in existing transaction
if not matched_handlers:
await self._update_state(level=batch_level)
return

async with self._ctx.transactions.in_transaction(batch_level, sync_level, self.name):
for handler_config, token_balance in matched_handlers:
await self._call_matched_handler(handler_config, token_balance)
await self._update_state(level=batch_level)

async def _call_matched_handler(
self, handler_config: TzktTokenBalancesHandlerConfig, token_balance: TzktTokenBalanceData
) -> None:
Expand All @@ -94,25 +62,5 @@ async def _call_matched_handler(
token_balance,
)

async def _process_queue(self) -> None:
"""Process WebSocket queue"""
if self._queue:
self._logger.debug('Processing websocket queue')
while self._queue:
message = self._queue.popleft()
if isinstance(message, TzktRollbackMessage):
await self._tzkt_rollback(message.from_level, message.to_level)
continue

message_level = message[0].level
if message_level <= self.state.level:
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
continue

with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_level_realtime_duration())
await self._process_level_token_balances(message, message_level)

def _match_level_data(self, handlers: Any, level_data: Any) -> deque[Any]:
return match_token_balances(handlers, level_data)

0 comments on commit d73a941

Please sign in to comment.