Skip to content

Commit

Permalink
refactor: enhance Substrate event handling and typing
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizard1209 committed Dec 19, 2024
1 parent 4cb6fd0 commit 03679d0
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 51 deletions.
24 changes: 11 additions & 13 deletions src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pysignalr.exceptions

from dipdup.config import HttpConfig
from dipdup.config.substrate import SubstrateDatasourceConfigU
from dipdup.config.substrate_node import SubstrateNodeDatasourceConfig
from dipdup.datasources import JsonRpcDatasource
from dipdup.exceptions import DatasourceError
from dipdup.exceptions import FrameworkException
Expand Down Expand Up @@ -90,21 +90,21 @@ def save_file(self) -> None:
raise ValueError(f'Unsupported file type: {self.path}')


class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateDatasourceConfigU]):
class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]):
_default_http_config = HttpConfig(
batch_size=20,
)

def __init__(self, config: SubstrateDatasourceConfigU) -> None:
def __init__(self, config: SubstrateNodeDatasourceConfig) -> None:
from aiosubstrate.base import SubstrateInterface

# NOTE: Use our aiohttp session and limiters
SubstrateInterface.http_request = partial(self._jsonrpc_request, raw=True) # type: ignore[method-assign]

super().__init__(config)
self._pending_subscription = None
self._pending_subscription: SubstrateNodeSubscription | None = None
self._subscription_ids: dict[str, SubstrateNodeSubscription] = {}
self._interface = SubstrateInterface(config.url)
self._interface = SubstrateInterface(config.url) # type: ignore[no-untyped-call]

self._emitter_queue: Queue[SubscriptionMessage] = Queue()

Expand All @@ -131,7 +131,7 @@ async def initialize(self) -> None:
self.set_sync_level(None, level)

# NOTE: Prepare substrate_interface
await self._interface.init_props()
await self._interface.init_props() # type: ignore[no-untyped-call]
self._interface.reload_type_registry()

async def _ws_loop(self) -> None:
Expand Down Expand Up @@ -219,15 +219,15 @@ async def get_head_level(self) -> int:
return int(header['number'], 16)

async def get_block_hash(self, height: int) -> str:
return await self._jsonrpc_request('chain_getBlockHash', [height])
return await self._jsonrpc_request('chain_getBlockHash', [height]) # type: ignore[no-any-return]

async def get_block_header(self, hash: str) -> BlockHeader:
response = await self._jsonrpc_request('chain_getHeader', [hash])
# FIXME: missing fields
return {
'hash': hash,
'number': int(response['number'], 16),
'prevRoot': response['parentHash'],
'prev_root': response['parentHash'],
}

async def get_metadata_header(self, height: int) -> MetadataHeader:
Expand All @@ -244,7 +244,7 @@ async def get_metadata_header_batch(self, heights: list[int]) -> list[MetadataHe
return await asyncio.gather(*[self.get_metadata_header(h) for h in heights])

async def get_full_block(self, hash: str) -> dict[str, Any]:
return await self._jsonrpc_request('chain_getBlock', [hash])
return await self._jsonrpc_request('chain_getBlock', [hash]) # type: ignore[no-any-return]

async def get_events(self, block_hash: str) -> tuple[SubstrateEventDataDict, ...]:
events = await self._interface.get_events(block_hash)
Expand All @@ -256,9 +256,7 @@ async def get_events(self, block_hash: str) -> tuple[SubstrateEventDataDict, ...
{
'name': f'{event['module_id']}.{event['event_id']}',
'index': event['event_index'],
'extrinsicIndex': event['extrinsic_idx'],
'callAddress': None,
'args': None,
'extrinsic_index': event['extrinsic_idx'],
'decoded_args': event['attributes'],
}
)
Expand Down Expand Up @@ -344,7 +342,7 @@ async def _emitter_loop(self) -> None:
block_hash = await self.get_block_hash(level)
event_dicts = await self.get_events(block_hash)
block_header = await self.get_block_header(block_hash)
events = tuple(SubstrateEventData(**event_dict, header=block_header) for event_dict in event_dicts)
events = tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)
await self.emit_events(events)


