Skip to content

Commit

Permalink
draft - update sync metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare committed Feb 10, 2025
1 parent 7eca8fd commit 075dba1
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 42 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions crates/papyrus_common/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,5 @@
use std::sync::OnceLock;

/// The central marker is the first block number that doesn't exist yet.
pub const PAPYRUS_CENTRAL_BLOCK_MARKER: &str = "papyrus_central_block_marker";

/// The header marker is the first block number for which the node does not have a header.
pub const PAPYRUS_HEADER_MARKER: &str = "papyrus_header_marker";

/// The body marker is the first block number for which the node does not have a body.
pub const PAPYRUS_BODY_MARKER: &str = "papyrus_body_marker";

/// The state marker is the first block number for which the node does not have a state body.
pub const PAPYRUS_STATE_MARKER: &str = "papyrus_state_marker";

/// The compiled class marker is the first block number for which the node does not have all of the
/// corresponding compiled classes.
pub const PAPYRUS_COMPILED_CLASS_MARKER: &str = "papyrus_compiled_class_marker";

/// The base layer marker is the first block number for which the node does not guarantee L1
/// finality.
pub const PAPYRUS_BASE_LAYER_MARKER: &str = "papyrus_base_layer_marker";

/// The latency, in seconds, between a block timestamp (as state in its header) and the time the
/// node stores the header.
pub const PAPYRUS_HEADER_LATENCY_SEC: &str = "papyrus_header_latency";

// TODO(Shahak): consider making this value non static and add a way to change this while the app is
// running. e.g via a monitoring endpoint.
/// Global variable set by the main config to enable collecting profiling metrics.
Expand Down
7 changes: 6 additions & 1 deletion crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use papyrus_storage::{open_storage, StorageReader, StorageWriter};
use papyrus_sync::sources::base_layer::EthereumBaseLayerSource;
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync as CentralStateSync, SyncConfig as CentralSyncConfig};
use papyrus_sync::{
PapyrusSyncMetrics,
StateSync as CentralStateSync,
SyncConfig as CentralSyncConfig,
};
use starknet_api::block::{BlockHash, BlockHashAndNumber};
use starknet_api::felt;
use starknet_class_manager_types::{EmptyClassManagerClient, SharedClassManagerClient};
Expand Down Expand Up @@ -201,6 +205,7 @@ async fn run_sync(
storage_reader.clone(),
storage_writer,
class_manager_client,
Some(PapyrusSyncMetrics::default()),
);
Ok(sync.run().await?)
}
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ starknet_state_sync_types.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
starknet_class_manager_types.workspace = true
starknet_sequencer_metrics.workspace = true
async-trait.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
Expand Down
11 changes: 8 additions & 3 deletions crates/papyrus_p2p_sync/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use chrono::{TimeZone, Utc};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use metrics::gauge;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_network::network_manager::ClientResponsesManager;
use papyrus_protobuf::sync::{DataOrFin, SignedBlockHeader};
use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, BlockSignature};
use starknet_api::hash::StarkHash;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_sequencer_metrics::metric_definitions::{
PAPYRUS_HEADER_LATENCY_SEC,
PAPYRUS_HEADER_MARKER,
};
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::debug;

