Skip to content

Commit

Permalink
Merge pull request #2513 from eqlabs/sistemd/running-event-filter-bui…
Browse files Browse the repository at this point in the history
…ld-time

feat(bloom): store local DB state on shutdown
  • Loading branch information
sistemd authored Jan 27, 2025
2 parents 6867aec + b50250b commit bf46c56
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 113 deletions.
29 changes: 27 additions & 2 deletions crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
Hint: This is usually caused by exceeding the file descriptor limit of your system.
Try increasing the file limit to using `ulimit` or similar tooling.",
)?;

let shutdown_storage = storage_manager
.create_pool(NonZeroU32::new(1).unwrap())
.context(
r"Creating database connection pool for graceful shutdown
Hint: This is usually caused by exceeding the file descriptor limit of your system.
Try increasing the file limit to using `ulimit` or similar tooling.",
)?;

info!(location=?pathfinder_context.database, "Database migrated.");
verify_database(
&sync_storage,
Expand Down Expand Up @@ -293,7 +303,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst

let sync_handle = if config.is_sync_enabled {
start_sync(
sync_storage.clone(),
sync_storage,
pathfinder_context,
ethereum.client,
sync_state.clone(),
Expand Down Expand Up @@ -364,11 +374,26 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
}
}

let jh = tokio::task::spawn_blocking(|| -> anyhow::Result<Storage> {
shutdown_storage
.connection()
.context("Creating database connection for graceful shutdown")?
.transaction()
.context("Creating database transaction for graceful shutdown")?
.store_in_memory_state()
.context("Storing in-memory DB state on shutdown")?;

Ok(shutdown_storage)
});

// Wait for the shutdown storage task to finish.
let shutdown_storage = jh.await.context("Running shutdown storage task")??;

// If a RO db connection pool remains after all RW connection pools have been
// dropped, WAL & SHM files are never cleaned up. To avoid this, we make sure
// that all RO pools and all but one RW pools are dropped when task tracker
// finishes waiting, and then we drop the last RW pool.
main_result.map(|_| sync_storage)
main_result.map(|_| shutdown_storage)
}

#[cfg(feature = "tokio-console")]
Expand Down
4 changes: 2 additions & 2 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,8 +1081,8 @@ async fn l2_reorg(
}

transaction
.reset()
.context("Resetting local DB state after reorg")?;
.reset_in_memory_state(head)
.context("Resetting in-memory DB state after reorg")?;

// Track combined L1 and L2 state.
let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
Expand Down
4 changes: 2 additions & 2 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,8 @@ async fn rollback_to_anchor(
}

transaction
.reset()
.context("Resetting local DB state after reorg")?;
.reset_in_memory_state(head)
.context("Resetting in-memory DB state after reorg")?;

transaction.commit().context("Committing transaction")?;

Expand Down
Binary file modified crates/rpc/fixtures/mainnet.sqlite
Binary file not shown.
2 changes: 1 addition & 1 deletion crates/rpc/src/method/trace_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ pub(crate) mod tests {
// Need to avoid skipping blocks for `insert_transaction_data`
// so that there is no gap in event filters.
(0..619596)
.step_by(pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
.step_by(usize::try_from(pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN).unwrap())
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
Expand Down
5 changes: 3 additions & 2 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,6 @@ impl AggregateBloomCache {
from_block: BlockNumber,
to_block: BlockNumber,
) -> Vec<Arc<AggregateBloom>> {
let mut cache = self.0.lock().unwrap();

let from_block = from_block.get();
let to_block = to_block.get();

Expand All @@ -386,6 +384,8 @@ impl AggregateBloomCache {
- (to_block % AGGREGATE_BLOOM_BLOCK_RANGE_LEN)
- 1;

let mut cache = self.0.lock().unwrap();

(from_block_aligned..=to_block_aligned)
.step_by(AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
.map(|from| {
Expand All @@ -408,6 +408,7 @@ impl AggregateBloomCache {
/// Store the given filters in the cache.
pub fn set_many(&self, filters: &[Arc<AggregateBloom>]) {
let mut cache = self.0.lock().unwrap();

filters.iter().for_each(|filter| {
let k = CacheKey {
from_block: filter.from_block,
Expand Down
15 changes: 10 additions & 5 deletions crates/storage/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,16 @@ impl Transaction<'_> {
matches!(self.trie_prune_mode, TriePruneMode::Prune { .. })
}

/// Resets the [`Storage`](crate::Storage) state. Required after each reorg.
pub fn reset(&self) -> anyhow::Result<()> {
self.rebuild_running_event_filter()?;
self.event_filter_cache.reset();
/// Store the in-memory [`Storage`](crate::Storage) state in the database.
/// To be performed on shutdown.
pub fn store_in_memory_state(self) -> anyhow::Result<()> {
self.store_running_event_filter()?.commit()
}

Ok(())
/// Resets the in-memory [`Storage`](crate::Storage) state. Required after
/// each reorg.
pub fn reset_in_memory_state(&self, head: BlockNumber) -> anyhow::Result<()> {
self.event_filter_cache.reset();
self.rebuild_running_event_filter(head)
}
}
Loading

0 comments on commit bf46c56

Please sign in to comment.