From cf48863de4c53f2a68b4937f09bf5052dae4b998 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 23 Dec 2024 15:43:23 -0300 Subject: [PATCH] typehints --- .../indexes/substrate_events/fetcher.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/dipdup/indexes/substrate_events/fetcher.py b/src/dipdup/indexes/substrate_events/fetcher.py index 83c70ddee..04f791746 100644 --- a/src/dipdup/indexes/substrate_events/fetcher.py +++ b/src/dipdup/indexes/substrate_events/fetcher.py @@ -1,5 +1,7 @@ import asyncio from collections.abc import AsyncIterator +from collections.abc import Iterable +from typing import Any from dipdup.datasources.substrate_node import SubstrateNodeDatasource from dipdup.datasources.substrate_subsquid import SubstrateSubsquidDatasource @@ -62,7 +64,7 @@ async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]: batch_size = node._http_config.batch_size queue_limit = 50 - queues = { # type: ignore[var-annotated] + queues: dict[str, asyncio.Queue[Any]] = { 'levels': asyncio.Queue(), 'hashes': asyncio.Queue(), 'headers': asyncio.Queue(), @@ -72,8 +74,8 @@ async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]: for level in range(self._first_level, self._last_level + 1): await queues['levels'].put(level) - async def _hashes_loop(): # type: ignore[no-untyped-def] - async def _batch(levels): # type: ignore[no-untyped-def] + async def _hashes_loop() -> None: + async def _batch(levels: Iterable[int]) -> None: block_hashes = await asyncio.gather( *(node.get_block_hash(level) for level in levels), ) @@ -86,14 +88,14 @@ async def _batch(levels): # type: ignore[no-untyped-def] while queues['levels'].qsize() > 0: batch.append(await queues['levels'].get()) if len(batch) >= batch_size: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) batch = [] if batch: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) - async def _headers_loop(): # type: ignore[no-untyped-def] - async def _batch(hashes): # type: ignore[no-untyped-def] + async def _headers_loop() -> None: + async def _batch(hashes: Iterable[str]) -> None: block_headers = await asyncio.gather( *(node.get_block_header(hash_) for hash_ in hashes), ) @@ -107,17 +109,17 @@ async def _batch(hashes): # type: ignore[no-untyped-def] block_hash = await queues['hashes'].get() batch.append(block_hash) if len(batch) >= batch_size: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) batch = [] if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0: break if batch: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) - async def _events_loop(): # type: ignore[no-untyped-def] - async def _batch(headers): # type: ignore[no-untyped-def] + async def _events_loop() -> None: + async def _batch(headers: Iterable[dict[str, Any]]) -> None: block_events = await asyncio.gather( *(node.get_events(header['hash']) for header in headers), ) @@ -129,16 +131,16 @@ async def _batch(headers): # type: ignore[no-untyped-def] block_header = await queues['headers'].get() batch.append(block_header) if len(batch) >= batch_size: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) batch = [] if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0 and queues['headers'].qsize() == 0: break if batch: - await _batch(batch) # type: ignore[no-untyped-call] + await _batch(batch) - async def _log_loop(): # type: ignore[no-untyped-def] + async def _log_loop() -> None: while True: await asyncio.sleep(1) self._logger.debug( @@ -150,10 +152,10 @@ async def _log_loop(): # type: ignore[no-untyped-def] ) tasks = ( - asyncio.create_task(_hashes_loop()), # type: ignore[no-untyped-call] - asyncio.create_task(_headers_loop()), # type: ignore[no-untyped-call] - asyncio.create_task(_events_loop()), # type: ignore[no-untyped-call] - asyncio.create_task(_log_loop()), # type: ignore[no-untyped-call] + asyncio.create_task(_hashes_loop()), + asyncio.create_task(_headers_loop()), + asyncio.create_task(_events_loop()), + asyncio.create_task(_log_loop()), ) while True: