Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataspec: add histogram merge operation #55

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/dataspec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ license = { workspace = true }
[dependencies]
histogram = { workspace = true }
serde = { workspace = true }
thiserror = "1.0.47"
115 changes: 114 additions & 1 deletion lib/dataspec/src/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;

use histogram::Histogram as _Histogram;

Expand All @@ -9,7 +10,7 @@
/// of the Histogram. It stores an individual vector for each field
/// of non-zero buckets. Assuming index[0] = n, (index[0], count[0])
/// corresponds to the nth bucket.
#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct Histogram {
/// parameters representing the resolution and the range of
/// the histogram tracking request latencies
Expand All @@ -22,6 +23,71 @@
pub count: Vec<u32>,
}

/// Errors returned for operations on histograms.
#[non_exhaustive]
#[derive(Error, Debug, PartialEq)]
pub enum Error {
#[error("histograms with different parameters can't be merged")]
MismatchedParams,
}

impl Histogram {
fn add_bucket(&mut self, idx: usize, n: u32) {
self.index.push(idx);
self.count.push(n);
}

/// Merges two Histograms and returns the results in a new Histogram.
///
/// Both histograms must have the same configuration parameters.
/// Buckets which have values in both histograms are allowed to wrap.
pub fn merge(&self, h: &Histogram) -> Result<Histogram, Error> {
if self.m != h.m || self.r != h.r || self.n != h.n {
return Err(Error::MismatchedParams);
}

let mut histogram = Histogram {
m: self.m,
r: self.r,
n: self.n,
index: Vec::new(),
count: Vec::new(),
};

// Sort and merge buckets from both histograms
let (mut i, mut j) = (0, 0);
while i < self.index.len() && j < h.index.len() {
let (k1, v1) = (self.index[i], self.count[i]);
let (k2, v2) = (h.index[j], h.count[j]);

if k1 == k2 {
histogram.add_bucket(k1, v1 + v2);
(i, j) = (i + 1, j + 1);
} else if k1 < k2 {
histogram.add_bucket(k1, v1);
i += 1;
} else {
histogram.add_bucket(k2, v2);
j += 1;
}
Fixed Show fixed Hide fixed
}

// Fill remaining values, if any, from the left histogram
if i < self.index.len() {
histogram.index.extend(&self.index[i..self.index.len()]);
histogram.count.extend(&self.count[i..self.count.len()]);
}

// Fill remaining values, if any, from the left histogram
if j < h.index.len() {
histogram.index.extend(&h.index[i..h.index.len()]);
histogram.count.extend(&h.count[i..h.count.len()]);
}

return Ok(histogram);
Fixed Show fixed Hide fixed
}
}

impl From<&_Histogram> for Histogram {
fn from(histogram: &_Histogram) -> Self {
let mut index = Vec::new();
Expand All @@ -46,3 +112,50 @@
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn merge() {
let h1 = Histogram {
m: 0,
r: 7,
n: 32,
index: vec![1, 3, 5],
count: vec![6, 12, 7],
};

let h2 = Histogram {
m: 0,
r: 7,
n: 32,
index: Vec::new(),
count: Vec::new(),
};

let h3 = Histogram {
m: 0,
r: 7,
n: 32,
index: vec![2, 3, 4, 11],
count: vec![5, 7, 3, 15],
};

let h = h1.merge(&Histogram::default());
assert_eq!(h, Err(Error::MismatchedParams));

let h = h1.merge(&h2).unwrap();
assert_eq!(h.index, vec![1, 3, 5]);
assert_eq!(h.count, vec![6, 12, 7]);

let h = h2.merge(&h3).unwrap();
assert_eq!(h.index, vec![2, 3, 4, 11]);
assert_eq!(h.count, vec![5, 7, 3, 15]);

let h = h1.merge(&h3).unwrap();
assert_eq!(h.index, vec![1, 2, 3, 4, 5, 11]);
assert_eq!(h.count, vec![6, 5, 19, 3, 7, 15]);
}
}
Loading