diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index 434e336f..475a82dd 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -17,11 +17,12 @@ categories = ["development-tools::debugging"] keywords = ["metrics", "telemetry", "prometheus"] [features] -default = ["http-listener", "push-gateway"] +default = ["http-listener", "push-gateway","remote-write"] async-runtime = ["tokio", "hyper-util/tokio"] http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"] uds-listener = ["http-listener"] push-gateway = ["async-runtime", "tracing", "_hyper-client"] +remote-write = ["_hyper-client","async-runtime","dep:prost","dep:snap","dep:prometheus-parse"] _hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"] _hyper-client = [ "http-body-util", @@ -48,7 +49,10 @@ metrics-util = { version = "^0.18", path = "../metrics-util", default-features = "registry", "summary", ] } +prometheus-parse = {version = "0.2.5", optional = true} +prost = {workspace = true, optional = true} quanta = { workspace = true } +snap = { version = "1.1.1", optional = true} thiserror = { workspace = true } tokio = { workspace = true, optional = true } tracing = { workspace = true, optional = true } @@ -63,6 +67,10 @@ tracing-subscriber = { workspace = true, features = ["fmt"] } name = "prometheus_push_gateway" required-features = ["push-gateway"] +[[example]] +name = "prometheus_remote_write" +required-features = ["remote-write"] + [[example]] name = "prometheus_server" required-features = ["http-listener"] diff --git a/metrics-exporter-prometheus/examples/prometheus_remote_write.rs b/metrics-exporter-prometheus/examples/prometheus_remote_write.rs new file mode 100644 index 00000000..983eb590 --- /dev/null +++ b/metrics-exporter-prometheus/examples/prometheus_remote_write.rs @@ -0,0 +1,73 @@ +/// Make sure to run this example with `--features remote-write` to properly enable remote 
write support. +#[allow(unused_imports)] +use std::thread; +use std::time::Duration; + +#[allow(unused_imports)] +use metrics::{counter, gauge, histogram}; +use metrics::{describe_counter, describe_histogram}; +#[allow(unused_imports)] +use metrics_exporter_prometheus::PrometheusBuilder; +#[allow(unused_imports)] +use metrics_util::MetricKindMask; + +use quanta::Clock; +use rand::{thread_rng, Rng}; + +fn main() { + tracing_subscriber::fmt::init(); + + PrometheusBuilder::new() + .with_remote_write( + "http://127.0.0.1:9091/metrics/job/example", + Duration::from_secs(10), + "test-agent", + ) + .expect("remote write endpoint should be valid") + .idle_timeout( + MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM, + Some(Duration::from_secs(10)), + ) + .install() + .expect("failed to install Prometheus recorder"); + + // We register these metrics, which gives us a chance to specify a description for them. The + // Prometheus exporter records this description and adds it as HELP text when the endpoint is + // scraped. + // + // Registering metrics ahead of using them is not required, but is the only way to specify the + // description of a metric. + describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far."); + describe_histogram!( + "tcp_server_loop_delta_secs", + "The time taken for iterations of the TCP server event loop." + ); + + let clock = Clock::new(); + let mut last = None; + + counter!("idle_metric").increment(1); + gauge!("testing").set(42.0); + + // Loop over and over, pretending to do some work. 
+ loop { + counter!("tcp_server_loops", "system" => "foo").increment(1); + + if let Some(t) = last { + let delta: Duration = clock.now() - t; + histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta); + } + + let increment_gauge = thread_rng().gen_bool(0.75); + let gauge = gauge!("lucky_iterations"); + if increment_gauge { + gauge.increment(1.0); + } else { + gauge.decrement(1.0); + } + + last = Some(clock.now()); + + thread::sleep(Duration::from_millis(750)); + } +} diff --git a/metrics-exporter-prometheus/src/common.rs b/metrics-exporter-prometheus/src/common.rs index 51f9bad9..8f422a2f 100644 --- a/metrics-exporter-prometheus/src/common.rs +++ b/metrics-exporter-prometheus/src/common.rs @@ -63,7 +63,9 @@ pub enum BuildError { /// The given push gateway endpoint is not a valid URI. #[error("push gateway endpoint is not valid: {0}")] InvalidPushGatewayEndpoint(String), - + /// The given remote write endpoint is not a valid URI. + #[error("remote write endpoint is not valid: {0}")] + InvalidRemoteWriteEndpoint(String), /// No exporter configuration was present. /// /// This generally only occurs when HTTP listener support is disabled, but no push gateway diff --git a/metrics-exporter-prometheus/src/exporter/builder.rs b/metrics-exporter-prometheus/src/exporter/builder.rs index fc019b04..ac2930a3 100644 --- a/metrics-exporter-prometheus/src/exporter/builder.rs +++ b/metrics-exporter-prometheus/src/exporter/builder.rs @@ -138,6 +138,37 @@ impl PrometheusBuilder { Ok(self) } + /// Configures the exporter to push periodic requests to the given endpoint using the [remote write protocol](https://prometheus.io/docs/specs/remote_write_spec/). + /// + /// Running in remote write mode is mutually exclusive with the HTTP listener/push gateway i.e. enabling the remote write will + /// disable the HTTP listener/push gateway, and vice versa. + /// + /// Defaults to disabled. 
+ /// + /// ## Errors + /// + /// If the given endpoint cannot be parsed into a valid URI, an error variant will be returned describing the error. + /// + #[cfg(feature = "remote-write")] + #[cfg_attr(docsrs, doc(cfg(feature = "remote-write")))] + pub fn with_remote_write( + mut self, + endpoint: T, + interval: Duration, + user_agent: &str, + ) -> Result + where + T: AsRef, + { + self.exporter_config = ExporterConfig::RemoteWrite { + endpoint: Uri::try_from(endpoint.as_ref()) + .map_err(|e| BuildError::InvalidRemoteWriteEndpoint(e.to_string()))?, + interval, + user_agent: user_agent.to_string(), + }; + Ok(self) + } + /// Configures the exporter to expose an HTTP listener that functions as a [scrape endpoint], listening on a Unix /// Domain socket at the given path /// @@ -486,6 +517,10 @@ impl PrometheusBuilder { endpoint, interval, username, password, handle, ) } + #[cfg(feature = "remote-write")] + ExporterConfig::RemoteWrite { endpoint, interval, user_agent } => { + super::remote_write::new_remote_write(endpoint, interval, handle, &user_agent) + } }, )) } diff --git a/metrics-exporter-prometheus/src/exporter/mod.rs b/metrics-exporter-prometheus/src/exporter/mod.rs index d10c0336..09d3f2e9 100644 --- a/metrics-exporter-prometheus/src/exporter/mod.rs +++ b/metrics-exporter-prometheus/src/exporter/mod.rs @@ -48,6 +48,11 @@ enum ExporterConfig { password: Option, }, + // Run a remote write task sending to the given `endpoint` after `interval` time has elapsed, + // infinitely. + #[cfg(feature = "remote-write")] + RemoteWrite { endpoint: Uri, interval: Duration, user_agent: String }, + #[allow(dead_code)] Unconfigured, } @@ -60,6 +65,8 @@ impl ExporterConfig { Self::HttpListener { .. } => "http-listener", #[cfg(feature = "push-gateway")] Self::PushGateway { .. } => "push-gateway", + #[cfg(feature = "remote-write")] + Self::RemoteWrite { .. 
} => "remote-write", Self::Unconfigured => "unconfigured,", } } @@ -71,4 +78,7 @@ mod http_listener; #[cfg(feature = "push-gateway")] mod push_gateway; +#[cfg(feature = "remote-write")] +mod remote_write; + pub(crate) mod builder; diff --git a/metrics-exporter-prometheus/src/exporter/remote_write.rs b/metrics-exporter-prometheus/src/exporter/remote_write.rs new file mode 100644 index 00000000..9f44e4b9 --- /dev/null +++ b/metrics-exporter-prometheus/src/exporter/remote_write.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use http_body_util::{BodyExt, Collected, Full}; +use hyper::{body::Bytes, Uri}; +use hyper_util::{client::legacy::Client, rt::TokioExecutor}; +use tracing::error; + +use crate::PrometheusHandle; + +use super::ExporterFuture; + +// Creates an ExporterFuture implementing a remote write. +pub(super) fn new_remote_write( + endpoint: Uri, + interval: Duration, + handle: PrometheusHandle, + user_agent: &str, +) -> ExporterFuture { + let user_agent = user_agent.to_string(); + Box::pin(async move { + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .expect("no native root CA certificates found") + .https_or_http() + .enable_http1() + .build(); + let client: Client<_, Full> = Client::builder(TokioExecutor::new()) + .pool_idle_timeout(Duration::from_secs(30)) + .build(https); + + loop { + // Sleep for `interval` amount of time, and then do a push. 
+ tokio::time::sleep(interval).await; + + let binary = handle.render_remote_write_format(); + + let req = match binary.build_http_request(&endpoint, &user_agent) { + Ok(req) => req, + Err(err) => { + error!("failed to build http remote write request {}", err); + continue; + } + }; + match client.request(req).await { + Ok(response) => { + if !response.status().is_success() { + let status = response.status(); + let status = status.canonical_reason().unwrap_or_else(|| status.as_str()); + let body = response + .into_body() + .collect() + .await + .map(Collected::to_bytes) + .map_err(|_| ()) + .and_then(|b| String::from_utf8(b[..].to_vec()).map_err(|_| ())) + .unwrap_or_else(|()| String::from("")); + error!( + message = "unexpected status after pushing metrics to remote write", + status, + %body, + ); + } + } + Err(e) => error!("error sending request to remote write {}: {:?}", endpoint, e), + } + } + }) +} diff --git a/metrics-exporter-prometheus/src/lib.rs b/metrics-exporter-prometheus/src/lib.rs index 2240808c..213b4436 100644 --- a/metrics-exporter-prometheus/src/lib.rs +++ b/metrics-exporter-prometheus/src/lib.rs @@ -120,10 +120,11 @@ pub use distribution::{Distribution, DistributionBuilder}; mod exporter; pub use self::exporter::builder::PrometheusBuilder; -#[cfg(any(feature = "http-listener", feature = "push-gateway"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "http-listener", feature = "push-gateway"))))] +#[cfg(any(feature = "http-listener", feature = "push-gateway", feature = "remote-write"))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "http-listener", feature = "push-gateway", feature = "remote-write"))))] pub use self::exporter::ExporterFuture; - +#[cfg(feature = "remote-write")] +mod remote_write_proto; pub mod formatting; mod recorder; diff --git a/metrics-exporter-prometheus/src/recorder.rs b/metrics-exporter-prometheus/src/recorder.rs index 7d49f688..35549888 100644 --- a/metrics-exporter-prometheus/src/recorder.rs +++ 
b/metrics-exporter-prometheus/src/recorder.rs @@ -26,7 +26,7 @@ pub(crate) struct Inner { } impl Inner { - fn get_recent_metrics(&self) -> Snapshot { + pub(crate) fn get_recent_metrics(&self) -> Snapshot { let mut counters = HashMap::new(); let counter_handles = self.registry.get_counter_handles(); for (key, counter) in counter_handles { @@ -108,7 +108,12 @@ impl Inner { histogram.get_inner().clear_with(|samples| entry.record_samples(samples)); } } - + /// Render metric to [Remote-Write format](https://prometheus.io/docs/specs/remote_write_spec/) + #[cfg(feature = "remote-write")] + fn render_remote_write_format(&self) -> crate::remote_write_proto::WriteRequest { + use crate::remote_write_proto::WriteRequest; + WriteRequest::from_raw(self) + } fn render(&self) -> String { let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics(); @@ -289,6 +294,13 @@ impl PrometheusHandle { self.inner.render() } + /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to + /// the Prometheus remote write format. + #[cfg(feature = "remote-write")] + pub fn render_remote_write_format(&self) -> crate::remote_write_proto::WriteRequest { + self.inner.render_remote_write_format() + } + /// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not /// grow unboundedly. pub fn run_upkeep(&self) { diff --git a/metrics-exporter-prometheus/src/remote_write_proto.rs b/metrics-exporter-prometheus/src/remote_write_proto.rs new file mode 100644 index 00000000..2a7c7588 --- /dev/null +++ b/metrics-exporter-prometheus/src/remote_write_proto.rs @@ -0,0 +1,321 @@ +//! Types and utilities for calling Prometheus remote write API endpoints. 
+use std::sync::PoisonError; + +// Copy from https://github.com/theduke/prom-write +use http_body_util::Full; +use hyper::{body::Bytes, header, Method, Request, Uri}; +use quanta::Instant; + +use crate::{common::Snapshot, recorder::Inner, Distribution}; + +/// Special label for the name of a metric. +pub const LABEL_NAME: &str = "__name__"; +pub const CONTENT_TYPE: &str = "application/x-protobuf"; +pub const HEADER_NAME_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version"; +pub const REMOTE_WRITE_VERSION_01: &str = "0.1.0"; + +/// A write request. +/// +/// .proto: +/// ```protobuf +/// message WriteRequest { +/// repeated TimeSeries timeseries = 1; +/// // Cortex uses this field to determine the source of the write request. +/// // We reserve it to avoid any compatibility issues. +/// reserved 2; + +/// // Prometheus uses this field to send metadata, but this is +/// // omitted from v1 of the spec as it is experimental. +/// reserved 3; +/// } +/// ``` +#[derive(prost::Message, Clone, PartialEq)] +pub struct WriteRequest { + #[prost(message, repeated, tag = "1")] + pub timeseries: Vec, +} + +impl WriteRequest { + /// Prepare the write request for sending. + /// + /// Ensures that the request conforms to the specification. + /// See https://prometheus.io/docs/concepts/remote_write_spec. + fn sort(&mut self) { + for series in &mut self.timeseries { + series.sort_labels_and_samples(); + } + } + + fn sorted(mut self) -> Self { + self.sort(); + self + } + + /// Encode this write request as a protobuf message. + pub fn encode_proto3(self) -> Vec { + prost::Message::encode_to_vec(&self.sorted()) + } + /// Encode this write request as a compressed protobuf message. + /// NOTE: The API requires snappy compression, not a raw protobuf message. 
+ pub fn encode_compressed(self) -> Result, snap::Error> { + snap::raw::Encoder::new().compress_vec(&self.encode_proto3()) + } + + /// Parse metrics from inner metric object, and convert them into a [`WriteRequest`] + pub(super) fn from_raw(inner: &Inner) -> Self { + let Snapshot { mut counters, mut distributions, mut gauges } = inner.get_recent_metrics(); + let descriptions = inner.descriptions.read().unwrap_or_else(PoisonError::into_inner); + let mut req = WriteRequest::default(); + let mut all_series = std::collections::HashMap::::new(); + for (name, mut by_labels) in counters.drain() { + for (labels, value) in by_labels.drain() {} + } + for (name, mut by_labels) in gauges.drain() { + let mut labels = vec![]; + labels.push(Label { name: LABEL_NAME.to_string(), value: name }); + for (labels, value) in by_labels.drain() { + // labels.into_iter().map(|f|Label { + // name:f, + // value + // }).collect(); + } + } + for (name, mut by_labels) in distributions.drain() { + let distribution_type = inner.distribution_builder.get_distribution_type(name.as_str()); + + for (labels, distribution) in by_labels.drain(..) { + let (sum, count) = match distribution { + Distribution::Summary(summary, quantiles, sum) => { + let snapshot = summary.snapshot(Instant::now()); + for quantile in quantiles.iter() { + let value = snapshot.quantile(quantile.value()).unwrap_or(0.0); + } + + (sum, summary.count() as u64) + } + Distribution::Histogram(histogram) => { + for (le, count) in histogram.buckets() {} + + (histogram.sum(), histogram.count()) + } + }; + } + } + req + } + + /// Build a fully prepared HTTP request that can be sent to a remote write endpoint. 
+ pub fn build_http_request( + self, + endpoint: &Uri, + user_agent: &str, + ) -> Result>, Box> { + let req = Request::builder() + .method(Method::POST) + .uri(endpoint) + .header(header::CONTENT_TYPE, CONTENT_TYPE) + .header(HEADER_NAME_REMOTE_WRITE_VERSION, REMOTE_WRITE_VERSION_01) + .header(header::CONTENT_ENCODING, "snappy") + .header(header::USER_AGENT, user_agent) + .body(Full::new(self.encode_compressed()?.into()))?; + Ok(req) + } +} + +/// A time series. +/// +/// .proto: +/// ```protobuf +/// message TimeSeries { +/// repeated Label labels = 1; +/// repeated Sample samples = 2; +/// } +/// ``` +#[derive(prost::Message, Clone, PartialEq)] +pub struct TimeSeries { + #[prost(message, repeated, tag = "1")] + pub labels: Vec