Expand Down Expand Up @@ -47,7 +50,8 @@ impl BlockData for SignedBlockHeader {
.expect("Vec::first should return a value on a vector of size 1"),
)?
.commit()?;
gauge!(papyrus_metrics::PAPYRUS_HEADER_MARKER).set(
// TODO(alonl): fix this metric
gauge!(PAPYRUS_HEADER_MARKER.get_name()).set(
self.block_header.block_header_without_hash.block_number.unchecked_next().0 as f64,
);
// TODO(shahak): Fix code dup with central sync
Expand All @@ -62,7 +66,8 @@ impl BlockData for SignedBlockHeader {
let header_latency = time_delta.num_seconds();
debug!("Header latency: {}.", header_latency);
if header_latency >= 0 {
gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC).set(header_latency as f64);
// TODO(alonl): fix this metric
gauge!(PAPYRUS_HEADER_LATENCY_SEC.get_name()).set(header_latency as f64);
}
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use metrics::gauge;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_network::network_manager::ClientResponsesManager;
use papyrus_proc_macros::latency_histogram;
use papyrus_protobuf::sync::{DataOrFin, StateDiffChunk};
Expand All @@ -13,6 +12,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::state::ThinStateDiff;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_sequencer_metrics::metric_definitions::PAPYRUS_STATE_MARKER;
use starknet_state_sync_types::state_sync_types::SyncBlock;

use super::block_data_stream_builder::BadPeerError;
Expand All @@ -34,7 +34,8 @@ impl BlockData for (ThinStateDiff, BlockNumber) {
) -> BoxFuture<'a, Result<(), P2pSyncClientError>> {
async move {
storage_writer.begin_rw_txn()?.append_state_diff(self.1, self.0)?.commit()?;
gauge!(papyrus_metrics::PAPYRUS_STATE_MARKER).set(self.1.unchecked_next().0 as f64);
// TODO(alonl): fix this metric
gauge!(PAPYRUS_STATE_MARKER.get_name()).set(self.1.unchecked_next().0 as f64);
Ok(())
}
.boxed()
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ starknet-types-core.workspace = true
starknet_api.workspace = true
starknet_class_manager_types.workspace = true
starknet_client.workspace = true
starknet_sequencer_metrics.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full", "sync"] }
tracing.workspace = true
Expand Down
81 changes: 69 additions & 12 deletions crates/papyrus_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use chrono::{TimeZone, Utc};
use futures::stream;
use futures_util::{pin_mut, select, Stream, StreamExt};
use indexmap::IndexMap;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_common::pending_classes::PendingClasses;
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
Expand All @@ -42,6 +41,16 @@ use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContract
use starknet_api::state::{StateDiff, ThinStateDiff};
use starknet_class_manager_types::{ClassManagerClientError, SharedClassManagerClient};
use starknet_client::reader::PendingData;
use starknet_sequencer_metrics::metric_definitions::{
PAPYRUS_BASE_LAYER_MARKER,
PAPYRUS_BODY_MARKER,
PAPYRUS_CENTRAL_BLOCK_MARKER,
PAPYRUS_COMPILED_CLASS_MARKER,
PAPYRUS_HEADER_LATENCY_SEC,
PAPYRUS_HEADER_MARKER,
PAPYRUS_STATE_MARKER,
};
use starknet_sequencer_metrics::metrics::MetricGauge;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, trace, warn};

Expand Down Expand Up @@ -140,6 +149,30 @@ impl Default for SyncConfig {
}
}

pub struct PapyrusSyncMetrics {
pub header_marker: MetricGauge,
pub body_marker: MetricGauge,
pub state_marker: MetricGauge,
pub compiled_class_marker: MetricGauge,
pub base_layer_marker: MetricGauge,
pub central_block_marker: MetricGauge,
pub header_latency: MetricGauge,
}

impl Default for PapyrusSyncMetrics {
fn default() -> Self {
Self {
header_marker: PAPYRUS_HEADER_MARKER,
body_marker: PAPYRUS_BODY_MARKER,
state_marker: PAPYRUS_STATE_MARKER,
compiled_class_marker: PAPYRUS_COMPILED_CLASS_MARKER,
base_layer_marker: PAPYRUS_BASE_LAYER_MARKER,
central_block_marker: PAPYRUS_CENTRAL_BLOCK_MARKER,
header_latency: PAPYRUS_HEADER_LATENCY_SEC,
}
}
}

// Orchestrates specific network interfaces (e.g. central, p2p, l1) and writes to Storage and shared
// memory.
pub struct GenericStateSync<
Expand All @@ -158,6 +191,7 @@ pub struct GenericStateSync<
writer: StorageWriter,
sequencer_pub_key: Option<SequencerPublicKey>,
class_manager_client: Option<SharedClassManagerClient>,
metrics: Option<PapyrusSyncMetrics>,
}

