diff --git a/Cargo.lock b/Cargo.lock index 3513abaf..1988e1b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,7 @@ dependencies = [ "logger", "metriken", "net", + "parking_lot", "protocol-admin", "protocol-common", "session", @@ -1001,18 +1002,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "heatmap" -version = "0.7.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff337aee5a51159480a1c9bc225f36a7d87543023aa118d5843ece2f125bccb" -dependencies = [ - "clocksource", - "histogram", - "parking_lot", - "thiserror", -] - [[package]] name = "heck" version = "0.4.0" @@ -1039,11 +1028,10 @@ dependencies = [ [[package]] name = "histogram" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e673d137229619d5c2c8903b6ed5852b43636c0017ff2e66b1aafb8ccf04b80b" +checksum = "8bbe54c811e0443ccedc269b633a9495a03e04c20082b26f6c1cc4d27bfd0678" dependencies = [ - "serde", "thiserror", ] @@ -1359,11 +1347,11 @@ dependencies = [ [[package]] name = "metriken" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d081482206e965281c71f40f1899160b9578bb8f7688b7821b144a43bc3e701a" +checksum = "20cdbc5c288bcf449bdf639e712c24305ec3bcd5765b80f6f1bbd38038ba89ac" dependencies = [ - "heatmap", + "histogram", "linkme", "metriken-derive", "once_cell", @@ -1941,6 +1929,7 @@ dependencies = [ "criterion 0.3.6", "logger", "metriken", + "parking_lot", "protocol-common", "storage-types", ] @@ -2183,9 +2172,9 @@ dependencies = [ [[package]] name = "ringlog" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c90d3d1e4db43daedfdae26373e7b4cb4f56b26f217fc8f661e4f439827b4a46" +checksum = "987665e9638b6e3c9bf753fd2da5058c8fa0e734221582d8790e3b7d0e1c3394" dependencies = [ "ahash", "clocksource", @@ -2411,7 +2400,6 @@ name = "server" version = "0.3.1" dependencies = [ "admin", - "clocksource", "common", "config", "crossbeam-channel", @@ -2627,22 +2615,22 @@ checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16" [[package]] name = "thiserror" -version = "1.0.38" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.38" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.27", ] [[package]] @@ -2974,7 +2962,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index fe685677..75416234 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ repository = "https://github.com/pelikan-io/pelikan" license = "Apache-2.0" [workspace] +resolver = "2" members = [ "src/common", "src/config", @@ -55,17 +56,18 @@ httparse = "1.8.0" libc = "0.2.139" log = "0.4.17" memmap2 = "0.5.8" -metriken = "0.2.3" +metriken = "0.3.0" metrohash = "1.0.6" mio = "0.8.5" nom = "7.1.3" +parking_lot = "0.12.1" phf = "0.11.1" proc-macro2 = "1.0.50" quote = "1.0.23" rand = "0.8.5" rand_chacha = "0.3.1" rand_xoshiro = "0.6.0" -ringlog = "0.2.0" +ringlog = "0.3.0" serde = "1.0.152" serde_json = "1.0.91" signal-hook = "0.3.15" diff --git a/src/common/src/ssl.rs b/src/common/src/ssl.rs index 4f105472..a2f73a0b 100644 --- a/src/common/src/ssl.rs +++ b/src/common/src/ssl.rs @@ -5,7 +5,7 @@ pub use boring::ssl::*; use net::TlsTcpAcceptor; -use std::io::{Error, ErrorKind}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; pub trait TlsConfig { fn certificate_chain(&self) -> Option; @@ -21,14 +21,17 @@ pub trait TlsConfig { /// there were any issues during initialization. Otherwise, returns a /// `TlsTcpAcceptor` wrapped in an option, where the `None` variant indicates /// that TLS should not be used. -pub fn tls_acceptor(config: &dyn TlsConfig) -> Result, std::io::Error> { +pub fn tls_acceptor(config: &dyn TlsConfig) -> Result, IoError> { let mut builder = TlsTcpAcceptor::mozilla_intermediate_v5()?; // we use xor here to check if we have an under-specified tls configuration if config.private_key().is_some() ^ (config.certificate_chain().is_some() || config.certificate().is_some()) { - return Err(Error::new(ErrorKind::Other, "incomplete tls configuration")); + return Err(IoError::new( + IoErrorKind::Other, + "incomplete tls configuration", + )); } // load the private key diff --git a/src/core/admin/Cargo.toml b/src/core/admin/Cargo.toml index 400c7834..a607b96e 100644 --- a/src/core/admin/Cargo.toml +++ b/src/core/admin/Cargo.toml @@ -17,6 +17,7 @@ libc = { workspace = true } logger = { path = "../../logger" } metriken = { workspace = true } net = { path = "../../net" } +parking_lot = { workspace = true } protocol-admin = { path = "../../protocol/admin" } protocol-common = { path = "../../protocol/common" } session = { path = "../../session" } diff --git a/src/core/admin/src/lib.rs b/src/core/admin/src/lib.rs index 479d88f5..a46e7bdd 100644 --- a/src/core/admin/src/lib.rs +++ b/src/core/admin/src/lib.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::io::{Error, ErrorKind, Result}; use std::sync::Arc; use std::time::Duration; +use std::time::UNIX_EPOCH; use switchboard::{Queues, Waker}; use tiny_http::{Method, Request, Response}; @@ -493,175 +494,6 @@ impl Admin { } } - /// A "human-readable" exposition format which outputs one stat per line, - /// with a LF used as the end of line symbol. - /// - /// ```text - /// get: 0 - /// get_cardinality_p25: 0 - /// get_cardinality_p50: 0 - /// get_cardinality_p75: 0 - /// get_cardinality_p90: 0 - /// get_cardinality_p9999: 0 - /// get_cardinality_p999: 0 - /// get_cardinality_p99: 0 - /// get_ex: 0 - /// get_key: 0 - /// get_key_hit: 0 - /// get_key_miss: 0 - /// ``` - fn human_stats(&self) -> String { - let mut data = Vec::new(); - - for metric in &metriken::metrics() { - let any = match metric.as_any() { - Some(any) => any, - None => { - continue; - } - }; - - if let Some(counter) = any.downcast_ref::() { - data.push(format!("{}: {}", metric.name(), counter.value())); - } else if let Some(gauge) = any.downcast_ref::() { - data.push(format!("{}: {}", metric.name(), gauge.value())); - } else if let Some(heatmap) = any.downcast_ref::() { - for (label, value) in PERCENTILES { - if let Some(Ok(bucket)) = heatmap.percentile(*value) { - data.push(format!("{}_{}: {}", metric.name(), label, bucket.high())); - } - } - } - } - - data.sort(); - data.join("\n") + "\n" - } - - /// JSON stats output which follows the conventions found in Finagle and - /// TwitterServer libraries. Percentiles are appended to the metric name, - /// eg: `request_latency_p999` for the 99.9th percentile. For more details - /// about the Finagle / TwitterServer format see: - /// https://twitter.github.io/twitter-server/Features.html#metrics - /// - /// ```text - /// {"get": 0,"get_cardinality_p25": 0,"get_cardinality_p50": 0, ... } - /// ``` - fn json_stats(&self) -> String { - let head = "{".to_owned(); - - let mut data = Vec::new(); - - for metric in &metriken::metrics() { - let any = match metric.as_any() { - Some(any) => any, - None => { - continue; - } - }; - - if let Some(counter) = any.downcast_ref::() { - data.push(format!("\"{}\": {}", metric.name(), counter.value())); - } else if let Some(gauge) = any.downcast_ref::() { - data.push(format!("\"{}\": {}", metric.name(), gauge.value())); - } else if let Some(heatmap) = any.downcast_ref::() { - for (label, value) in PERCENTILES { - if let Some(Ok(bucket)) = heatmap.percentile(*value) { - data.push(format!( - "\"{}_{}\": {}", - metric.name(), - label, - bucket.high() - )); - } - } - } - } - - data.sort(); - let body = data.join(","); - let mut content = head; - content += &body; - content += "}"; - content - } - - /// Prometheus / OpenTelemetry compatible stats output. Each stat is - /// annotated with a type. Percentiles use the label 'percentile' to - /// indicate which percentile corresponds to the value: - /// - /// ```text - /// # TYPE get counter - /// get 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p25"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p50"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p75"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p90"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p99"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p999"} 0 - /// # TYPE get_cardinality gauge - /// get_cardinality{percentile="p9999"} 0 - /// # TYPE get_ex counter - /// get_ex 0 - /// # TYPE get_key counter - /// get_key 0 - /// # TYPE get_key_hit counter - /// get_key_hit 0 - /// # TYPE get_key_miss counter - /// get_key_miss 0 - /// ``` - fn prometheus_stats(&self) -> String { - let mut data = Vec::new(); - - for metric in &metriken::metrics() { - let any = match metric.as_any() { - Some(any) => any, - None => { - continue; - } - }; - - if let Some(counter) = any.downcast_ref::() { - data.push(format!( - "# TYPE {} counter\n{} {}", - metric.name(), - metric.name(), - counter.value() - )); - } else if let Some(gauge) = any.downcast_ref::() { - data.push(format!( - "# TYPE {} gauge\n{} {}", - metric.name(), - metric.name(), - gauge.value() - )); - } else if let Some(heatmap) = any.downcast_ref::() { - for (label, value) in PERCENTILES { - if let Some(Ok(bucket)) = heatmap.percentile(*value) { - data.push(format!( - "# TYPE {} gauge\n{}{{percentile=\"{}\"}} {}", - metric.name(), - metric.name(), - label, - bucket.high() - )); - } - } - } - } - data.sort(); - let mut content = data.join("\n"); - content += "\n"; - let parts: Vec<&str> = content.split('/').collect(); - parts.join("_") - } - /// Handle a HTTP request fn handle_http_request(&self, request: Request) { let url = request.url(); @@ -672,7 +504,7 @@ impl Admin { // stats in the Prometheus format "/metrics" => match request.method() { Method::Get => { - let _ = request.respond(Response::from_string(self.prometheus_stats())); + let _ = request.respond(Response::from_string(prometheus_stats())); } _ => { let _ = request.respond(Response::empty(400)); @@ -682,7 +514,7 @@ impl Admin { // for maximum compatibility with various internal conventions "/metrics.json" | "/vars.json" | "/admin/metrics.json" => match request.method() { Method::Get => { - let _ = request.respond(Response::from_string(self.json_stats())); + let _ = request.respond(Response::from_string(json_stats())); } _ => { let _ = request.respond(Response::empty(400)); @@ -692,7 +524,7 @@ impl Admin { // on internal conventions "/vars" => match request.method() { Method::Get => { - let _ = request.respond(Response::from_string(self.human_stats())); + let _ = request.respond(Response::from_string(human_stats())); } _ => { let _ = request.respond(Response::empty(400)); @@ -779,4 +611,171 @@ impl Admin { } } +/// A "human-readable" exposition format which outputs one stat per line, +/// with a LF used as the end of line symbol. +/// +/// ```text +/// get: 0 +/// get_cardinality_p25: 0 +/// get_cardinality_p50: 0 +/// get_cardinality_p75: 0 +/// get_cardinality_p90: 0 +/// get_cardinality_p9999: 0 +/// get_cardinality_p999: 0 +/// get_cardinality_p99: 0 +/// get_ex: 0 +/// get_key: 0 +/// get_key_hit: 0 +/// get_key_miss: 0 +/// ``` +pub fn human_stats() -> String { + let data = human_formatted_stats(); + data.join("\n") + "\n" +} + +/// JSON stats output which follows the conventions found in Finagle and +/// TwitterServer libraries. Percentiles are appended to the metric name, +/// eg: `request_latency_p999` for the 99.9th percentile. For more details +/// about the Finagle / TwitterServer format see: +/// https://twitter.github.io/twitter-server/Features.html#metrics +/// +/// ```text +/// {"get": 0,"get_cardinality_p25": 0,"get_cardinality_p50": 0, ... } +/// ``` +pub fn json_stats() -> String { + let data = human_formatted_stats(); + + "{".to_string() + &data.join(",") + "}" +} + +/// Prometheus / OpenTelemetry compatible stats output. Each stat is +/// annotated with a type. Percentiles use the label 'percentile' to +/// indicate which percentile corresponds to the value: +/// +/// ```text +/// # TYPE get counter +/// get 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p25"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p50"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p75"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p90"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p99"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p999"} 0 +/// # TYPE get_cardinality gauge +/// get_cardinality{percentile="p9999"} 0 +/// # TYPE get_ex counter +/// get_ex 0 +/// # TYPE get_key counter +/// get_key 0 +/// # TYPE get_key_hit counter +/// get_key_hit 0 +/// # TYPE get_key_miss counter +/// get_key_miss 0 +/// ``` +pub fn prometheus_stats() -> String { + let mut data = Vec::new(); + + let snapshots = SNAPSHOTS.read(); + + let timestamp = snapshots + .timestamp() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + + for metric in &metriken::metrics() { + let any = match metric.as_any() { + Some(any) => any, + None => { + continue; + } + }; + + let name = metric.name(); + + if let Some(counter) = any.downcast_ref::() { + if metric.metadata().is_empty() { + data.push(format!( + "# TYPE {name}_total counter\n{name}_total {}", + counter.value() + )); + } else { + data.push(format!( + "# TYPE {name} counter\n{} {}", + metric.formatted(metriken::Format::Prometheus), + counter.value() + )); + } + } else if let Some(gauge) = any.downcast_ref::() { + data.push(format!( + "# TYPE {name} gauge\n{} {}", + metric.formatted(metriken::Format::Prometheus), + gauge.value() + )); + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { + for (_label, percentile, value) in snapshots.percentiles(metric.name()) { + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); + } + } + } + + data.sort(); + data.dedup(); + let mut content = data.join("\n"); + content += "\n"; + let parts: Vec<&str> = content.split('/').collect(); + parts.join("_") +} + +// human formatted stats that can be exposed as human stats or converted to json +fn human_formatted_stats() -> Vec { + let mut data = Vec::new(); + + let snapshots = SNAPSHOTS.read(); + + for metric in &metriken::metrics() { + let any = match metric.as_any() { + Some(any) => any, + None => { + continue; + } + }; + + let name = metric.name(); + + if let Some(counter) = any.downcast_ref::() { + let value = counter.value(); + + data.push(format!("\"{name}\": {value}")); + } else if let Some(gauge) = any.downcast_ref::() { + let value = gauge.value(); + + data.push(format!("\"{name}\": {value}")); + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { + let percentiles = snapshots.percentiles(metric.name()); + + for (label, _percentile, value) in percentiles { + data.push(format!("\"{name}/{label}\": {value}",)); + } + } + } + + data.sort(); + + data +} + common::metrics::test_no_duplicates!(); diff --git a/src/core/proxy/src/backend.rs b/src/core/proxy/src/backend.rs index c699c21d..fb09a484 100644 --- a/src/core/proxy/src/backend.rs +++ b/src/core/proxy/src/backend.rs @@ -12,8 +12,7 @@ use std::collections::VecDeque; name = "backend_event_depth", description = "distribution of the number of events received per iteration of the event loop" )] -pub static BACKEND_EVENT_DEPTH: Heatmap = - Heatmap::new(0, 8, 17, Duration::from_secs(60), Duration::from_secs(1)); +pub static BACKEND_EVENT_DEPTH: AtomicHistogram = AtomicHistogram::new(7, 17); #[metric( name = "backend_event_error", @@ -211,14 +210,12 @@ where error!("Error polling"); } - let timestamp = Instant::now(); - let count = events.iter().count(); BACKEND_EVENT_TOTAL.add(count as _); if count == self.nevent { BACKEND_EVENT_MAX_REACHED.increment(); } else { - let _ = BACKEND_EVENT_DEPTH.increment(timestamp, count as _); + let _ = BACKEND_EVENT_DEPTH.increment(count as _); } // process all events diff --git a/src/core/proxy/src/frontend.rs b/src/core/proxy/src/frontend.rs index fc66f5c5..0a2bdff5 100644 --- a/src/core/proxy/src/frontend.rs +++ b/src/core/proxy/src/frontend.rs @@ -9,8 +9,7 @@ use crate::*; name = "frontend_event_depth", description = "distribution of the number of events received per iteration of the event loop" )] -pub static FRONTEND_EVENT_DEPTH: Heatmap = - Heatmap::new(0, 8, 17, Duration::from_secs(60), Duration::from_secs(1)); +pub static FRONTEND_EVENT_DEPTH: AtomicHistogram = AtomicHistogram::new(7, 17); #[metric( name = "frontend_event_error", @@ -220,14 +219,12 @@ where error!("Error polling"); } - let timestamp = Instant::now(); - let count = events.iter().count(); FRONTEND_EVENT_TOTAL.add(count as _); if count == self.nevent { FRONTEND_EVENT_MAX_REACHED.increment(); } else { - let _ = FRONTEND_EVENT_DEPTH.increment(timestamp, count as _); + let _ = FRONTEND_EVENT_DEPTH.increment(count as _); } // process all events diff --git a/src/core/server/Cargo.toml b/src/core/server/Cargo.toml index 22a5dd37..37f2c541 100644 --- a/src/core/server/Cargo.toml +++ b/src/core/server/Cargo.toml @@ -12,7 +12,6 @@ license = { workspace = true } [dependencies] admin = { path = "../admin" } common = { path = "../../common" } -clocksource = { workspace = true } config = { path = "../../config" } crossbeam-channel = { workspace = true } entrystore = { path = "../../entrystore" } diff --git a/src/core/server/src/lib.rs b/src/core/server/src/lib.rs index 59e269aa..d32e6061 100644 --- a/src/core/server/src/lib.rs +++ b/src/core/server/src/lib.rs @@ -119,8 +119,6 @@ use workers::WorkersBuilder; pub use process::{Process, ProcessBuilder}; -type Instant = clocksource::Instant>; - // TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be // full, and a single wakeup should drain at least one message and make room for // the response. A stat to prove that this is sufficient would be good. diff --git a/src/core/server/src/workers/mod.rs b/src/core/server/src/workers/mod.rs index ca5c8d11..64080e64 100644 --- a/src/core/server/src/workers/mod.rs +++ b/src/core/server/src/workers/mod.rs @@ -17,8 +17,7 @@ use storage::*; name = "worker_event_depth", description = "distribution of the number of events received per iteration of the event loop" )] -pub static WORKER_EVENT_DEPTH: Heatmap = - Heatmap::new(0, 8, 17, Duration::from_secs(60), Duration::from_secs(1)); +pub static WORKER_EVENT_DEPTH: AtomicHistogram = AtomicHistogram::new(7, 17); #[metric( name = "worker_event_error", diff --git a/src/core/server/src/workers/multi.rs b/src/core/server/src/workers/multi.rs index d14cc6ed..29aced45 100644 --- a/src/core/server/src/workers/multi.rs +++ b/src/core/server/src/workers/multi.rs @@ -136,14 +136,12 @@ where error!("Error polling"); } - let timestamp = Instant::now(); - let count = events.iter().count(); WORKER_EVENT_TOTAL.add(count as _); if count == self.nevent { WORKER_EVENT_MAX_REACHED.increment(); } else { - let _ = WORKER_EVENT_DEPTH.increment(timestamp, count as _); + let _ = WORKER_EVENT_DEPTH.increment(count as _); } // process all events diff --git a/src/core/server/src/workers/single.rs b/src/core/server/src/workers/single.rs index 6ef1f4f4..cfb721c2 100644 --- a/src/core/server/src/workers/single.rs +++ b/src/core/server/src/workers/single.rs @@ -197,14 +197,12 @@ where error!("Error polling"); } - let timestamp = Instant::now(); - let count = events.iter().count(); WORKER_EVENT_TOTAL.add(count as _); if count == self.nevent { WORKER_EVENT_MAX_REACHED.increment(); } else { - let _ = WORKER_EVENT_DEPTH.increment(timestamp, count as _); + let _ = WORKER_EVENT_DEPTH.increment(count as _); } // process all events diff --git a/src/core/server/src/workers/storage.rs b/src/core/server/src/workers/storage.rs index a4ebc8b9..7527fcb9 100644 --- a/src/core/server/src/workers/storage.rs +++ b/src/core/server/src/workers/storage.rs @@ -14,8 +14,7 @@ pub static STORAGE_EVENT_LOOP: Counter = Counter::new(); name = "storage_queue_depth", description = "the distribution of the depth of the storage queue on each loop" )] -pub static STORAGE_QUEUE_DEPTH: Heatmap = - Heatmap::new(0, 8, 20, Duration::from_secs(60), Duration::from_secs(1)); +pub static STORAGE_QUEUE_DEPTH: AtomicHistogram = AtomicHistogram::new(7, 20); pub struct StorageWorkerBuilder { nevent: usize, @@ -108,8 +107,6 @@ where error!("Error polling"); } - let timestamp = Instant::now(); - if !events.is_empty() { self.waker.reset(); @@ -117,7 +114,7 @@ where self.data_queue.try_recv_all(&mut messages); - let _ = STORAGE_QUEUE_DEPTH.increment(timestamp, messages.len() as _); + let _ = STORAGE_QUEUE_DEPTH.increment(messages.len() as _); for message in messages.drain(..) { let sender = message.sender(); diff --git a/src/protocol/admin/Cargo.toml b/src/protocol/admin/Cargo.toml index b236d27b..4473f358 100644 --- a/src/protocol/admin/Cargo.toml +++ b/src/protocol/admin/Cargo.toml @@ -14,6 +14,7 @@ common = { path = "../../common" } config = { path = "../../config" } logger = { path = "../../logger" } metriken = { workspace = true } +parking_lot = { workspace = true } protocol-common = { path = "../../protocol/common" } storage-types = { path = "../../storage/types" } diff --git a/src/protocol/admin/src/admin.rs b/src/protocol/admin/src/admin.rs index 8e7cce90..060bab09 100644 --- a/src/protocol/admin/src/admin.rs +++ b/src/protocol/admin/src/admin.rs @@ -121,47 +121,45 @@ impl Compose for AdminResponse { 4 } Self::Stats => { - let mut size = 0; - let mut data = Vec::new(); - for metric in &metriken::metrics() { - let any = match metric.as_any() { - Some(any) => any, - None => { - continue; - } - }; - - if let Some(counter) = any.downcast_ref::() { - data.push(format!("STAT {} {}\r\n", metric.name(), counter.value())); - } else if let Some(gauge) = any.downcast_ref::() { - data.push(format!("STAT {} {}\r\n", metric.name(), gauge.value())); - } else if let Some(heatmap) = any.downcast_ref::() { - for (label, value) in PERCENTILES { - if let Some(Ok(bucket)) = heatmap.percentile(*value) { - data.push(format!( - "STAT {}_{} {}\r\n", - metric.name(), - label, - bucket.high() - )); - } - } - } - } - - data.sort(); - for line in data { - size += line.as_bytes().len(); - buf.put_slice(line.as_bytes()); - } - buf.put_slice(b"END\r\n"); - size + 5 + let message = memcache_stats(); + buf.put_slice(message.as_bytes()); + message.len() } Self::Version(v) => v.compose(buf), } } } +pub fn memcache_stats() -> String { + let snapshots = SNAPSHOTS.read(); + + let mut data = Vec::new(); + + for metric in &metriken::metrics() { + let any = match metric.as_any() { + Some(any) => any, + None => { + continue; + } + }; + + if let Some(counter) = any.downcast_ref::() { + data.push(format!("STAT {} {}\r\n", metric.name(), counter.value())); + } else if let Some(gauge) = any.downcast_ref::() { + data.push(format!("STAT {} {}\r\n", metric.name(), gauge.value())); + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { + for (label, _percentile, value) in snapshots.percentiles(metric.name()) { + data.push(format!("STAT {}_{} {}\r\n", metric.name(), label, value,)); + } + } + } + + data.sort(); + data.join("\r\n") + "END\r\n" +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/protocol/admin/src/lib.rs b/src/protocol/admin/src/lib.rs index d8ba3f14..b5e42e23 100644 --- a/src/protocol/admin/src/lib.rs +++ b/src/protocol/admin/src/lib.rs @@ -5,8 +5,10 @@ pub use protocol_common::*; mod admin; +mod snapshots; pub use admin::*; +pub use snapshots::*; pub static PERCENTILES: &[(&str, f64)] = &[ ("p25", 25.0), diff --git a/src/protocol/admin/src/snapshots.rs b/src/protocol/admin/src/snapshots.rs new file mode 100644 index 00000000..a488de9c --- /dev/null +++ b/src/protocol/admin/src/snapshots.rs @@ -0,0 +1,122 @@ +use crate::*; +use metriken::Lazy; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::SystemTime; + +type HistogramSnapshots = HashMap; + +pub static SNAPSHOTS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(Snapshots::new()))); + +pub struct Snapshots { + timestamp: SystemTime, + previous: HistogramSnapshots, + deltas: HistogramSnapshots, +} + +impl Default for Snapshots { + fn default() -> Self { + Self::new() + } +} + +impl Snapshots { + pub fn new() -> Self { + let timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + let key = metric.name().to_string(); + + let snapshot = if let Some(histogram) = any.downcast_ref::() + { + histogram.snapshot() + } else if let Some(histogram) = any.downcast_ref::() { + histogram.snapshot() + } else { + None + }; + + if let Some(snapshot) = snapshot { + current.insert(key, snapshot); + } + } + + let deltas = current.clone(); + + Self { + timestamp, + previous: current, + deltas, + } + } + + pub fn update(&mut self) { + self.timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + let key = metric.name().to_string(); + + let snapshot = if let Some(histogram) = any.downcast_ref::() + { + histogram.snapshot() + } else if let Some(histogram) = any.downcast_ref::() { + histogram.snapshot() + } else { + None + }; + + if let Some(snapshot) = snapshot { + if let Some(previous) = self.previous.get(&key) { + self.deltas + .insert(key.clone(), snapshot.wrapping_sub(previous).unwrap()); + } + + current.insert(key, snapshot); + } + } + + self.previous = current; + } + + pub fn percentiles(&self, metric: &str) -> Vec<(String, f64, u64)> { + let mut result = Vec::new(); + + let percentiles: Vec = PERCENTILES + .iter() + .map(|(_, percentile)| *percentile) + .collect(); + + if let Some(snapshot) = self.deltas.get(metric) { + if let Ok(percentiles) = snapshot.percentiles(&percentiles) { + for ((label, _), (percentile, bucket)) in PERCENTILES.iter().zip(percentiles.iter()) + { + result.push((label.to_string(), *percentile, bucket.end())); + } + } + } + + result + } + + pub fn timestamp(&self) -> SystemTime { + self.timestamp + } +} diff --git a/src/protocol/memcache/src/lib.rs b/src/protocol/memcache/src/lib.rs index ca81dbaf..76715744 100644 --- a/src/protocol/memcache/src/lib.rs +++ b/src/protocol/memcache/src/lib.rs @@ -16,11 +16,11 @@ pub use request::*; pub use response::*; pub use storage::*; -pub use protocol_common::*; +pub use protocol_common::{Compose, Parse, ParseOk}; pub use common::expiry::TimeType; use logger::Klog; -use metriken::{metric, Counter, Heatmap}; +use metriken::{metric, AtomicHistogram, Counter}; const CRLF: &[u8] = b"\r\n"; @@ -30,8 +30,6 @@ pub enum MemcacheError { ServerError(ServerError), } -type Instant = common::time::Instant>; - /* * GET */ @@ -55,13 +53,7 @@ pub static GET_KEY_MISS: Counter = Counter::new(); name = "get_cardinality", description = "distribution of key cardinality for get requests" )] -pub static GET_CARDINALITY: Heatmap = Heatmap::new( - 0, - 8, - 20, - core::time::Duration::from_secs(60), - core::time::Duration::from_secs(1), -); +pub static GET_CARDINALITY: AtomicHistogram = AtomicHistogram::new(7, 20); /* * GETS diff --git a/src/protocol/memcache/src/request/get.rs b/src/protocol/memcache/src/request/get.rs index 41cb6cd1..2364a334 100644 --- a/src/protocol/memcache/src/request/get.rs +++ b/src/protocol/memcache/src/request/get.rs @@ -73,7 +73,7 @@ impl RequestParser { GET.increment(); let keys = request.keys.len() as u64; GET_KEY.add(keys); - let _ = GET_CARDINALITY.increment(Instant::now(), keys); + let _ = GET_CARDINALITY.increment(keys); Ok((input, request)) } Err(e) => { diff --git a/src/proxy/momento/src/admin.rs b/src/proxy/momento/src/admin.rs index be8ba589..1f5f068e 100644 --- a/src/proxy/momento/src/admin.rs +++ b/src/proxy/momento/src/admin.rs @@ -125,55 +125,6 @@ async fn handle_admin_client(mut socket: tokio::net::TcpStream) { } async fn stats_response(socket: &mut tokio::net::TcpStream) -> Result<(), Error> { - let mut data = Vec::new(); - for metric in &metriken::metrics() { - let any = match metric.as_any() { - Some(any) => any, - None => { - continue; - } - }; - - // we need to filter some irrelvant metrics that - // are defined in crates we depend on - if metric.name().starts_with("add") - || metric.name().starts_with("append") - || metric.name().starts_with("cas") - || metric.name().starts_with("decr") - || metric.name().starts_with("delete") - || metric.name().starts_with("gets") - || metric.name().starts_with("incr") - || metric.name().starts_with("get_cardinality") - || metric.name().starts_with("ping") - || metric.name().starts_with("pipeline_depth") - || metric.name().starts_with("prepend") - || metric.name().starts_with("replace") - || metric.name().starts_with("request_latency") - { - continue; - } - - if let Some(counter) = any.downcast_ref::() { - data.push(format!("STAT {} {}\r\n", metric.name(), counter.value())); - } else if let Some(gauge) = any.downcast_ref::() { - data.push(format!("STAT {} {}\r\n", metric.name(), gauge.value())); - } else if let Some(heatmap) = any.downcast_ref::() { - for (label, value) in PERCENTILES { - if let Some(Ok(bucket)) = heatmap.percentile(*value) { - data.push(format!( - "STAT {}_{} {}\r\n", - metric.name(), - label, - bucket.high() - )); - } - } - } - } - - data.sort(); - for line in data { - socket.write_all(line.as_bytes()).await?; - } - socket.write_all(b"END\r\n").await + let message = protocol_admin::memcache_stats(); + socket.write_all(message.as_bytes()).await } diff --git a/src/proxy/momento/src/main.rs b/src/proxy/momento/src/main.rs index 0da515d8..0846ff0f 100644 --- a/src/proxy/momento/src/main.rs +++ b/src/proxy/momento/src/main.rs @@ -245,7 +245,9 @@ fn main() -> Result<(), Box> { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/proxy/momento/src/protocol/resp/lrange.rs b/src/proxy/momento/src/protocol/resp/lrange.rs index 37df2e3a..6955b7a7 100644 --- a/src/proxy/momento/src/protocol/resp/lrange.rs +++ b/src/proxy/momento/src/protocol/resp/lrange.rs @@ -24,7 +24,8 @@ pub async fn lrange( Duration::from_millis(200), client.list_fetch(cache_name, req.key()), ) - .await?? else { + .await?? + else { response_buf.extend_from_slice(b"*0\r\n"); return Ok(()); diff --git a/src/proxy/ping/src/main.rs b/src/proxy/ping/src/main.rs index 5feca496..4c248914 100644 --- a/src/proxy/ping/src/main.rs +++ b/src/proxy/ping/src/main.rs @@ -62,7 +62,9 @@ fn main() { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/proxy/thrift/src/main.rs b/src/proxy/thrift/src/main.rs index 3b248a8c..bf61e4d4 100644 --- a/src/proxy/thrift/src/main.rs +++ b/src/proxy/thrift/src/main.rs @@ -61,7 +61,9 @@ fn main() { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/server/pingserver/src/main.rs b/src/server/pingserver/src/main.rs index 5c773ce7..4b861965 100644 --- a/src/server/pingserver/src/main.rs +++ b/src/server/pingserver/src/main.rs @@ -73,7 +73,9 @@ fn main() { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/server/rds/src/main.rs b/src/server/rds/src/main.rs index d673d11d..8a3cb4c1 100644 --- a/src/server/rds/src/main.rs +++ b/src/server/rds/src/main.rs @@ -82,7 +82,9 @@ fn main() { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/server/segcache/src/main.rs b/src/server/segcache/src/main.rs index f631b61b..5bad855b 100644 --- a/src/server/segcache/src/main.rs +++ b/src/server/segcache/src/main.rs @@ -82,7 +82,9 @@ fn main() { metrics.push(format!("{:<31} counter", metric.name())); } else if any.downcast_ref::().is_some() { metrics.push(format!("{:<31} gauge", metric.name())); - } else if any.downcast_ref::().is_some() { + } else if any.downcast_ref::().is_some() + || any.downcast_ref::().is_some() + { for (label, _) in PERCENTILES { let name = format!("{}_{}", metric.name(), label); metrics.push(format!("{name:<31} percentile")); diff --git a/src/session/src/client.rs b/src/session/src/client.rs index 859b87a8..47f9e075 100644 --- a/src/session/src/client.rs +++ b/src/session/src/client.rs @@ -81,7 +81,7 @@ where .pop_front() .ok_or_else(|| Error::from(ErrorKind::InvalidInput))?; let latency = now - timestamp; - let _ = REQUEST_LATENCY.increment(now, latency.as_nanos()); + let _ = REQUEST_LATENCY.increment(latency.as_nanos()); let consumed = res.consumed(); let msg = res.into_inner(); self.session.consume(consumed); diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs index e4a2e5aa..80c85505 100644 --- a/src/session/src/lib.rs +++ b/src/session/src/lib.rs @@ -27,7 +27,6 @@ use common::time::Nanoseconds; use core::borrow::{Borrow, BorrowMut}; use core::fmt::Debug; use core::marker::PhantomData; -use core::time::Duration; use metriken::*; use protocol_common::Compose; use protocol_common::Parse; @@ -78,8 +77,7 @@ pub static SESSION_SEND_BYTE: Counter = Counter::new(); name = "request_latency", description = "distribution of request latencies in nanoseconds" )] -pub static REQUEST_LATENCY: Heatmap = - Heatmap::new(0, 8, 32, Duration::from_secs(60), Duration::from_secs(1)); +pub static REQUEST_LATENCY: AtomicHistogram = AtomicHistogram::new(7, 32); type Instant = common::time::Instant>; diff --git a/src/session/src/server.rs b/src/session/src/server.rs index 5be5f3f8..cf20117e 100644 --- a/src/session/src/server.rs +++ b/src/session/src/server.rs @@ -97,7 +97,7 @@ where if let Some(timestamp) = timestamp { let now = Instant::now(); let latency = now - timestamp; - let _ = REQUEST_LATENCY.increment(now, latency.as_nanos()); + let _ = REQUEST_LATENCY.increment(latency.as_nanos()); } } else { // we have bytes in our response, we need to add it on the @@ -129,7 +129,7 @@ where amt -= front.1; if let Some(ts) = front.0 { let latency = now - ts; - let _ = REQUEST_LATENCY.increment(now, latency.as_nanos()); + let _ = REQUEST_LATENCY.increment(latency.as_nanos()); } } } else {