Skip to content

Commit

Permalink
EvmNodeDatasource._synchronize() optimisations
Browse files Browse the repository at this point in the history
 * Significantly reduced number of requests
 * Unused block details excluded from response
  • Loading branch information
igorsereda committed Feb 4, 2024
1 parent f66abc9 commit 19753cd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic

## [Unreleased]

### Performance

- evm.subsquid.events: EvmNodeDatasource._synchronize() optimisations

### Fixed

- ci: Fixed failing `test_metadata_networks()`
Expand Down
4 changes: 2 additions & 2 deletions src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def call_on_disconnected(self, fn: EmptyCallback) -> None:
async def get_block_by_hash(self, block_hash: str) -> dict[str, Any]:
return await self._jsonrpc_request('eth_getBlockByHash', [block_hash, True]) # type: ignore[no-any-return]

async def get_block_by_level(self, block_number: int) -> dict[str, Any]:
return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), True]) # type: ignore[no-any-return]
async def get_block_by_level(self, block_number: int, full_transactions_data: bool = True) -> dict[str, Any]:
return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), full_transactions_data]) # type: ignore[no-any-return]

async def get_logs(self, params: dict[str, Any]) -> list[dict[str, Any]]:
return await self._jsonrpc_request('eth_getLogs', [params]) # type: ignore[no-any-return]
Expand Down
21 changes: 14 additions & 7 deletions src/dipdup/indexes/evm_subsquid_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def topics(self) -> dict[str, dict[str, str]]:
return self._topics

async def _process_queue(self) -> None:
logs_by_level = defaultdict(list)
logs_by_level: dict[int, list[EvmNodeLogData]] = defaultdict(list)

# NOTE: Drain queue and group messages by level.
while True:
Expand Down Expand Up @@ -152,14 +152,20 @@ async def _synchronize(self, sync_level: int) -> None:
typename = handler.contract.module_name
topics.add(self.topics[typename][handler.name])

# NOTE: Requesting logs by batches of NODE_BATCH_SIZE.
# NOTE: Requesting logs in batches of datasource_config.http.batch_size, falling back to NODE_BATCH_SIZE when no HTTP config is set.
evm_node: EvmNodeDatasource = self.random_node
if evm_node._http_config:
batch_size = evm_node._http_config.batch_size
else:
batch_size = NODE_BATCH_SIZE

batch_first_level = first_level
while batch_first_level <= sync_level:
# NOTE: We need block timestamps for each level, so fetch them separately and match with logs.
timestamps: dict[int, int] = {}
tasks: deque[asyncio.Task[Any]] = deque()

batch_last_level = min(batch_first_level + NODE_BATCH_SIZE, sync_level)
batch_last_level = min(batch_first_level + batch_size, sync_level)
level_logs_task = asyncio.create_task(
self.random_node.get_logs(
{
Expand All @@ -168,13 +174,14 @@ async def _synchronize(self, sync_level: int) -> None:
}
)
)
tasks.append(level_logs_task)

async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:
block = await self.random_node.get_block_by_level(level)
block = await self.random_node.get_block_by_level(level, full_transactions_data=False)
timestamps[level] = int(block['timestamp'], 16)

for level in range(batch_first_level, batch_last_level + 1):
level = batch_last_level
level_logs = await level_logs_task
for level in [int(log['blockNumber'], 16) for log in level_logs]:
tasks.append(
asyncio.create_task(
_fetch_timestamp(level, timestamps),
Expand All @@ -184,7 +191,6 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:

await asyncio.gather(*tasks)

level_logs = await level_logs_task
parsed_level_logs = tuple(
EvmNodeLogData.from_json(
log,
Expand All @@ -197,6 +203,7 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:
Metrics.set_sqd_processor_last_block(level)

batch_first_level = batch_last_level + 1
await asyncio.sleep(evm_node._http_config.ratelimit_sleep)
else:
sync_level = min(sync_level, subsquid_sync_level)
fetcher = self._create_fetcher(first_level, sync_level)
Expand Down

0 comments on commit 19753cd

Please sign in to comment.