pub type StateSyncResult = Result<(), StateSyncError>;
Expand Down Expand Up @@ -438,20 +472,26 @@ impl<
.append_block_signature(block_number, signature)?
.append_body(block_number, block.body)?
.commit()?;
metrics::gauge!(papyrus_metrics::PAPYRUS_HEADER_MARKER)
.set(block_number.unchecked_next().0 as f64);
metrics::gauge!(papyrus_metrics::PAPYRUS_BODY_MARKER)
.set(block_number.unchecked_next().0 as f64);
if let Some(metrics) = self.metrics.as_ref() {
metrics.header_marker.increment(1);
metrics.body_marker.increment(1);
}
let time_delta = Utc::now()
- Utc
.timestamp_opt(block.header.block_header_without_hash.timestamp.0 as i64, 0)
.single()
.expect("block timestamp should be valid");
let header_latency = time_delta.num_seconds();
debug!("Header latency: {}.", header_latency);
// TODO(alonl): replace this with the commented code once set is supported.
if header_latency >= 0 {
metrics::gauge!(papyrus_metrics::PAPYRUS_HEADER_LATENCY_SEC).set(header_latency as f64);
metrics::gauge!(PAPYRUS_HEADER_LATENCY_SEC.get_name()).set(header_latency as f64);
}
// if let Some(metrics) = self.metrics.as_ref() {
// if header_latency >= 0 {
// metrics.header_latency.set(header_latency as f64);
// }
// }
Ok(())
}

Expand Down Expand Up @@ -504,11 +544,17 @@ impl<
.update_class_manager_block_marker(&block_number)?
.commit()?;
}
metrics::gauge!(papyrus_metrics::PAPYRUS_STATE_MARKER)
// TODO(alonl): replace this with the commented code once set is supported.
metrics::gauge!(PAPYRUS_STATE_MARKER.get_name())
.set(block_number.unchecked_next().0 as f64);
let compiled_class_marker = self.reader.begin_ro_txn()?.get_compiled_class_marker()?;
metrics::gauge!(papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER)
metrics::gauge!(PAPYRUS_COMPILED_CLASS_MARKER.get_name())
.set(compiled_class_marker.0 as f64);
// if let Some(metrics) = self.metrics.as_ref() {
// let compiled_class_marker = self.reader.begin_ro_txn()?.get_compiled_class_marker()?;
// metrics.state_marker.increment(1);
// metrics.compiled_class_marker.set(compiled_class_marker.0 as f64);
// }

