EvmNodeDatasource._synchronize() optimisations (#938)
igorsereda authored Feb 5, 2024
1 parent f66abc9 commit 606af08
Showing 5 changed files with 32 additions and 15 deletions.
CHANGELOG.md (7 changes: 6 additions & 1 deletion)

```diff
@@ -6,9 +6,14 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].

 ## [Unreleased]
 
+### Performance
+
+- evm.subsquid.events: EvmNodeDatasource._synchronize() optimisations
+
 ### Fixed
 
-- ci: Fixed failing `test_metadata_networks()`
+- codegen: Added correct support of dots in typenames.
+- ci: Fixed failing `test_metadata_networks()`.
 
 ## [7.3.1] - 2024-01-29
```
src/dipdup/codegen/evm_subsquid.py (9 changes: 5 additions & 4 deletions)

```diff
@@ -58,6 +58,7 @@ def jsonschema_from_abi(abi: dict[str, Any]) -> dict[str, Any]:
 def convert_abi(package: DipDupPackage, events: set[str], functions: set[str]) -> None:
     for abi_path in package.abi.glob('**/abi.json'):
         abi = orjson.loads(abi_path.read_bytes())
+        abi_dirname = abi_path.relative_to(package.abi).parent
         event_extras: defaultdict[str, EventAbiExtra] = defaultdict(EventAbiExtra)  # type: ignore[arg-type]
 
         for abi_item in abi:
@@ -66,7 +67,7 @@ def convert_abi(package: DipDupPackage, events: set[str], functions: set[str]) -> None:
                 if name not in functions:
                     continue
                 schema = jsonschema_from_abi(abi_item)
-                schema_path = package.schemas / abi_path.parent.stem / 'evm_functions' / f'{abi_item["name"]}.json'
+                schema_path = package.schemas / abi_dirname / 'evm_functions' / f'{abi_item["name"]}.json'
             elif abi_item['type'] == 'event':
                 name = abi_item['name']
                 if name in event_extras:
@@ -81,15 +82,15 @@ def convert_abi(package: DipDupPackage, events: set[str], functions: set[str]) -> None:
                     continue
 
                 schema = jsonschema_from_abi(abi_item)
-                schema_path = package.schemas / abi_path.parent.stem / 'evm_events' / f'{abi_item["name"]}.json'
+                schema_path = package.schemas / abi_dirname / 'evm_events' / f'{abi_item["name"]}.json'
             else:
                 continue
 
             touch(schema_path)
             schema_path.write_bytes(json_dumps(schema))
 
         if event_extras:
-            event_extras_path = package.abi / abi_path.parent.stem / 'events.json'
+            event_extras_path = package.abi / abi_dirname / 'events.json'
             touch(event_extras_path)
             event_extras_path.write_bytes(json_dumps(event_extras))
 
@@ -139,7 +140,7 @@ async def _fetch_abi(self, index_config: SubsquidEventsIndexConfig) -> None:
         datasource_configs = self._config.abi_datasources
 
         for handler_config in index_config.handlers:
-            abi_path = self._package.abi / handler_config.contract.module_name / 'abi.json'
+            abi_path = self._package.abi / handler_config.contract.module_path / 'abi.json'
             if abi_path.exists():
                 continue
 
```
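Note: the repeated `abi_path.parent.stem` to `abi_dirname` substitution is what makes nested ABI directories work. `.parent.stem` keeps only the innermost directory name, while `relative_to(...).parent` keeps the whole path below the ABI root. A minimal sketch of the difference (the `token.erc20` layout is a hypothetical example):

```python
from pathlib import Path

# Hypothetical layout for a contract with dotted typename 'token.erc20'.
package_abi = Path('package') / 'abi'
abi_path = package_abi / 'token' / 'erc20' / 'abi.json'

# Old expression: only the innermost directory name survives,
# flattening nested typenames.
print(abi_path.parent.stem)                      # erc20

# New expression: the whole directory path relative to the ABI root.
print(abi_path.relative_to(package_abi).parent)  # token/erc20
```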
src/dipdup/config/__init__.py (6 changes: 5 additions & 1 deletion)

```diff
@@ -51,9 +51,9 @@
 from dipdup.utils import pascal_to_snake
 from dipdup.yaml import DipDupYAMLConfig
 
+from pathlib import Path
 if TYPE_CHECKING:
     from collections.abc import Iterator
-    from pathlib import Path
 
     from dipdup.subscriptions import Subscription
 
@@ -242,6 +242,10 @@ class ContractConfig(ABC, NameMixin):
     def module_name(self) -> str:
         return self.typename or self.name
 
+    @property
+    def module_path(self) -> Path:
+        return Path(*self.module_name.split('.'))
+
 
 class DatasourceConfig(ABC, NameMixin):
     kind: str
```
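Note: `module_path` is the counterpart that turns a dotted typename into that nested path, by splitting `module_name` on dots and splatting the parts into `Path`; this is also why `from pathlib import Path` moves out of the `TYPE_CHECKING` block, since the property now needs `Path` at runtime. A standalone sketch of the same expression (example names are invented):

```python
from pathlib import Path


def module_path(module_name: str) -> Path:
    # Same expression as the ContractConfig.module_path property above.
    return Path(*module_name.split('.'))


# A dotted typename maps to a nested directory; a plain one stays flat.
assert module_path('token.erc20') == Path('token/erc20')
assert module_path('registry') == Path('registry')
```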
src/dipdup/datasources/evm_node.py (4 changes: 2 additions & 2 deletions)

```diff
@@ -213,8 +213,8 @@ def call_on_disconnected(self, fn: EmptyCallback) -> None:
     async def get_block_by_hash(self, block_hash: str) -> dict[str, Any]:
         return await self._jsonrpc_request('eth_getBlockByHash', [block_hash, True])  # type: ignore[no-any-return]
 
-    async def get_block_by_level(self, block_number: int) -> dict[str, Any]:
-        return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), True])  # type: ignore[no-any-return]
+    async def get_block_by_level(self, block_number: int, full_transactions_data: bool = True) -> dict[str, Any]:
+        return await self._jsonrpc_request('eth_getBlockByNumber', [hex(block_number), full_transactions_data])  # type: ignore[no-any-return]
 
     async def get_logs(self, params: dict[str, Any]) -> list[dict[str, Any]]:
         return await self._jsonrpc_request('eth_getLogs', [params])  # type: ignore[no-any-return]
```
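Note: the second positional parameter of the `eth_getBlockByNumber` JSON-RPC method selects full transaction objects (`true`) or transaction hashes only (`false`). The new `full_transactions_data` argument exposes that switch so callers that only need header fields can request the smaller payload. A sketch of such a caller (assuming any object with the `get_block_by_level` signature above, e.g. an `EvmNodeDatasource`):

```python
from typing import Any


async def fetch_timestamp(node: Any, level: int) -> int:
    # Header-only request: transaction hashes instead of full objects,
    # which shrinks the JSON-RPC payload when only 'timestamp' is needed.
    block = await node.get_block_by_level(level, full_transactions_data=False)
    # JSON-RPC encodes quantities as hex strings.
    return int(block['timestamp'], 16)
```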
src/dipdup/indexes/evm_subsquid_events/index.py (21 changes: 14 additions & 7 deletions)

```diff
@@ -76,7 +76,7 @@ def topics(self) -> dict[str, dict[str, str]]:
         return self._topics
 
     async def _process_queue(self) -> None:
-        logs_by_level = defaultdict(list)
+        logs_by_level: dict[int, list[EvmNodeLogData]] = defaultdict(list)
 
         # NOTE: Drain queue and group messages by level.
         while True:
@@ -152,14 +152,20 @@ async def _synchronize(self, sync_level: int) -> None:
             typename = handler.contract.module_name
             topics.add(self.topics[typename][handler.name])
 
-        # NOTE: Requesting logs by batches of NODE_BATCH_SIZE.
+        # NOTE: Requesting logs by batches of datasource_config.http.batch_size or NODE_BATCH_SIZE.
+        evm_node: EvmNodeDatasource = self.random_node
+        if evm_node._http_config:
+            batch_size = evm_node._http_config.batch_size
+        else:
+            batch_size = NODE_BATCH_SIZE
+
         batch_first_level = first_level
         while batch_first_level <= sync_level:
             # NOTE: We need block timestamps for each level, so fetch them separately and match with logs.
             timestamps: dict[int, int] = {}
             tasks: deque[asyncio.Task[Any]] = deque()
 
-            batch_last_level = min(batch_first_level + NODE_BATCH_SIZE, sync_level)
+            batch_last_level = min(batch_first_level + batch_size, sync_level)
             level_logs_task = asyncio.create_task(
                 self.random_node.get_logs(
                     {
{
Expand All @@ -168,13 +174,14 @@ async def _synchronize(self, sync_level: int) -> None:
}
)
)
tasks.append(level_logs_task)

async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:
block = await self.random_node.get_block_by_level(level)
block = await self.random_node.get_block_by_level(level, full_transactions_data=False)
timestamps[level] = int(block['timestamp'], 16)

for level in range(batch_first_level, batch_last_level + 1):
level = batch_last_level
level_logs = await level_logs_task
for level in [int(log['blockNumber'], 16) for log in level_logs]:
tasks.append(
asyncio.create_task(
_fetch_timestamp(level, timestamps),
@@ -184,7 +191,6 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:

             await asyncio.gather(*tasks)
 
-            level_logs = await level_logs_task
             parsed_level_logs = tuple(
                 EvmNodeLogData.from_json(
                     log,
@@ -197,6 +203,7 @@ async def _fetch_timestamp(level: int, timestamps: dict[int, int]) -> None:
             Metrics.set_sqd_processor_last_block(level)
 
             batch_first_level = batch_last_level + 1
+            await asyncio.sleep(evm_node._http_config.ratelimit_sleep)
         else:
             sync_level = min(sync_level, subsquid_sync_level)
             fetcher = self._create_fetcher(first_level, sync_level)
```
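Note: taken together, the `_synchronize` changes make the batch size follow the datasource's HTTP config, issue block-header requests only for levels that actually appear in the fetched logs rather than for every level in the batch range, skip transaction bodies, and sleep between batches for rate limiting. A condensed sketch of the new per-batch flow (the two `node` methods match the datasource API above; everything else is illustrative, and levels are deduplicated here for clarity):

```python
import asyncio
from typing import Any


async def sync_batch(node: Any, first_level: int, last_level: int) -> dict[int, int]:
    """Return level -> timestamp for every level in the batch that has logs."""
    level_logs = await node.get_logs(
        {'fromBlock': hex(first_level), 'toBlock': hex(last_level)}
    )

    timestamps: dict[int, int] = {}

    async def fetch_timestamp(level: int) -> None:
        block = await node.get_block_by_level(level, full_transactions_data=False)
        timestamps[level] = int(block['timestamp'], 16)

    # One header request per level that actually has logs,
    # not one per level in the [first_level, last_level] range.
    levels = {int(log['blockNumber'], 16) for log in level_logs}
    await asyncio.gather(*(fetch_timestamp(level) for level in levels))
    return timestamps
```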
