Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
tinker with metric aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Nov 9, 2023
1 parent 6fa6373 commit 9f95f2e
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 157 deletions.
30 changes: 22 additions & 8 deletions core/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub trait Emitter {
Self: Sized,
{
And {
lhs: self,
rhs: other,
left: self,
right: other,
}
}

Expand Down Expand Up @@ -91,21 +91,35 @@ pub fn from_fn<F: Fn(&Event<&dyn ErasedProps>)>(f: F) -> FromFn<F> {
}

pub struct And<T, U> {
lhs: T,
rhs: U,
left: T,
right: U,
}

impl<T, U> And<T, U> {
pub fn left(&self) -> &T {
&self.left
}

pub fn right(&self) -> &U {
&self.right
}
}

impl<T: Emitter, U: Emitter> Emitter for And<T, U> {
fn emit<P: Props>(&self, evt: &Event<P>) {
self.lhs.emit(evt);
self.rhs.emit(evt);
self.left.emit(evt);
self.right.emit(evt);
}

fn blocking_flush(&self, timeout: Duration) {
// Approximate; give each target an equal
// time to flush. With a monotonic clock
// we could measure the time each takes
// to flush and track in our timeout
let timeout = timeout / 2;

self.lhs.blocking_flush(timeout);
self.rhs.blocking_flush(timeout);
self.left.blocking_flush(timeout);
self.right.blocking_flush(timeout);
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ impl Extent {
}

pub fn as_point(&self) -> Option<&Timestamp> {
if self.is_point() {
self.to_point()
} else {
None
}
}

pub fn to_point(&self) -> Option<&Timestamp> {
Some(&self.0.as_ref()?.end)
}

Expand Down
40 changes: 30 additions & 10 deletions core/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub trait Filter {
Self: Sized,
{
And {
lhs: self,
rhs: other,
left: self,
right: other,
}
}

Expand All @@ -22,8 +22,8 @@ pub trait Filter {
Self: Sized,
{
Or {
lhs: self,
rhs: other,
left: self,
right: other,
}
}

Expand Down Expand Up @@ -79,24 +79,44 @@ pub fn from_fn<F: Fn(&Event<&dyn ErasedProps>)>(f: F) -> FromFn<F> {
}

pub struct And<T, U> {
lhs: T,
rhs: U,
left: T,
right: U,
}

impl<T, U> And<T, U> {
pub fn left(&self) -> &T {
&self.left
}

pub fn right(&self) -> &U {
&self.right
}
}

impl<T: Filter, U: Filter> Filter for And<T, U> {
fn matches<P: Props>(&self, evt: &Event<P>) -> bool {
self.lhs.matches(evt) && self.rhs.matches(evt)
self.left.matches(evt) && self.right.matches(evt)
}
}

pub struct Or<T, U> {
lhs: T,
rhs: U,
left: T,
right: U,
}

impl<T, U> Or<T, U> {
pub fn left(&self) -> &T {
&self.left
}

pub fn right(&self) -> &U {
&self.right
}
}

impl<T: Filter, U: Filter> Filter for Or<T, U> {
fn matches<P: Props>(&self, evt: &Event<P>) -> bool {
self.lhs.matches(evt) || self.rhs.matches(evt)
self.left.matches(evt) || self.right.matches(evt)
}
}

Expand Down
88 changes: 47 additions & 41 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,36 @@ use crate::{
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Metric<'m, T> {
name: Key<'m>,
kind: MetricKind,
kind: Option<MetricKind>,
value: T,
}

impl<'m, T> Metric<'m, T> {
pub const fn new(name: Key<'m>, kind: MetricKind, value: T) -> Self {
pub const fn new(kind: Option<MetricKind>, name: Key<'m>, value: T) -> Self {
Metric { name, kind, value }
}

pub const fn sum(name: Key<'m>, value: T) -> Self {
Metric::new(Some(MetricKind::Sum), name, value)
}

pub const fn min(name: Key<'m>, value: T) -> Self {
Metric::new(Some(MetricKind::Min), name, value)
}

pub const fn max(name: Key<'m>, value: T) -> Self {
Metric::new(Some(MetricKind::Max), name, value)
}

pub const fn mean(name: Key<'m>, value: T) -> Self {
Metric::new(Some(MetricKind::Mean), name, value)
}

pub const fn name(&self) -> &Key<'m> {
&self.name
}

pub const fn kind(&self) -> MetricKind {
pub const fn kind(&self) -> Option<MetricKind> {
self.kind
}

Expand All @@ -34,32 +50,6 @@ impl<'m, T> Metric<'m, T> {
pub fn value_mut(&mut self) -> &mut T {
&mut self.value
}

pub fn sample<'a, U: ToValue>(
&'a self,
sample: impl FnOnce(MetricKind, &'a T) -> (MetricKind, U),
) -> Metric<'a, U> {
let (kind, value) = sample(self.kind, &self.value);

Metric {
name: self.name.by_ref(),
kind,
value,
}
}

pub fn sample_mut<'a, U: ToValue>(
&'a mut self,
sample: impl FnOnce(MetricKind, &'a mut T) -> (MetricKind, U),
) -> Metric<'a, U> {
let (kind, value) = sample(self.kind, &mut self.value);

Metric {
name: self.name.by_ref(),
kind,
value,
}
}
}

impl<'m, V: ToValue> Props for Metric<'m, V> {
Expand All @@ -68,18 +58,23 @@ impl<'m, V: ToValue> Props for Metric<'m, V> {
mut for_each: F,
) -> ControlFlow<()> {
for_each(METRIC_NAME_KEY.to_key(), self.name.to_value())?;
for_each(METRIC_KIND_KEY.to_key(), self.kind.to_value())?;
for_each(METRIC_VALUE_KEY.to_key(), self.value.to_value())?;

if let Some(ref kind) = self.kind {
for_each(METRIC_KIND_KEY.to_key(), kind.to_value())?;
}

ControlFlow::Continue(())
}
}

#[non_exhaustive]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricKind {
Point,
Counter,
Sum,
Min,
Max,
Mean,
}

impl fmt::Debug for MetricKind {
Expand All @@ -91,8 +86,10 @@ impl fmt::Debug for MetricKind {
impl fmt::Display for MetricKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
MetricKind::Point => "point",
MetricKind::Counter => "counter",
MetricKind::Sum => "sum",
MetricKind::Min => "min",
MetricKind::Max => "max",
MetricKind::Mean => "mean",
})
}
}
Expand All @@ -101,12 +98,17 @@ impl FromStr for MetricKind {
type Err = ParseMetricKindError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("point") {
return Ok(MetricKind::Point);
if s.eq_ignore_ascii_case("sum") {
return Ok(MetricKind::Sum);
}

if s.eq_ignore_ascii_case("counter") {
return Ok(MetricKind::Counter);
if s.eq_ignore_ascii_case("min") {
return Ok(MetricKind::Min);
}
if s.eq_ignore_ascii_case("max") {
return Ok(MetricKind::Max);
}
if s.eq_ignore_ascii_case("mean") {
return Ok(MetricKind::Mean);
}

Err(ParseMetricKindError {})
Expand Down Expand Up @@ -138,12 +140,16 @@ mod tests {

#[test]
fn metric_well_known() {
let metric = Metric::new(Key::new("metric"), MetricKind::Point, Value::from(1usize));
let metric = Metric::new(
Some(MetricKind::Sum),
Key::new("metric"),
Value::from(1usize),
);

let well_known = WellKnown::metric(&metric).unwrap();

assert_eq!("metric", well_known.name());
assert_eq!(MetricKind::Point, well_known.kind());
assert_eq!(Some(MetricKind::Sum), well_known.kind());
assert_eq!(1, well_known.value().to_usize().unwrap());
}
}
10 changes: 10 additions & 0 deletions core/src/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ pub struct Chain<T, U> {
second: U,
}

impl<T, U> Chain<T, U> {
pub fn first(&self) -> &T {
&self.first
}

pub fn second(&self) -> &U {
&self.second
}
}

impl<A: Props, B: Props> Props for Chain<A, B> {
fn for_each<'kv, F: FnMut(Key<'kv>, Value<'kv>) -> ControlFlow<()>>(
&'kv self,
Expand Down
4 changes: 4 additions & 0 deletions core/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl<'v> Value<'v> {
pub fn to_usize(&self) -> Option<usize> {
self.0.to_u64()?.try_into().ok()
}

pub fn to_f64(&self) -> Option<f64> {
self.0.to_f64()
}
}

pub trait Visitor<'v> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/well_known.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub trait WellKnown: Props {

fn metric(&self) -> Option<Metric<Value>> {
Some(Metric::new(
self.metric_kind(),
self.metric_name()?,
self.metric_kind()?,
self.metric_value()?,
))
}
Expand Down
8 changes: 8 additions & 0 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub struct Init<TEmitter: Emitter = DefaultEmitter, TCtxt: Ctxt = DefaultCtxt> {
}

impl<TEmitter: Emitter, TCtxt: Ctxt> Init<TEmitter, TCtxt> {
pub fn emitter(&self) -> &TEmitter {
&self.emitter
}

pub fn ctxt(&self) -> &TCtxt {
&self.ctxt
}

pub fn blocking_flush(&self, timeout: Duration) {
self.emitter.blocking_flush(timeout);
}
Expand Down
Loading

0 comments on commit 9f95f2e

Please sign in to comment.