events from node working prototype
Wizard1209 committed Nov 25, 2024
1 parent 89567b7 commit 3fe0b21
Showing 4 changed files with 50 additions and 73 deletions.
91 changes: 29 additions & 62 deletions src/dipdup/datasources/substrate_node.py
@@ -5,25 +5,18 @@
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import TYPE_CHECKING

import orjson
import pysignalr.exceptions
from scalecodec.base import ScaleBytes

from dipdup.config import HttpConfig
from dipdup.config.substrate import SubstrateDatasourceConfigU
from dipdup.config.substrate_subscan import SubstrateSubscanDatasourceConfig
from dipdup.config.substrate_subsquid import SubstrateSubsquidDatasourceConfig
from dipdup.datasources import JsonRpcDatasource
from dipdup.exceptions import DatasourceError
from dipdup.models.substrate import SubstrateEventData
from dipdup.models.substrate import BlockHeader
from dipdup.models.substrate import SubstrateEventDataDict
from dipdup.models.substrate_node import SubstrateNodeSubscription
from dipdup.pysignalr import Message
from dipdup.runtimes import SubstrateRuntime

if TYPE_CHECKING:
    from scalecodec.base import ScaleDecoder

_logger = logging.getLogger(__name__)

@@ -146,6 +139,15 @@ async def get_head_level(self) -> int:
        header = await self._jsonrpc_request('chain_getHeader', [head])
        return int(header['number'], 16)

    async def get_block_hash(self, height: int) -> str:
        return await self._jsonrpc_request('chain_getBlockHash', [height])

    async def get_block_header(self, hash: str) -> BlockHeader:
        response = await self._jsonrpc_request('chain_getHeader', [hash])
        return {
            'hash': hash,
            'number': int(response['number'], 16),
            'prevRoot': response['parentHash'],
        }

    async def get_metadata_header(self, height: int) -> MetadataHeader:
        block_hash = await self.get_block_hash(height)
        rt = await self._jsonrpc_request('chain_getRuntimeVersion', [block_hash])
@@ -155,64 +157,29 @@ async def get_metadata_header(self, height: int) -> MetadataHeader:
            block_number=height,
            block_hash=block_hash,
        )

    async def get_full_block_by_level(self, height: int) -> dict:
        block_hash = await self.get_block_hash(height)
        return await self.get_full_block(block_hash)

    async def get_events_storage(self, hash: str) -> dict:
        return await self._jsonrpc_request('state_getStorageAt', [
            '0x26aa394eea5630e07c48ae0c9558cef780d41e5e16056765bc8461851072c9d7',
            hash,
        ])

    async def get_events(self, height: int, decoder: SubstrateRuntime) -> tuple[SubstrateEventData]:
        # TODO: get info for storage request for events
        block_hash = await self.get_block_hash(height)
        event_data = await self.get_events_storage(block_hash)
        runtime_config = decoder.runtime_config

        events = await self._interface.get_events(block_hash)

        # metadata_bin: str = await self.get_raw_metadata(block_hash)
        # metadata = await self._interface.create_scale_object(
        #     'MetadataVersioned', data=ScaleBytes(metadata_bin)
        # )
        # metadata.decode()

        # add runtime metadata using metadata kwarg
        scale_object = runtime_config.create_scale_object(
            'Vec<EventRecord>', metadata=metadata
        )
        event_bytes = ScaleBytes(event_data)
        event = scale_object.decode(event_bytes)

        async for line in block:
            block_data = orjson.loads(line)

            # Extract events from onInitialize
            for event in block_data.get('onInitialize', {}).get('events', []):
                SubstrateEventData(method=event['method'], data=event['data'])

            # Extract events from extrinsics
            for extrinsic in block_data.get('extrinsics', []):
                for event in extrinsic.get('events', []):
                    SubstrateEventData(method=event['method'], data=event['data'])

            # Extract events from onFinalize
            for event in block_data.get('onFinalize', {}).get('events', []):
                SubstrateEventData(method=event['method'], data=event['data'])

        return tuple()
    async def get_metadata_header_batch(self, heights: list[int]) -> list[MetadataHeader]:
        return await asyncio.gather(*[self.get_metadata_header(h) for h in heights])

    async def get_block_hash(self, height: int) -> str:
        return await self._jsonrpc_request('chain_getBlockHash', [height])

    async def get_full_block(self, hash: str) -> dict:
        return await self._jsonrpc_request('chain_getBlock', [hash])

    async def get_metadata_header_batch(self, heights: list[int]) -> list[MetadataHeader]:
        return await asyncio.gather(*[self.get_metadata_header(h) for h in heights])

    async def get_events(self, block_hash: str) -> tuple[SubstrateEventDataDict, ...]:
        events = await self._interface.get_events(block_hash)

        res_events: list[SubstrateEventDataDict] = []
        for event in events:
            event: dict = event.decode()
            res_events.append({
                'name': f'{event["module_id"]}.{event["event_id"]}',
                'index': event['event_index'],
                'extrinsicIndex': event['extrinsic_idx'],
                'callAddress': None,
                'args': None,
                'decoded_args': event['attributes'],
            })

        return tuple(res_events)

    async def find_metadata_versions(
        self,
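For reference, a minimal sketch of how the node methods introduced above combine to build event data for one block; the helper name and the `node`/`height` values are assumptions, not part of the commit:

from dipdup.models.substrate import SubstrateEventData

# Rough sketch (not part of the commit). `node` is assumed to be a configured
# SubstrateNodeDatasource and `height` a known block level.
async def node_events_at(node, height: int) -> tuple[SubstrateEventData, ...]:
    block_hash = await node.get_block_hash(height)
    header = await node.get_block_header(block_hash)  # {'hash': ..., 'number': ..., 'prevRoot': ...}
    event_dicts = await node.get_events(block_hash)   # tuple of SubstrateEventDataDict
    return tuple(SubstrateEventData(**d, header=header) for d in event_dicts)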
13 changes: 6 additions & 7 deletions src/dipdup/indexes/substrate_events/fetcher.py
@@ -40,7 +40,6 @@ def __init__(
        self,
        name: str,
        datasources: tuple[SubstrateNodeDatasource, ...],
        runtime: SubstrateRuntime,
        first_level: int,
        last_level: int,
    ) -> None:
@@ -50,15 +49,15 @@ def __init__(
            first_level=first_level,
            last_level=last_level,
        )
        # FIXME: ensure decoder is set for correct runtime scope
        self.decoder = runtime

    async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]:
        # TODO: add event type from runtime to fetchevent
        async for level, event in self.readahead_by_level(self.fetch_events()):
            # TODO: convert block to event data
            yield level, event

    async def fetch_events(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventData, ...]]]:
    async def fetch_events(self) -> AsyncIterator[tuple[tuple[SubstrateEventData, ...]]]:
        for level in range(self._first_level, self._last_level):
            yield level, await self.get_random_node().get_events(level, self.decoder)
            block_hash = await self.get_random_node().get_block_hash(level)
            event_dicts = await self.get_random_node().get_events(block_hash)
            block_header = await self.get_random_node().get_block_header(block_hash)
            yield tuple(
                SubstrateEventData(**event_dict, header=block_header)
                for event_dict in event_dicts
            )
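A brief consumption sketch based on the fetch_by_level signature above; the `fetcher` instance and the processing body are assumed, not part of the commit:

# Rough sketch (not part of the commit). `fetcher` is assumed to be a
# SubstrateNodeEventFetcher built as in index.py below; dispatch is elided.
async def process_events(fetcher) -> None:
    async for level, events in fetcher.fetch_by_level():
        for event in events:
            ...  # event.name, event.decoded_args and event.header are available here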
1 change: 0 additions & 1 deletion src/dipdup/indexes/substrate_events/index.py
@@ -68,7 +68,6 @@ def _create_node_fetcher(self, first_level: int, last_level: int) -> SubstrateNo
        return SubstrateNodeEventFetcher(
            name=self.name,
            datasources=self.node_datasources,
            runtime=self.runtime,
            first_level=first_level,
            last_level=last_level,
        )
18 changes: 15 additions & 3 deletions src/dipdup/models/substrate.py
@@ -28,13 +28,25 @@ class BlockHeader(TypedDict):
    specVersion: str


@dataclass(frozen=True)
class SubstrateEventData(HasLevel):
class SubstrateEventDataDict(TypedDict):
    name: str
    index: int
    extrinsicIndex: int
    callAddress: list[str]
    args: list[Any]


@dataclass(frozen=True)
class SubstrateEventData(HasLevel):
    # TODO: there are more fields in event data: phase, topics
    name: str
    index: int
    extrinsicIndex: int
    callAddress: list[str] | None
    # TODO: ensure logic is straightforward
    # we receive decoded args from node datasource and encoded from subsquid datasource
    args: list[Any] | None
    decoded_args: dict[str, Any] | None
    header: BlockHeader

    @property
@@ -53,7 +53,7 @@ class SubstrateEvent(Generic[PayloadT]):
    # TODO: could be used in other models with typed payload
    @cached_property
    def payload(self) -> PayloadT:
        return cast(
        return self.data.decoded_args or cast(
            PayloadT,
            self.runtime.decode_event_args(
                name=self.name,
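The payload change means node-sourced events short-circuit runtime decoding while Subsquid-sourced events still go through it; a standalone restatement of that fallback (the decode_event_args call is truncated above, so the `args=` keyword here is an assumption):

# Rough restatement (not part of the commit) of the new payload fallback.
def resolve_payload(event, runtime):
    # decoded_args is populated by the node datasource; Subsquid supplies encoded args
    # that still need the runtime decoder.
    return event.decoded_args or runtime.decode_event_args(name=event.name, args=event.args)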
