diff --git a/src/dipdup/datasources/substrate_node.py b/src/dipdup/datasources/substrate_node.py index cc32d67df..5f41cf739 100644 --- a/src/dipdup/datasources/substrate_node.py +++ b/src/dipdup/datasources/substrate_node.py @@ -15,7 +15,7 @@ import pysignalr.exceptions from dipdup.config import HttpConfig -from dipdup.config.substrate import SubstrateDatasourceConfigU +from dipdup.config.substrate_node import SubstrateNodeDatasourceConfig from dipdup.datasources import JsonRpcDatasource from dipdup.exceptions import DatasourceError from dipdup.exceptions import FrameworkException @@ -90,21 +90,21 @@ def save_file(self) -> None: raise ValueError(f'Unsupported file type: {self.path}') -class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateDatasourceConfigU]): +class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]): _default_http_config = HttpConfig( batch_size=20, ) - def __init__(self, config: SubstrateDatasourceConfigU) -> None: + def __init__(self, config: SubstrateNodeDatasourceConfig) -> None: from aiosubstrate.base import SubstrateInterface # NOTE: Use our aiohttp session and limiters SubstrateInterface.http_request = partial(self._jsonrpc_request, raw=True) # type: ignore[method-assign] super().__init__(config) - self._pending_subscription = None + self._pending_subscription: SubstrateNodeSubscription | None = None self._subscription_ids: dict[str, SubstrateNodeSubscription] = {} - self._interface = SubstrateInterface(config.url) + self._interface = SubstrateInterface(config.url) # type: ignore[no-untyped-call] self._emitter_queue: Queue[SubscriptionMessage] = Queue() @@ -131,7 +131,7 @@ async def initialize(self) -> None: self.set_sync_level(None, level) # NOTE: Prepare substrate_interface - await self._interface.init_props() + await self._interface.init_props() # type: ignore[no-untyped-call] self._interface.reload_type_registry() async def _ws_loop(self) -> None: @@ -219,7 +219,7 @@ async def get_head_level(self) -> int: return int(header['number'], 16) async def get_block_hash(self, height: int) -> str: - return await self._jsonrpc_request('chain_getBlockHash', [height]) + return await self._jsonrpc_request('chain_getBlockHash', [height]) # type: ignore[no-any-return] async def get_block_header(self, hash: str) -> BlockHeader: response = await self._jsonrpc_request('chain_getHeader', [hash]) @@ -227,7 +227,7 @@ async def get_block_header(self, hash: str) -> BlockHeader: return { 'hash': hash, 'number': int(response['number'], 16), - 'prevRoot': response['parentHash'], + 'prev_root': response['parentHash'], } async def get_metadata_header(self, height: int) -> MetadataHeader: @@ -244,7 +244,7 @@ async def get_metadata_header_batch(self, heights: list[int]) -> list[MetadataHe return await asyncio.gather(*[self.get_metadata_header(h) for h in heights]) async def get_full_block(self, hash: str) -> dict[str, Any]: - return await self._jsonrpc_request('chain_getBlock', [hash]) + return await self._jsonrpc_request('chain_getBlock', [hash]) # type: ignore[no-any-return] async def get_events(self, block_hash: str) -> tuple[SubstrateEventDataDict, ...]: events = await self._interface.get_events(block_hash) @@ -256,9 +256,7 @@ async def get_events(self, block_hash: str) -> tuple[SubstrateEventDataDict, ... { 'name': f'{event['module_id']}.{event['event_id']}', 'index': event['event_index'], - 'extrinsicIndex': event['extrinsic_idx'], - 'callAddress': None, - 'args': None, + 'extrinsic_index': event['extrinsic_idx'], 'decoded_args': event['attributes'], } ) @@ -344,7 +342,7 @@ async def _emitter_loop(self) -> None: block_hash = await self.get_block_hash(level) event_dicts = await self.get_events(block_hash) block_header = await self.get_block_header(block_hash) - events = tuple(SubstrateEventData(**event_dict, header=block_header) for event_dict in event_dicts) + events = tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts) await self.emit_events(events) diff --git a/src/dipdup/datasources/substrate_subsquid.py b/src/dipdup/datasources/substrate_subsquid.py index 4f64c3d0c..a6f9b66a8 100644 --- a/src/dipdup/datasources/substrate_subsquid.py +++ b/src/dipdup/datasources/substrate_subsquid.py @@ -3,7 +3,7 @@ from dipdup.config.substrate_subsquid import SubstrateSubsquidDatasourceConfig from dipdup.datasources._subsquid import AbstractSubsquidDatasource from dipdup.models._subsquid import AbstractSubsquidQuery -from dipdup.models.substrate import SubstrateEventData +from dipdup.models.substrate import SubstrateEventDataSubsquid Query = AbstractSubsquidQuery @@ -14,7 +14,7 @@ async def iter_events( first_level: int, last_level: int, names: tuple[str, ...], - ) -> AsyncIterator[tuple[SubstrateEventData, ...]]: + ) -> AsyncIterator[tuple[SubstrateEventDataSubsquid, ...]]: current_level = first_level while current_level <= last_level: @@ -50,5 +50,7 @@ async def iter_events( response = await self.query_worker(query, current_level) for level_item in response: - yield tuple(SubstrateEventData(**e, header=level_item['header']) for e in level_item['events']) + for event_item in level_item['events']: + event_item['header'] = level_item['header'] + yield tuple(level_item['events']) current_level = level_item['header']['number'] + 1 diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index bdfe22b39..a16ee97c3 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -75,6 +75,7 @@ from dipdup.models.evm_node import EvmNodeHeadData from dipdup.models.evm_node import EvmNodeSyncingData from dipdup.models.substrate import HeadBlock +from dipdup.models.substrate import SubstrateEventData from dipdup.models.tezos import TezosBigMapData from dipdup.models.tezos import TezosEventData from dipdup.models.tezos import TezosHeadBlockData @@ -556,7 +557,6 @@ async def _on_tzkt_events(self, datasource: TezosTzktDatasource, events: tuple[T if isinstance(index, TezosEventsIndex) and datasource in index.datasources: index.push_realtime_message(events) - # TODO: fix data typing async def _on_substrate_head( self, datasource: SubstrateNodeDatasource, @@ -565,11 +565,10 @@ async def _on_substrate_head( # TODO: any head updates here? pass - # TODO: fix data typing async def _on_substrate_events( self, datasource: SubstrateNodeDatasource, - events: tuple[dict[str, Any], ...], + events: tuple[SubstrateEventData, ...], ) -> None: for index in self._indexes.values(): if isinstance(index, SubstrateEventsIndex) and datasource in index.datasources: diff --git a/src/dipdup/indexes/substrate_events/fetcher.py b/src/dipdup/indexes/substrate_events/fetcher.py index ad8ec7e04..ba647f034 100644 --- a/src/dipdup/indexes/substrate_events/fetcher.py +++ b/src/dipdup/indexes/substrate_events/fetcher.py @@ -25,13 +25,16 @@ def __init__( self._names = names async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]: - event_iter = self.random_datasource.iter_events( + async for level, events in self.readahead_by_level(self.fetch_events()): + yield level, events + + async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]: + async for events in self.random_datasource.iter_events( first_level=self._first_level, last_level=self._last_level, names=self._names, - ) - async for level, batch in self.readahead_by_level(event_iter): - yield level, batch + ): + yield tuple(SubstrateEventData.from_subsquid(event) for event in events) class SubstrateNodeEventFetcher(SubstrateNodeFetcher[SubstrateEventData]): @@ -50,12 +53,12 @@ def __init__( ) async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]: - async for level, event in self.readahead_by_level(self.fetch_events()): - yield level, event + async for level, events in self.readahead_by_level(self.fetch_events()): + yield level, events - async def fetch_events(self) -> AsyncIterator[tuple[tuple[SubstrateEventData, ...]]]: + async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]: for level in range(self._first_level, self._last_level): block_hash = await self.get_random_node().get_block_hash(level) event_dicts = await self.get_random_node().get_events(block_hash) block_header = await self.get_random_node().get_block_header(block_hash) - yield tuple(SubstrateEventData(**event_dict, header=block_header) for event_dict in event_dicts) + yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts) diff --git a/src/dipdup/models/substrate.py b/src/dipdup/models/substrate.py index 6073e8c9c..918482a4c 100644 --- a/src/dipdup/models/substrate.py +++ b/src/dipdup/models/substrate.py @@ -2,6 +2,7 @@ from functools import cached_property from typing import Any from typing import Generic +from typing import Self from typing import TypedDict from typing import TypeVar from typing import cast @@ -10,31 +11,41 @@ from dipdup.runtimes import SubstrateRuntime +class BlockHeaderSubsquid(TypedDict): + number: int + hash: str + parentHash: str + stateRoot: str + extrinsicsRoot: str + digest: str + specName: str + specVersion: int + implName: str + implVersion: int + timestamp: int + validator: str + + +class SubstrateEventDataSubsquid(TypedDict): + name: str + index: int + extrinsicIndex: int + callAddress: list[str] + args: list[Any] + header: BlockHeaderSubsquid + + class BlockHeader(TypedDict): hash: str number: int - daHeight: str - transactionsRoot: str - transactionsCount: int - messageReceiptCount: int - prevRoot: str - time: str - applicationHash: str - eventInboxRoot: str - consensusParametersVersion: int - stateTransitionBytecodeVersion: int - messageOutboxRoot: str - # NOTE: There are more fields in header - specVersion: str + prev_root: str class SubstrateEventDataDict(TypedDict): name: str index: int - extrinsicIndex: int - callAddress: list[str] - args: list[Any] | None - decoded_args: dict[str, Any] | None + extrinsic_index: int + decoded_args: dict[str, Any] @dataclass(frozen=True, kw_only=True) @@ -42,18 +53,45 @@ class SubstrateEventData(HasLevel): # TODO: there are more fields in event data: phase, topics name: str index: int - extrinsicIndex: int - callAddress: list[str] | None - # TODO: ensure logic is straightforward + extrinsic_index: int + call_address: list[str] | None # we receive decoded args from node datasource and encoded from subsquid datasource args: list[Any] | None = None decoded_args: dict[str, Any] | None = None header: BlockHeader + header_extra: BlockHeaderSubsquid | None @property def level(self) -> int: # type: ignore[override] return self.header['number'] + @classmethod + def from_node(cls, event_dict: SubstrateEventDataDict, header: BlockHeader) -> Self: + return cls( + **event_dict, + call_address=None, + args=None, + header=header, + header_extra=None, + ) + + @classmethod + def from_subsquid(cls, event_dict: SubstrateEventDataSubsquid) -> Self: + return cls( + name=event_dict['name'], + index=event_dict['index'], + extrinsic_index=event_dict['extrinsicIndex'], + call_address=event_dict['callAddress'], + args=event_dict['args'], + decoded_args=None, + header={ + 'hash': event_dict['header']['hash'], + 'number': event_dict['header']['number'], + 'prev_root': event_dict['header']['parentHash'], + }, + header_extra=event_dict['header'], + ) + class HeadBlock(TypedDict): parentHash: str @@ -75,15 +113,16 @@ class SubstrateEvent(Generic[PayloadT]): @cached_property def payload(self) -> PayloadT: if self.data.decoded_args is not None: - return self.data.decoded_args + return cast(PayloadT, self.data.decoded_args) - assert self.data.args is not None + # NOTE: both from subsquid + assert self.data.args is not None and self.data.header_extra is not None return cast( PayloadT, self.runtime.decode_event_args( name=self.name, args=self.data.args, - spec_version=self.data.header['specVersion'], + spec_version=str(self.data.header_extra['specVersion']), ), ) diff --git a/src/dipdup/models/substrate_node.py b/src/dipdup/models/substrate_node.py index 97a25ae14..0589f06fb 100644 --- a/src/dipdup/models/substrate_node.py +++ b/src/dipdup/models/substrate_node.py @@ -6,7 +6,8 @@ from dipdup.subscriptions import Subscription -class SubstrateNodeSubscription(ABC, Subscription): ... +class SubstrateNodeSubscription(ABC, Subscription): + method: str @dataclass(frozen=True)