Expand Down
8 changes: 5 additions & 3 deletions src/dipdup/datasources/substrate_subsquid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dipdup.config.substrate_subsquid import SubstrateSubsquidDatasourceConfig
from dipdup.datasources._subsquid import AbstractSubsquidDatasource
from dipdup.models._subsquid import AbstractSubsquidQuery
from dipdup.models.substrate import SubstrateEventData
from dipdup.models.substrate import SubstrateEventDataSubsquid

Query = AbstractSubsquidQuery

Expand All @@ -14,7 +14,7 @@ async def iter_events(
first_level: int,
last_level: int,
names: tuple[str, ...],
) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
) -> AsyncIterator[tuple[SubstrateEventDataSubsquid, ...]]:
current_level = first_level

while current_level <= last_level:
Expand Down Expand Up @@ -50,5 +50,7 @@ async def iter_events(
response = await self.query_worker(query, current_level)

for level_item in response:
yield tuple(SubstrateEventData(**e, header=level_item['header']) for e in level_item['events'])
for event_item in level_item['events']:
event_item['header'] = level_item['header']
yield tuple(level_item['events'])
current_level = level_item['header']['number'] + 1
5 changes: 2 additions & 3 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from dipdup.models.evm_node import EvmNodeHeadData
from dipdup.models.evm_node import EvmNodeSyncingData
from dipdup.models.substrate import HeadBlock
from dipdup.models.substrate import SubstrateEventData
from dipdup.models.tezos import TezosBigMapData
from dipdup.models.tezos import TezosEventData
from dipdup.models.tezos import TezosHeadBlockData
Expand Down Expand Up @@ -556,7 +557,6 @@ async def _on_tzkt_events(self, datasource: TezosTzktDatasource, events: tuple[T
if isinstance(index, TezosEventsIndex) and datasource in index.datasources:
index.push_realtime_message(events)

# TODO: fix data typing
async def _on_substrate_head(
self,
datasource: SubstrateNodeDatasource,
Expand All @@ -565,11 +565,10 @@ async def _on_substrate_head(
# TODO: any head updates here?
pass

# TODO: fix data typing
async def _on_substrate_events(
self,
datasource: SubstrateNodeDatasource,
events: tuple[dict[str, Any], ...],
events: tuple[SubstrateEventData, ...],
) -> None:
for index in self._indexes.values():
if isinstance(index, SubstrateEventsIndex) and datasource in index.datasources:
Expand Down
19 changes: 11 additions & 8 deletions src/dipdup/indexes/substrate_events/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ def __init__(
self._names = names

async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]:
event_iter = self.random_datasource.iter_events(
async for level, events in self.readahead_by_level(self.fetch_events()):
yield level, events

async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
async for events in self.random_datasource.iter_events(
first_level=self._first_level,
last_level=self._last_level,
names=self._names,
)
async for level, batch in self.readahead_by_level(event_iter):
yield level, batch
):
yield tuple(SubstrateEventData.from_subsquid(event) for event in events)


class SubstrateNodeEventFetcher(SubstrateNodeFetcher[SubstrateEventData]):
Expand All @@ -50,12 +53,12 @@ def __init__(
)

async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]:
async for level, event in self.readahead_by_level(self.fetch_events()):
yield level, event
async for level, events in self.readahead_by_level(self.fetch_events()):
yield level, events

async def fetch_events(self) -> AsyncIterator[tuple[tuple[SubstrateEventData, ...]]]:
async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
for level in range(self._first_level, self._last_level):
block_hash = await self.get_random_node().get_block_hash(level)
event_dicts = await self.get_random_node().get_events(block_hash)
block_header = await self.get_random_node().get_block_header(block_hash)
yield tuple(SubstrateEventData(**event_dict, header=block_header) for event_dict in event_dicts)
yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)
85 changes: 62 additions & 23 deletions src/dipdup/models/substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from functools import cached_property
from typing import Any
from typing import Generic
from typing import Self
from typing import TypedDict
from typing import TypeVar
from typing import cast
Expand All @@ -10,50 +11,87 @@
from dipdup.runtimes import SubstrateRuntime


class BlockHeaderSubsquid(TypedDict):
number: int
hash: str
parentHash: str
stateRoot: str
extrinsicsRoot: str
digest: str
specName: str
specVersion: int
implName: str
implVersion: int
timestamp: int
validator: str


class SubstrateEventDataSubsquid(TypedDict):
name: str
index: int
extrinsicIndex: int
callAddress: list[str]
args: list[Any]
header: BlockHeaderSubsquid


class BlockHeader(TypedDict):
hash: str
number: int
daHeight: str
transactionsRoot: str
transactionsCount: int
messageReceiptCount: int
prevRoot: str
time: str
applicationHash: str
eventInboxRoot: str
consensusParametersVersion: int
stateTransitionBytecodeVersion: int
messageOutboxRoot: str
# NOTE: There are more fields in header
specVersion: str
prev_root: str


class SubstrateEventDataDict(TypedDict):
name: str
index: int
extrinsicIndex: int
callAddress: list[str]
args: list[Any] | None
decoded_args: dict[str, Any] | None
extrinsic_index: int
decoded_args: dict[str, Any]


@dataclass(frozen=True, kw_only=True)
class SubstrateEventData(HasLevel):
# TODO: there are more fields in event data: phase, topics
name: str
index: int
extrinsicIndex: int
callAddress: list[str] | None
# TODO: ensure logic is straightforward
extrinsic_index: int
call_address: list[str] | None
# we receive decoded args from node datasource and encoded from subsquid datasource
args: list[Any] | None = None
decoded_args: dict[str, Any] | None = None
header: BlockHeader
header_extra: BlockHeaderSubsquid | None

@property
def level(self) -> int: # type: ignore[override]
return self.header['number']

@classmethod
def from_node(cls, event_dict: SubstrateEventDataDict, header: BlockHeader) -> Self:
return cls(
**event_dict,
call_address=None,
args=None,
header=header,
header_extra=None,
)

@classmethod
def from_subsquid(cls, event_dict: SubstrateEventDataSubsquid) -> Self:
return cls(
name=event_dict['name'],
index=event_dict['index'],
extrinsic_index=event_dict['extrinsicIndex'],
call_address=event_dict['callAddress'],
args=event_dict['args'],
decoded_args=None,
header={
'hash': event_dict['header']['hash'],
'number': event_dict['header']['number'],
'prev_root': event_dict['header']['parentHash'],
},
header_extra=event_dict['header'],
)


class HeadBlock(TypedDict):
parentHash: str
Expand All @@ -75,15 +113,16 @@ class SubstrateEvent(Generic[PayloadT]):
@cached_property
def payload(self) -> PayloadT:
if self.data.decoded_args is not None:
return self.data.decoded_args
return cast(PayloadT, self.data.decoded_args)

assert self.data.args is not None
# NOTE: both from subsquid
assert self.data.args is not None and self.data.header_extra is not None
return cast(
PayloadT,
self.runtime.decode_event_args(
name=self.name,
args=self.data.args,
spec_version=self.data.header['specVersion'],
spec_version=str(self.data.header_extra['specVersion']),
),
)

Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/models/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from dipdup.subscriptions import Subscription


class SubstrateNodeSubscription(ABC, Subscription): ...
class SubstrateNodeSubscription(ABC, Subscription):
method: str


@dataclass(frozen=True)
Expand Down

0 comments on commit 03679d0

Please sign in to comment.