Skip to content

Commit

Permalink
fixes and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Dec 23, 2024
1 parent cf48863 commit 63f87fc
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
10 changes: 8 additions & 2 deletions src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from asyncio import Queue
from collections.abc import Awaitable
from collections.abc import Callable
from contextlib import suppress
from copy import copy
from dataclasses import dataclass
from dataclasses import field
Expand Down Expand Up @@ -203,7 +204,7 @@ async def _on_message(self, message: Message) -> None:
# NOTE: Set None to identify possible subscriptions conflicts
self._pending_subscription = None
else:
raise Exception
raise FrameworkException('id in data, but no pending subscription')
elif 'method' in data and data['method'].startswith('chain_'):
subscription_id = data['params']['subscription']
if subscription_id not in self._subscription_ids:
Expand Down Expand Up @@ -247,7 +248,12 @@ async def get_full_block(self, hash: str) -> dict[str, Any]:
return await self._jsonrpc_request('chain_getBlock', [hash]) # type: ignore[no-any-return]

async def get_events(self, block_hash: str) -> tuple[_SubstrateNodeEventResponse, ...]:
events = await self._interface.get_events(block_hash)
# FIXME: aiosubstrate bug, fix asap
while True:
with suppress(AttributeError):
events = await self._interface.get_events(block_hash)
break
await asyncio.sleep(0.1)

result: list[_SubstrateNodeEventResponse] = []
for raw_event in events:
Expand Down
11 changes: 8 additions & 3 deletions src/dipdup/indexes/_subsquid.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ async def _get_node_sync_level(
node = node or random.choice(self.node_datasources)

node_sync_level = await node.get_head_level()
node._logger.info('current head is %s', node_sync_level)

subsquid_lag = abs(node_sync_level - subsquid_level)
subsquid_available = subsquid_level - index_level
self._logger.info('Subsquid is %s levels behind; %s available', subsquid_lag, subsquid_available)
Expand All @@ -64,7 +66,9 @@ async def _synchronize(self, sync_level: int) -> None:
return

if self.subsquid_datasources:
subsquid_sync_level = await self.subsquid_datasources[0].get_head_level()
datasource = self.subsquid_datasources[0]
subsquid_sync_level = await datasource.get_head_level()
datasource._logger.info('current head is %s', subsquid_sync_level)
metrics._sqd_processor_chain_height = subsquid_sync_level
else:
subsquid_sync_level = 0
Expand All @@ -74,15 +78,16 @@ async def _synchronize(self, sync_level: int) -> None:
# NOTE: Fetch last blocks from node if there are not enough realtime messages in queue
if node_sync_level:
sync_level = min(sync_level, node_sync_level)
self._logger.debug('Using node datasource; sync level: %s', sync_level)
self._logger.info('Synchronizing with `node`: %s -> %s', index_level, sync_level)
await self._synchronize_node(sync_level)
else:
sync_level = min(sync_level, subsquid_sync_level)
self._logger.info('Synchronizing with `subsquid`: %s -> %s', index_level, sync_level)
await self._synchronize_subsquid(sync_level)

if not self.node_datasources and not self._subsquid_started:
self._subsquid_started = True
self._logger.info('No `evm.node` datasources available; polling Subsquid')
self._logger.info('No `node` datasources available; polling Subsquid')
for datasource in self.subsquid_datasources:
await datasource.start()

Expand Down
11 changes: 7 additions & 4 deletions src/dipdup/indexes/substrate_events/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from collections.abc import AsyncIterator
from collections.abc import Iterable
from contextlib import suppress
from typing import Any

from dipdup.datasources.substrate_node import SubstrateNodeDatasource
Expand Down Expand Up @@ -159,14 +160,16 @@ async def _log_loop() -> None:
)

while True:
if sum(queues[q].qsize() for q in queues) == 0 and all(t.done() for t in tasks):
if sum(queues[q].qsize() for q in queues) == 0 and all(t.done() for t in tasks[:-1]):
break

for t in tasks:
if t.done():
if t.done() or t.cancelled():
await t

header, events = await queues['events'].get()
yield tuple(SubstrateEventData.from_node(event, header) for event in events)
with suppress(asyncio.TimeoutError):
while True:
header, events = await asyncio.wait_for(queues['events'].get(), timeout=1)
yield tuple(SubstrateEventData.from_node(event, header) for event in events)

tasks[-1].cancel()

0 comments on commit 63f87fc

Please sign in to comment.