Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ version.workspace = true
[dependencies]
chrono = { version = "0", default-features = false, features = ["clock"] }
derive_more = { version = "2", features = ["constructor"] }
openmetrics-parser = "0.4.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.140"
thiserror = "2"
Expand Down
152 changes: 151 additions & 1 deletion packages/metrics/src/metric_collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,138 @@ impl MetricCollection {

Ok(())
}

/// Parse a Prometheus exposition text (0.0.4 or OpenMetrics-compatible) into a MetricCollection.
/// This currently supports Counter and Gauge families. Other families will return UnsupportedPrometheusMetricType.
pub fn from_prometheus_text(input: &str, now: DurationSinceUnixEpoch) -> Result<Self, Error> {
use openmetrics_parser::prometheus::parse_prometheus;
use openmetrics_parser::{MetricNumber, MetricsExposition, PrometheusType, PrometheusValue};

let exposition: MetricsExposition<PrometheusType, PrometheusValue> =
parse_prometheus(input).map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?;

let mut counters: HashMap<MetricName, Metric<Counter>> = HashMap::new();
let mut gauges: HashMap<MetricName, Metric<Gauge>> = HashMap::new();

for (family_name, family) in exposition.families {
match family.family_type {
PrometheusType::Counter => {
let mut metric = Metric::<Counter>::new(
MetricName::new(&family_name),
None,
if family.help.is_empty() { None } else { Some(MetricDescription::new(&family.help)) },
SampleCollection::default(),
);
for sample in family.into_iter_samples() {
// Build LabelSet from parser's labelset
let parser_label_set = sample
.get_labelset()
.map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?;
let labels: Vec<(String, String)> = parser_label_set
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let label_set: LabelSet = labels.into();

// Timestamp (prefer sample's, fallback to now)
let ts = sample.timestamp.and_then(|t| {
if t.is_finite() && t >= 0.0 {
let secs_f = t.trunc();
let frac = t - secs_f;
let secs = secs_f as u64;
let nanos = (frac * 1_000_000_000.0).round() as u32;
let (secs, nanos) = if nanos >= 1_000_000_000 { (secs + 1, nanos - 1_000_000_000) } else { (secs, nanos) };
Some(DurationSinceUnixEpoch::new(secs, nanos))
} else {
None
}
}).unwrap_or(now);

// Value
let value = match sample.value {
PrometheusValue::Counter(cv) => match cv.value {
MetricNumber::Int(i) if i >= 0 => i as u64,
MetricNumber::Float(f) if f.is_finite() && f >= 0.0 => f as u64,
_ => 0,
},
PrometheusValue::Unknown(_) => 0,
_ => {
return Err(Error::UnsupportedPrometheusMetricType {
metric_name: family_name.clone(),
metric_type: "counter(value-mismatch)".to_string(),
})
}
};

metric.absolute(&label_set, value, ts);
}
counters.insert(MetricName::new(&family_name), metric);
}
PrometheusType::Gauge => {
let mut metric = Metric::<Gauge>::new(
MetricName::new(&family_name),
None,
if family.help.is_empty() { None } else { Some(MetricDescription::new(&family.help)) },
SampleCollection::default(),
);
for sample in family.into_iter_samples() {
let parser_label_set = sample
.get_labelset()
.map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?;
let labels: Vec<(String, String)> = parser_label_set
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let label_set: LabelSet = labels.into();

// Timestamp (prefer sample's, fallback to now)
let ts = sample.timestamp.and_then(|t| {
if t.is_finite() && t >= 0.0 {
let secs_f = t.trunc();
let frac = t - secs_f;
let secs = secs_f as u64;
let nanos = (frac * 1_000_000_000.0).round() as u32;
let (secs, nanos) = if nanos >= 1_000_000_000 { (secs + 1, nanos - 1_000_000_000) } else { (secs, nanos) };
Some(DurationSinceUnixEpoch::new(secs, nanos))
} else {
None
}
}).unwrap_or(now);

let value = match sample.value {
PrometheusValue::Gauge(mn) => mn.as_f64(),
PrometheusValue::Unknown(_) => 0.0,
_ => {
return Err(Error::UnsupportedPrometheusMetricType {
metric_name: family_name.clone(),
metric_type: "gauge(value-mismatch)".to_string(),
})
}
};

metric.set(&label_set, value, ts);
}
gauges.insert(MetricName::new(&family_name), metric);
}
other => {
return Err(Error::UnsupportedPrometheusMetricType {
metric_name: family_name.clone(),
metric_type: format!("{other:?}"),
});
}
}
}

let counters = MetricKindCollection::new(counters.into_values().collect())
.map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?;
let gauges = MetricKindCollection::new(gauges.into_values().collect())
.map_err(|e| Error::PrometheusTextParse { message: e.to_string() })?;

MetricCollection::new(counters, gauges)
}
}


#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("Metric names must be unique across all metrics types.")]
Expand All @@ -247,6 +377,12 @@ pub enum Error {

#[error("Cannot create metric with name '{metric_name}': another metric with this name already exists")]
MetricNameCollisionAdding { metric_name: MetricName },

#[error("Failed to parse Prometheus text: {message}")]
PrometheusTextParse { message: String },

#[error("Unsupported or unknown Prometheus metric type '{metric_type}' for metric '{metric_name}'")]
UnsupportedPrometheusMetricType { metric_name: String, metric_type: String },
}

/// Implements serialization for `MetricCollection`.
Expand Down Expand Up @@ -722,7 +858,21 @@ udp_tracker_server_performance_avg_announce_processing_time_ns{server_binding_ip
}

#[test]
fn it_should_allow_serializing_to_prometheus_format() {
fn it_should_allow_deserializing_from_prometheus_text() {
// Given the fixture's Prometheus exposition
let (expected_metric_collection, _expected_json, prometheus_text) = MetricCollectionFixture::default().deconstruct();
let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);

// Only ensure a trailing newline for the parser
let with_trailing_newline = if prometheus_text.ends_with('\n') { prometheus_text } else { format!("{}\n", prometheus_text) };
let parsed = MetricCollection::from_prometheus_text(&with_trailing_newline, time).unwrap();

// Then it should match the fixture object
assert_eq!(parsed, expected_metric_collection);
}

#[test]
fn it_should_allow_serializing_to_prometheus_format_again() {
let (metric_collection, _expected_json, expected_prometheus) = MetricCollectionFixture::default().deconstruct();

let prometheus_output = metric_collection.to_prometheus();
Expand Down
Loading