Commit

typehints
droserasprout committed Dec 23, 2024
1 parent 1039ca3 commit cf48863
Showing 1 changed file with 20 additions and 18 deletions.
src/dipdup/indexes/substrate_events/fetcher.py (38 changes: 20 additions & 18 deletions)
@@ -1,5 +1,7 @@
 import asyncio
 from collections.abc import AsyncIterator
+from collections.abc import Iterable
+from typing import Any
 
 from dipdup.datasources.substrate_node import SubstrateNodeDatasource
 from dipdup.datasources.substrate_subsquid import SubstrateSubsquidDatasource
@@ -62,7 +64,7 @@ async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
         batch_size = node._http_config.batch_size
         queue_limit = 50
 
-        queues = {  # type: ignore[var-annotated]
+        queues: dict[str, asyncio.Queue[Any]] = {
             'levels': asyncio.Queue(),
             'hashes': asyncio.Queue(),
             'headers': asyncio.Queue(),
@@ -72,8 +74,8 @@ async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
         for level in range(self._first_level, self._last_level + 1):
             await queues['levels'].put(level)
 
-        async def _hashes_loop():  # type: ignore[no-untyped-def]
-            async def _batch(levels):  # type: ignore[no-untyped-def]
+        async def _hashes_loop() -> None:
+            async def _batch(levels: Iterable[int]) -> None:
                 block_hashes = await asyncio.gather(
                     *(node.get_block_hash(level) for level in levels),
                 )
@@ -86,14 +88,14 @@ async def _batch(levels):  # type: ignore[no-untyped-def]
             while queues['levels'].qsize() > 0:
                 batch.append(await queues['levels'].get())
                 if len(batch) >= batch_size:
-                    await _batch(batch)  # type: ignore[no-untyped-call]
+                    await _batch(batch)
                     batch = []
 
             if batch:
-                await _batch(batch)  # type: ignore[no-untyped-call]
+                await _batch(batch)
 
-        async def _headers_loop():  # type: ignore[no-untyped-def]
-            async def _batch(hashes):  # type: ignore[no-untyped-def]
+        async def _headers_loop() -> None:
+            async def _batch(hashes: Iterable[str]) -> None:
                 block_headers = await asyncio.gather(
                     *(node.get_block_header(hash_) for hash_ in hashes),
                 )
@@ -107,17 +109,17 @@ async def _batch(hashes):  # type: ignore[no-untyped-def]
                 block_hash = await queues['hashes'].get()
                 batch.append(block_hash)
                 if len(batch) >= batch_size:
-                    await _batch(batch)  # type: ignore[no-untyped-call]
+                    await _batch(batch)
                     batch = []
 
                 if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0:
                     break
 
             if batch:
-                await _batch(batch)  # type: ignore[no-untyped-call]
+                await _batch(batch)
 
-        async def _events_loop():  # type: ignore[no-untyped-def]
-            async def _batch(headers):  # type: ignore[no-untyped-def]
+        async def _events_loop() -> None:
+            async def _batch(headers: Iterable[dict[str, Any]]) -> None:
                 block_events = await asyncio.gather(
                     *(node.get_events(header['hash']) for header in headers),
                 )
@@ -129,16 +131,16 @@ async def _batch(headers):  # type: ignore[no-untyped-def]
                 block_header = await queues['headers'].get()
                 batch.append(block_header)
                 if len(batch) >= batch_size:
-                    await _batch(batch)  # type: ignore[no-untyped-call]
+                    await _batch(batch)
                     batch = []
 
                 if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0 and queues['headers'].qsize() == 0:
                     break
 
             if batch:
-                await _batch(batch)  # type: ignore[no-untyped-call]
+                await _batch(batch)
 
-        async def _log_loop():  # type: ignore[no-untyped-def]
+        async def _log_loop() -> None:
             while True:
                 await asyncio.sleep(1)
                 self._logger.debug(
@@ -150,10 +152,10 @@ async def _log_loop():  # type: ignore[no-untyped-def]
             )
 
         tasks = (
-            asyncio.create_task(_hashes_loop()),  # type: ignore[no-untyped-call]
-            asyncio.create_task(_headers_loop()),  # type: ignore[no-untyped-call]
-            asyncio.create_task(_events_loop()),  # type: ignore[no-untyped-call]
-            asyncio.create_task(_log_loop()),  # type: ignore[no-untyped-call]
+            asyncio.create_task(_hashes_loop()),
+            asyncio.create_task(_headers_loop()),
+            asyncio.create_task(_events_loop()),
+            asyncio.create_task(_log_loop()),
         )
 
         while True:
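The annotations added in this commit target the three mypy error codes the removed comments used to suppress: var-annotated on the queue dict (the item type of a bare asyncio.Queue() cannot be inferred), no-untyped-def on the nested coroutines, and no-untyped-call at their call sites (the latter two are enabled by --disallow-untyped-defs / --disallow-untyped-calls, both part of --strict). The following is a minimal, self-contained sketch of the same pattern, typed queues drained by a nested batch coroutine, showing that the annotated form type-checks without any suppressions. All names and values in it (demo, _hashes_loop, the fake hash lookup, the batch size of 2) are illustrative stand-ins, not the DipDup fetcher API.

import asyncio
from collections.abc import Iterable
from typing import Any


async def demo() -> None:
    # Annotating the dict value type is what removes the old
    # `# type: ignore[var-annotated]` on the assignment.
    queues: dict[str, asyncio.Queue[Any]] = {
        'levels': asyncio.Queue(),
        'hashes': asyncio.Queue(),
    }
    for level in range(5):
        await queues['levels'].put(level)

    async def _hashes_loop() -> None:
        # Parameter and return annotations remove `no-untyped-def` here
        # and `no-untyped-call` at every call site below.
        async def _batch(levels: Iterable[int]) -> None:
            # Stand-in for a real block-hash lookup; purely illustrative.
            hashes = await asyncio.gather(
                *(asyncio.sleep(0, result=f'0x{level:02x}') for level in levels),
            )
            for block_hash in hashes:
                await queues['hashes'].put(block_hash)

        batch: list[int] = []
        while queues['levels'].qsize() > 0:
            batch.append(await queues['levels'].get())
            if len(batch) >= 2:  # illustrative batch size
                await _batch(batch)
                batch = []
        if batch:
            await _batch(batch)

    await asyncio.create_task(_hashes_loop())
    print(queues['hashes'].qsize())  # 5


asyncio.run(demo())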
