Skip to content

Commit

Permalink
dataspec: add histogram merge operation
Browse files Browse the repository at this point in the history
  • Loading branch information
mihirn committed Oct 9, 2023
1 parent a31a405 commit 145b85c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/dataspec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ license = { workspace = true }
[dependencies]
histogram = { workspace = true }
serde = { workspace = true }
thiserror = "1.0.47"
111 changes: 110 additions & 1 deletion lib/dataspec/src/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;

use histogram::Histogram as _Histogram;

Expand All @@ -9,7 +10,7 @@ use histogram::Histogram as _Histogram;
/// of the Histogram. It stores an individual vector for each field
/// of non-zero buckets. Assuming index[0] = n, (index[0], count[0])
/// corresponds to the nth bucket.
#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct Histogram {
/// parameters representing the resolution and the range of
/// the histogram tracking request latencies
Expand All @@ -21,6 +22,67 @@ pub struct Histogram {
/// histogram bucket counts corresponding to the indices
pub count: Vec<u32>,
}
///
/// Errors returned for operations on histograms.
///
/// The enum is `#[non_exhaustive]`, so code matching on it from another
/// crate must include a wildcard arm.
#[non_exhaustive]
#[derive(Error, Debug, PartialEq)]
pub enum Error {
/// Returned by `Histogram::merge` when the two histograms do not share
/// the same `m`, `r` and `n` parameters.
#[error("histograms with different parameters can't be merged")]
MismatchedParams,
}

impl Histogram {
    /// Appends one bucket to the sparse representation: `idx` goes to
    /// `index` and `n` to `count`, keeping the two vectors parallel.
    fn add_bucket(&mut self, idx: usize, n: u32) {
        self.index.push(idx);
        self.count.push(n);
    }

    /// Merges `self` and `h` into a new histogram; neither input is modified.
    ///
    /// A bucket index present in both inputs appears once in the result with
    /// the two counts added together; an index present in only one input is
    /// copied over unchanged.
    ///
    /// The merge walks both `index` vectors in lockstep, so it assumes each
    /// one is sorted in ascending order without duplicates (NOTE(review):
    /// not enforced here — confirm producers uphold this).
    ///
    /// # Errors
    ///
    /// Returns [`Error::MismatchedParams`] if the `m`, `r` or `n` parameters
    /// of the two histograms differ.
    ///
    /// # Panics
    ///
    /// The shared-bucket sum uses plain `+` on `u32`, so it panics on
    /// overflow when overflow checks are enabled (the default in debug
    /// builds).
    pub fn merge(&self, h: &Histogram) -> Result<Histogram, Error> {
        if self.m != h.m || self.r != h.r || self.n != h.n {
            return Err(Error::MismatchedParams);
        }

        let mut histogram = Histogram {
            m: self.m,
            r: self.r,
            n: self.n,
            index: Vec::new(),
            count: Vec::new(),
        };

        // Two-cursor merge: `i` walks `self`, `j` walks `h`.
        let (mut i, mut j) = (0, 0);
        while i < self.index.len() && j < h.index.len() {
            let (k1, v1) = (self.index[i], self.count[i]);
            let (k2, v2) = (h.index[j], h.count[j]);

            if k1 == k2 {
                // Same bucket on both sides: emit once with the summed count.
                histogram.add_bucket(k1, v1 + v2);
                i += 1;
                j += 1;
            } else if k1 < k2 {
                histogram.add_bucket(k1, v1);
                i += 1;
            } else {
                histogram.add_bucket(k2, v2);
                j += 1;
            }
        }

        // At most one side still has buckets; the other slice is empty.
        // Remaining buckets from the left histogram (`self`).
        histogram.index.extend_from_slice(&self.index[i..]);
        histogram.count.extend_from_slice(&self.count[i..]);

        // Remaining buckets from the right histogram (`h`). These must be
        // sliced with `j`, the right-hand cursor, not `i`.
        histogram.index.extend_from_slice(&h.index[j..]);
        histogram.count.extend_from_slice(&h.count[j..]);

        Ok(histogram)
    }
}

impl From<&_Histogram> for Histogram {
fn from(histogram: &_Histogram) -> Self {
Expand All @@ -46,3 +108,50 @@ impl From<&_Histogram> for Histogram {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a histogram with the parameters shared by every fixture in
    /// this module (`m = 0`, `r = 7`, `n = 32`) and the given sparse buckets.
    fn sparse(index: Vec<usize>, count: Vec<u32>) -> Histogram {
        Histogram {
            m: 0,
            r: 7,
            n: 32,
            index,
            count,
        }
    }

    #[test]
    fn merge() {
        let left = sparse(vec![1, 3, 5], vec![6, 12, 7]);
        let empty = sparse(Vec::new(), Vec::new());
        let right = sparse(vec![2, 3, 4, 11], vec![5, 7, 3, 15]);

        // A default histogram has zeroed parameters, which do not match the
        // fixtures, so the merge is rejected.
        let rejected = left.merge(&Histogram::default());
        assert_eq!(rejected, Err(Error::MismatchedParams));

        // Merging with an empty right-hand side yields the left buckets.
        let merged = left.merge(&empty).unwrap();
        assert_eq!(merged.index, vec![1, 3, 5]);
        assert_eq!(merged.count, vec![6, 12, 7]);

        // Merging an empty left-hand side yields the right buckets.
        let merged = empty.merge(&right).unwrap();
        assert_eq!(merged.index, vec![2, 3, 4, 11]);
        assert_eq!(merged.count, vec![5, 7, 3, 15]);

        // Interleaved buckets are ordered by index; the shared bucket 3 has
        // its counts summed (12 + 7).
        let merged = left.merge(&right).unwrap();
        assert_eq!(merged.index, vec![1, 2, 3, 4, 5, 11]);
        assert_eq!(merged.count, vec![6, 5, 19, 3, 7, 15]);
    }
}

0 comments on commit 145b85c

Please sign in to comment.