// Info the user on syncing the block once all the data is stored.
info!("Added block {} with hash {:#064x}.", block_number, block_hash.0);
Expand All @@ -532,8 +578,12 @@ impl<
txn.commit()?;
let compiled_class_marker =
self.reader.begin_ro_txn()?.get_compiled_class_marker()?;
metrics::gauge!(papyrus_metrics::PAPYRUS_COMPILED_CLASS_MARKER)
// TODO(alonl): replace this with the commented code once set is supported.
metrics::gauge!(PAPYRUS_COMPILED_CLASS_MARKER.get_name())
.set(compiled_class_marker.0 as f64);
// if let Some(metrics) = self.metrics.as_ref() {
// metrics.compiled_class_marker.set(compiled_class_marker.0 as f64);
// }
debug!("Added compiled class.");
Ok(())
}
Expand Down Expand Up @@ -575,8 +625,9 @@ impl<
if txn.get_base_layer_block_marker()? != block_number.unchecked_next() {
info!("Verified block {block_number} hash against base layer.");
txn.update_base_layer_block_marker(&block_number.unchecked_next())?.commit()?;
metrics::gauge!(papyrus_metrics::PAPYRUS_BASE_LAYER_MARKER)
.set(block_number.unchecked_next().0 as f64);
if let Some(metrics) = self.metrics.as_ref() {
metrics.base_layer_marker.increment(1);
}
}
Ok(())
}
Expand Down Expand Up @@ -710,7 +761,11 @@ fn stream_new_blocks<
let central_block_marker = latest_central_block.map_or(
BlockNumber::default(), |block_hash_and_number| block_hash_and_number.number.unchecked_next()
);
metrics::gauge!(papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER).set(central_block_marker.0 as f64);
// TODO(alonl): replace this with the commented code once set is supported.
metrics::gauge!(PAPYRUS_CENTRAL_BLOCK_MARKER.get_name()).set(central_block_marker.0 as f64);
// if let Some(metrics) = self.metrics.as_ref() {
// metrics.central_block_marker.set(central_block_marker.0 as f64);
// }
if header_marker == central_block_marker {
// Only if the node have the last block and state (without casms), sync pending data.
if collect_pending_data && reader.begin_ro_txn()?.get_state_marker()? == header_marker{
Expand Down Expand Up @@ -812,6 +867,7 @@ impl StateSync {
reader: StorageReader,
writer: StorageWriter,
class_manager_client: Option<SharedClassManagerClient>,
metrics: Option<PapyrusSyncMetrics>,
) -> Self {
let base_layer_source = base_layer_source.map(Arc::new);
Self {
Expand All @@ -826,6 +882,7 @@ impl StateSync {
writer,
sequencer_pub_key: None,
class_manager_client,
metrics,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/src/sources/central_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ async fn run_sync(
writer,
sequencer_pub_key: None,
class_manager_client,
metrics: None,
};

state_sync.run().await?;
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/src/sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ fn store_base_layer_block_test() {
writer,
sequencer_pub_key: None,
class_manager_client: None,
metrics: None,
};

// Trying to store a block without a header in the storage.
Expand Down
9 changes: 9 additions & 0 deletions crates/starknet_sequencer_metrics/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ define_gauge_metrics!(
MetricScope::Batcher => {
{ STORAGE_HEIGHT, "batcher_storage_height", "The height of the batcher's storage" }
},
MetricScope::PapyrusSync => {
{ PAPYRUS_HEADER_MARKER, "papyrus_header_marker", "The first block number for which the node does not have a header" },
{ PAPYRUS_BODY_MARKER, "papyrus_body_marker", "The first block number for which the node does not have a body" },
{ PAPYRUS_STATE_MARKER, "papyrus_state_marker", "The first block number for which the node does not have a state body" },
{ PAPYRUS_COMPILED_CLASS_MARKER, "papyrus_compiled_class_marker", "The first block number for which the node does not have all of the corresponding compiled classes" },
{ PAPYRUS_BASE_LAYER_MARKER, "papyrus_base_layer_marker", "The first block number for which the node does not guarantee L1 finality" },
{ PAPYRUS_CENTRAL_BLOCK_MARKER, "papyrus_central_block_marker", "The first block number that doesn't exist yet" },
{ PAPYRUS_HEADER_LATENCY_SEC, "papyrus_header_latency", "The latency, in seconds, between a block timestamp (as state in its header) and the time the node stores the header" },
}
);

define_counter_metrics!(
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_sequencer_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use regex::{escape, Regex};
pub enum MetricScope {
Batcher,
HttpServer,
PapyrusSync,
}

pub struct MetricCounter {
Expand Down
2 changes: 2 additions & 0 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use papyrus_storage::{open_storage, StorageReader, StorageWriter};
use papyrus_sync::sources::central::{CentralError, CentralSource};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{
PapyrusSyncMetrics,
StateSync as CentralStateSync,
StateSyncError as CentralStateSyncError,
GENESIS_HASH,
Expand Down Expand Up @@ -245,6 +246,7 @@ impl StateSyncRunner {
storage_reader,
storage_writer,
Some(class_manager_client),
Some(PapyrusSyncMetrics::default()),
)
}
}
Expand Down

0 comments on commit 075dba1

Please sign in to comment.