From a0a9d69070961dce931f73ec6069bb825ac28b58 Mon Sep 17 00:00:00 2001 From: Igor Sereda Date: Sat, 3 Feb 2024 02:12:21 +0300 Subject: [PATCH] EvmNodeDatasource._synchronize() optimisations * Significantly reduced number of requests * Unused block details excluded from response * Metadata networks unit test fixed --- src/dipdup/datasources/evm_node.py | 4 ++-- .../indexes/evm_subsquid_events/index.py | 21 ++++++++++++------- src/dipdup/models/tzip_metadata.py | 1 + 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/dipdup/datasources/evm_node.py b/src/dipdup/datasources/evm_node.py index ee7f71a2e..6fc0cb112 100644 --- a/src/dipdup/datasources/evm_node.py +++ b/src/dipdup/datasources/evm_node.py @@ -213,8 +213,8 @@ def call_on_disconnected(self, fn: EmptyCallback) -> None: async def get_block_by_hash(self, block_hash: str) -> dict[str, Any]: return await self._jsonrpc_request('eth_getBlockByHash', [block_hash, True]) # type: ignore[no-any-return] - async def get_block_by_level(self, block_number: int) -> dict[str, Any]: - return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), True]) # type: ignore[no-any-return] + async def get_block_by_level(self, block_number: int, full_transactions_data: bool = True) -> dict[str, Any]: + return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), full_transactions_data]) # type: ignore[no-any-return] async def get_logs(self, params: dict[str, Any]) -> list[dict[str, Any]]: return await self._jsonrpc_request('eth_getLogs', [params]) # type: ignore[no-any-return] diff --git a/src/dipdup/indexes/evm_subsquid_events/index.py b/src/dipdup/indexes/evm_subsquid_events/index.py index 2cffd1025..f9c5c74c2 100644 --- a/src/dipdup/indexes/evm_subsquid_events/index.py +++ b/src/dipdup/indexes/evm_subsquid_events/index.py @@ -76,7 +76,7 @@ def topics(self) -> dict[str, dict[str, str]]: return self._topics async def _process_queue(self) -> None: - logs_by_level = defaultdict(list) + logs_by_level: dict[int, list[EvmNodeLogData]] = defaultdict(list) # NOTE: Drain queue and group messages by level. while True: @@ -152,14 +152,20 @@ async def _synchronize(self, sync_level: int) -> None: typename = handler.contract.module_name topics.add(self.topics[typename][handler.name]) - # NOTE: Requesting logs by batches of NODE_BATCH_SIZE. + # NOTE: Requesting logs by batches of datasource_config.http.batch_size or NODE_BATCH_SIZE. + evm_node: EvmNodeDatasource = self.random_node + if evm_node._http_config: + batch_size = evm_node._http_config.batch_size + else: + batch_size = NODE_BATCH_SIZE + batch_first_level = first_level while batch_first_level <= sync_level: # NOTE: We need block timestamps for each level, so fetch them separately and match with logs. timestamps: dict[int, int] = {} tasks: deque[asyncio.Task[Any]] = deque() - batch_last_level = min(batch_first_level + NODE_BATCH_SIZE, sync_level) + batch_last_level = min(batch_first_level + batch_size, sync_level) level_logs_task = asyncio.create_task( self.random_node.get_logs( { @@ -168,13 +174,14 @@ async def _synchronize(self, sync_level: int) -> None: } ) ) - tasks.append(level_logs_task) async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None: - block = await self.random_node.get_block_by_level(level) + block = await self.random_node.get_block_by_level(level, full_transactions_data=False) timestamps[level] = int(block['timestamp'], 16) - for level in range(batch_first_level, batch_last_level + 1): + level = batch_last_level + level_logs = await level_logs_task + for level in [int(log['blockNumber'], 16) for log in level_logs]: tasks.append( asyncio.create_task( _fetch_timestamp(level, timestamps), @@ -184,7 +191,6 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None: await asyncio.gather(*tasks) - level_logs = await level_logs_task parsed_level_logs = tuple( EvmNodeLogData.from_json( log, @@ -197,6 +203,7 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None: Metrics.set_sqd_processor_last_block(level) batch_first_level = batch_last_level + 1 + await asyncio.sleep(evm_node._http_config.ratelimit_sleep) else: sync_level = min(sync_level, subsquid_sync_level) fetcher = self._create_fetcher(first_level, sync_level) diff --git a/src/dipdup/models/tzip_metadata.py b/src/dipdup/models/tzip_metadata.py index c3e91a38c..d2b001971 100644 --- a/src/dipdup/models/tzip_metadata.py +++ b/src/dipdup/models/tzip_metadata.py @@ -6,3 +6,4 @@ class TzipMetadataNetwork(Enum): ghostnet = 'ghostnet' mumbainet = 'mumbainet' nairobinet = 'nairobinet' + oxfordnet = 'oxfordnet'