Skip to content

Commit

Permalink
feat(starknet_sequencer_infra): adding metrics counters to local servers
Browse files Browse the repository at this point in the history
commit-id:4738c786
  • Loading branch information
lev-starkware committed Feb 13, 2025
1 parent 6985e91 commit 28a4f5d
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 2 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/starknet_infra_utils/src/type_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@ pub fn short_type_name<T: ?Sized>() -> String {
truncate_type(full_name)
}

/// Converts camel case string to a snake case string.
pub fn camel_to_snake_case(input: &str) -> String {
let mut snake_case = String::new();

for (i, c) in input.chars().enumerate() {
if c.is_uppercase() {
if i != 0 {
snake_case.push('_');
}
snake_case.push(c.to_ascii_lowercase());
} else {
snake_case.push(c);
}
}

snake_case
}

/// Truncates a fully qualified Rust type name by removing module paths, leaving only the type
/// name and its generic parameters.
///
Expand Down
3 changes: 3 additions & 0 deletions crates/starknet_sequencer_infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
starknet_api.workspace = true
starknet_infra_utils.workspace = true
starknet_sequencer_metrics.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tower = { workspace = true, features = ["limit"] }
Expand All @@ -30,6 +31,8 @@ validator.workspace = true

[dev-dependencies]
assert_matches.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
once_cell.workspace = true
pretty_assertions.workspace = true
starknet-types-core.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::component_definitions::{
};
use crate::component_server::{ComponentReplacer, ComponentServerStarter};
use crate::errors::{ComponentServerError, ReplaceComponentError};
use crate::metrics::InfraMetrics;

/// The `LocalComponentServer` struct is a generic server that handles requests and responses for a
/// specified component. It receives requests, processes them using the provided component, and
Expand Down Expand Up @@ -222,14 +223,21 @@ async fn request_response_loop<Request, Response, Component>(
Request: Send + Debug,
Response: Send + Debug,
{
info!("Starting server for component {}", short_type_name::<Component>());
let component_name = short_type_name::<Component>();
info!("Starting server for component {}", component_name);

let metrics = InfraMetrics::new(&component_name);

while let Some(request_and_res_tx) = rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
debug!("Component {} received request {:?}", short_type_name::<Component>(), request);

metrics.increment_received();

process_request(component, request, tx).await;

metrics.increment_processed();
}

error!("Stopping server for component {}", short_type_name::<Component>());
Expand All @@ -245,20 +253,29 @@ async fn concurrent_request_response_loop<Request, Response, Component>(
Request: Send + Debug + 'static,
Response: Send + Debug + 'static,
{
info!("Starting concurrent server for component {}", short_type_name::<Component>());
let component_name = short_type_name::<Component>();
info!("Starting concurrent server for component {}", component_name);

let task_limiter = Arc::new(Semaphore::new(max_concurrency));
let metrics = Arc::new(InfraMetrics::new(&component_name));

while let Some(request_and_res_tx) = rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
debug!("Component {} received request {:?}", short_type_name::<Component>(), request);

metrics.increment_received();

// Acquire a permit to run the task.
let permit = task_limiter.clone().acquire_owned().await.unwrap();

let mut cloned_component = component.clone();
let cloned_metrics = metrics.clone();
tokio::spawn(async move {
process_request(&mut cloned_component, request, tx).await;

cloned_metrics.increment_processed();

// Drop the permit to allow more tasks to be created.
drop(permit);
});
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_sequencer_infra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod component_client;
pub mod component_definitions;
pub mod component_server;
pub mod errors;
mod metrics;
pub mod serde_utils;
#[cfg(test)]
pub mod tests;
Expand Down
75 changes: 75 additions & 0 deletions crates/starknet_sequencer_infra/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use starknet_infra_utils::type_name::camel_to_snake_case;
use starknet_sequencer_metrics::metric_definitions::METRIC_COUNTERS_MAP;
use starknet_sequencer_metrics::metrics::{MetricCounter, MetricScope};
use tracing::warn;

/// A struct to contain all metrics for the a server/component.
pub struct InfraMetrics {
mock: bool,
received_msgs: &'static MetricCounter,
processed_msgs: &'static MetricCounter,
}

const MOCK_METRIC_COUNTER: MetricCounter =
MetricCounter::new(MetricScope::Infra, "fake_test_counter", "Fake test counter", 0);

fn get_metrics_counter(name: &str) -> Option<&'static MetricCounter> {
let counter = METRIC_COUNTERS_MAP.get(name);
if counter.is_none() {
warn!("Counter {} not found", name);
}

counter.map(|v| &**v)
}

fn get_received_msgs_counter(name: &str) -> Option<&'static MetricCounter> {
get_metrics_counter(&format!("{}_msgs_received", name))
}

fn get_processed_msgs_counter(name: &str) -> Option<&'static MetricCounter> {
get_metrics_counter(&format!("{}_msgs_processed", name))
}

impl InfraMetrics {
pub fn new(component_name: &str) -> Self {
let snake_case_component_name = camel_to_snake_case(component_name);
let received_msgs = get_received_msgs_counter(&snake_case_component_name);
let processed_msgs = get_processed_msgs_counter(&snake_case_component_name);

let infra_metrics = match received_msgs.is_none() || processed_msgs.is_none() {
true => InfraMetrics {
mock: true,
received_msgs: &MOCK_METRIC_COUNTER,
processed_msgs: &MOCK_METRIC_COUNTER,
},
false => InfraMetrics {
mock: false,
received_msgs: received_msgs.unwrap(),
processed_msgs: processed_msgs.unwrap(),
},
};

infra_metrics.register_infra_metrics();

infra_metrics
}

fn register_infra_metrics(&self) {
if !self.mock {
self.received_msgs.register();
self.processed_msgs.register();
}
}

pub fn increment_received(&self) {
if !self.mock {
self.received_msgs.increment(1);
}
}

pub fn increment_processed(&self) {
if !self.mock {
self.processed_msgs.increment(1);
}
}
}

0 comments on commit 28a4f5d

Please sign in to comment.