Skip to content

Commit

Permalink
refactoring fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizard1209 committed Dec 11, 2024
1 parent b7b31e4 commit 4d50caa
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 17 deletions.
1 change: 0 additions & 1 deletion src/dipdup/config/starknet_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,4 @@ class StarknetEventsIndexConfig(StarknetIndexConfig):
handlers: tuple[StarknetEventsHandlerConfig, ...]

def get_subscriptions(self) -> set[Subscription]:
# TODO: return custom subscription class
return {StarknetSubscription()}
6 changes: 6 additions & 0 deletions src/dipdup/datasources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ def _get_ws_client(self) -> WebsocketTransport:

# FIXME: Not necessary a WS datasource
class JsonRpcDatasource(WebsocketDatasource[DatasourceConfigT]):
NODE_LAST_MILE = 0

def __init__(self, config: DatasourceConfigT) -> None:
super().__init__(config)
self._requests: dict[str, tuple[asyncio.Event, Any]] = {}
Expand Down Expand Up @@ -219,6 +221,10 @@ async def _request() -> None:
raise DatasourceError(data['error']['message'], self.name)
return data['result']

# TODO: probably should be defined higher
@abstractmethod
async def get_head_level() -> int: ...


def create_datasource(config: DatasourceConfig) -> Datasource[Any]:
from dipdup.config.abi_etherscan import AbiEtherscanDatasourceConfig
Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@


NODE_LEVEL_TIMEOUT = 0.1
NODE_LAST_MILE = 128


HeadCallback = Callable[['EvmNodeDatasource', EvmNodeHeadData], Awaitable[None]]
Expand Down Expand Up @@ -68,6 +67,8 @@ async def wait_level(self) -> None:


class EvmNodeDatasource(JsonRpcDatasource[EvmNodeDatasourceConfig]):
NODE_LAST_MILE = 128

_default_http_config = HttpConfig(
batch_size=10,
ratelimit_sleep=1,
Expand Down
15 changes: 5 additions & 10 deletions src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dipdup.exceptions import DatasourceError
from dipdup.exceptions import FrameworkException
from dipdup.models.substrate import BlockHeader
from dipdup.models.substrate import HeadBlock
from dipdup.models.substrate import SubstrateEventData
from dipdup.models.substrate import SubstrateEventDataDict
from dipdup.models.substrate_node import SubstrateNodeHeadSubscription
Expand All @@ -30,15 +31,14 @@
_logger = logging.getLogger(__name__)


# TODO: think about return type of callback, for now i'm not sure where data should be ingested after the head update
HeadCallback = Callable[['SubstrateNodeDatasource', dict], Awaitable[None]]
HeadCallback = Callable[['SubstrateNodeDatasource', HeadBlock], Awaitable[None]]
EventCallback = Callable[['SubstrateNodeDatasource', tuple[SubstrateEventData, ...]], Awaitable[None]]


# NOTE: Renamed entity class LevelData from evm_node
@dataclass
class SubscriptionMessage:
head: dict[str, Any]
head: HeadBlock
fetch_events: bool = False


Expand Down Expand Up @@ -154,7 +154,6 @@ async def subscribe(self) -> None:
if not self.realtime:
return

# TODO: Ensure substrate subscriptions list made correctly
missing_subscriptions = self._subscriptions.missing_subscriptions
if not missing_subscriptions:
return
Expand All @@ -164,8 +163,7 @@ async def subscribe(self) -> None:
if isinstance(subscription, SubstrateNodeSubscription):
await self._subscribe(subscription)

# TODO: fix typing
async def emit_head(self, head: dict) -> None:
async def emit_head(self, head: HeadBlock) -> None:
for fn in self._on_head_callbacks:
await fn(self, head)

Expand All @@ -180,9 +178,6 @@ def call_on_events(self, fn: EventCallback) -> None:
self._on_event_callbacks.add(fn)

async def _on_message(self, message: Message) -> None:
# TODO: since only head subscription available we need to load target objects(i.e. events) separately
# to schedule those requests we need to get information about index type from subscription

if not isinstance(message, WebsocketMessage):
raise FrameworkException(f'Unknown message type: {type(message)}')

Expand Down Expand Up @@ -327,7 +322,7 @@ async def _emitter_loop(self) -> None:

level = int(level_data.head['number'], 16)
self._logger.info('New head: %s', level)
await self.emit_head(level_data)
await self.emit_head(level_data.head)

# NOTE: subscribing to finalized head, no rollback required

Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from dipdup.models.evm import EvmTransactionData
from dipdup.models.evm_node import EvmNodeHeadData
from dipdup.models.evm_node import EvmNodeSyncingData
from dipdup.models.substrate import HeadBlock
from dipdup.models.tezos import TezosBigMapData
from dipdup.models.tezos import TezosEventData
from dipdup.models.tezos import TezosHeadBlockData
Expand Down Expand Up @@ -552,7 +553,7 @@ async def _on_tzkt_events(self, datasource: TezosTzktDatasource, events: tuple[T
index.push_realtime_message(events)

# TODO: fix data typing
async def _on_substrate_head(self, datasource: SubstrateNodeDatasource, head: dict) -> None:
async def _on_substrate_head(self, datasource: SubstrateNodeDatasource, head: HeadBlock) -> None:
# TODO: any head updates here?
pass

Expand Down
7 changes: 3 additions & 4 deletions src/dipdup/indexes/_subsquid.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

if TYPE_CHECKING:
from dipdup.context import DipDupContext
from dipdup.datasources.evm_node import NODE_LAST_MILE
from dipdup.datasources.evm_node import EvmNodeDatasource
from dipdup.datasources import JsonRpcDatasource
from dipdup.index import Index
from dipdup.index import IndexQueueItemT
from dipdup.performance import metrics
Expand Down Expand Up @@ -40,7 +39,7 @@ async def _get_node_sync_level(
self,
subsquid_level: int,
index_level: int,
node: EvmNodeDatasource | None = None,
node: JsonRpcDatasource | None = None,
) -> int | None:
if not self.node_datasources:
return None
Expand All @@ -50,7 +49,7 @@ async def _get_node_sync_level(
subsquid_lag = abs(node_sync_level - subsquid_level)
subsquid_available = subsquid_level - index_level
self._logger.info('Subsquid is %s levels behind; %s available', subsquid_lag, subsquid_available)
if subsquid_available < NODE_LAST_MILE:
if subsquid_available < node.NODE_LAST_MILE:
return node_sync_level
return None

Expand Down
8 changes: 8 additions & 0 deletions src/dipdup/models/substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ def level(self) -> int: # type: ignore[override]
return self.header['number']


class HeadBlock(TypedDict):
parentHash: str
number: str
stateRoot: str
extrinsicsRoot: str
digest: dict[str, Any]


PayloadT = TypeVar('PayloadT')


Expand Down

0 comments on commit 4d50caa

Please sign in to comment.