From 2cb37cb3857a39cfdbcc9162453cd2bdf6fed4bd Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 4 Mar 2025 15:34:27 -0500 Subject: [PATCH 1/2] experiment: try to reduce overhead of performing counter increments/gauge sets --- .../examples/dogstatsd_synchronous.rs | 2 + metrics-exporter-dogstatsd/src/storage.rs | 58 ++++++++++--------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs b/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs index 54f651f9..f7c22750 100644 --- a/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs +++ b/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs @@ -9,6 +9,8 @@ fn main() { DogStatsDBuilder::default() .with_remote_address("localhost:9125") .expect("failed to parse remote address") + .with_histogram_sampling(true) + .with_histogram_reservoir_size(128) .install() .expect("failed to install DogStatsD recorder"); diff --git a/metrics-exporter-dogstatsd/src/storage.rs b/metrics-exporter-dogstatsd/src/storage.rs index 49b40cf1..2ec392cd 100644 --- a/metrics-exporter-dogstatsd/src/storage.rs +++ b/metrics-exporter-dogstatsd/src/storage.rs @@ -19,40 +19,42 @@ use metrics_util::{ }; pub(crate) struct AtomicCounter { - is_absolute: AtomicBool, - last: AtomicU64, + //is_absolute: AtomicBool, + //last: AtomicU64, current: AtomicU64, - updates: AtomicU64, + //updates: AtomicU64, } impl AtomicCounter { /// Creates a new `AtomicCounter`. fn new() -> Self { Self { - is_absolute: AtomicBool::new(false), - last: AtomicU64::new(0), + //is_absolute: AtomicBool::new(false), + //last: AtomicU64::new(0), current: AtomicU64::new(0), - updates: AtomicU64::new(0), + //updates: AtomicU64::new(0), } } /// Flushes the current counter value, returning the delta of the counter value, and the number of updates, since /// the last flush. pub fn flush(&self) -> (u64, u64) { - let current = self.current.load(Acquire); - let last = self.last.swap(current, AcqRel); - let delta = current.wrapping_sub(last); - let updates = self.updates.swap(0, AcqRel); - - (delta, updates) + //let current = self.current.load(Acquire); + //let last = self.last.swap(current, AcqRel); + //let delta = current.wrapping_sub(last); + //let updates = self.updates.swap(0, AcqRel); + + //(delta, updates) + let delta = self.current.swap(0, Relaxed); + (delta, if delta > 0 { 1 } else { 0 }) } } impl CounterFn for AtomicCounter { fn increment(&self, value: u64) { - self.is_absolute.store(false, Release); + //self.is_absolute.store(false, Release); self.current.fetch_add(value, Relaxed); - self.updates.fetch_add(1, Relaxed); + //self.updates.fetch_add(1, Relaxed); } fn absolute(&self, value: u64) { @@ -60,32 +62,36 @@ impl CounterFn for AtomicCounter { // consistent starting point when flushing. This ensures that we only start flushing deltas once we've gotten // two consecutive absolute values, since otherwise we might be calculating a delta between a `last` of 0 and a // very large `current` value. - if !self.is_absolute.swap(true, Release) { - self.last.store(value, Release); - } + //if !self.is_absolute.swap(true, Release) { + // self.last.store(value, Release); + //} - self.current.store(value, Release); - self.updates.fetch_add(1, Relaxed); + //self.current.store(value, Release); + //self.updates.fetch_add(1, Relaxed); } } pub(crate) struct AtomicGauge { inner: AtomicU64, - updates: AtomicU64, + //updates: AtomicU64, } impl AtomicGauge { /// Creates a new `AtomicGauge`. fn new() -> Self { - Self { inner: AtomicU64::new(0.0f64.to_bits()), updates: AtomicU64::new(0) } + Self { + inner: AtomicU64::new(0.0f64.to_bits()), + //updates: AtomicU64::new(0), + } } /// Flushes the current gauge value and the number of updates since the last flush. pub fn flush(&self) -> (f64, u64) { let current = f64::from_bits(self.inner.load(Acquire)); - let updates = self.updates.swap(0, AcqRel); + //let updates = self.updates.swap(0, AcqRel); - (current, updates) + //(current, updates) + (current, 0) } } @@ -97,7 +103,7 @@ impl GaugeFn for AtomicGauge { Some(f64::to_bits(new)) }) .expect("should never fail to update gauge"); - self.updates.fetch_add(1, Relaxed); + //self.updates.fetch_add(1, Relaxed); } fn decrement(&self, value: f64) { @@ -107,12 +113,12 @@ impl GaugeFn for AtomicGauge { Some(f64::to_bits(new)) }) .expect("should never fail to update gauge"); - self.updates.fetch_add(1, Relaxed); + //self.updates.fetch_add(1, Relaxed); } fn set(&self, value: f64) { self.inner.store(value.to_bits(), Release); - self.updates.fetch_add(1, Relaxed); + //self.updates.fetch_add(1, Relaxed); } } From 27870901f203a1d254d9201f9938e8c1891f1769 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Thu, 6 Mar 2025 08:00:21 -0500 Subject: [PATCH 2/2] try padding things out to determine if we're hitting false sharing --- metrics-exporter-dogstatsd/src/lib.rs | 1 + metrics-exporter-dogstatsd/src/storage.rs | 44 ++++----- metrics-exporter-dogstatsd/src/util.rs | 112 ++++++++++++++++++++++ 3 files changed, 134 insertions(+), 23 deletions(-) create mode 100644 metrics-exporter-dogstatsd/src/util.rs diff --git a/metrics-exporter-dogstatsd/src/lib.rs b/metrics-exporter-dogstatsd/src/lib.rs index b7845998..bb7c0be3 100644 --- a/metrics-exporter-dogstatsd/src/lib.rs +++ b/metrics-exporter-dogstatsd/src/lib.rs @@ -103,4 +103,5 @@ pub use self::recorder::DogStatsDRecorder; mod state; mod storage; mod telemetry; +pub(crate) mod util; mod writer; diff --git a/metrics-exporter-dogstatsd/src/storage.rs b/metrics-exporter-dogstatsd/src/storage.rs index 2ec392cd..8121b0c8 100644 --- a/metrics-exporter-dogstatsd/src/storage.rs +++ b/metrics-exporter-dogstatsd/src/storage.rs @@ -1,10 +1,7 @@ use std::{ slice::Iter, sync::{ - atomic::{ - AtomicBool, AtomicU64, - Ordering::{AcqRel, Acquire, Relaxed, Release}, - }, + atomic::{AtomicU64, Ordering::Relaxed}, Arc, }, }; @@ -18,10 +15,12 @@ use metrics_util::{ }, }; +use crate::util::CachePadded; + pub(crate) struct AtomicCounter { //is_absolute: AtomicBool, //last: AtomicU64, - current: AtomicU64, + current: CachePadded, //updates: AtomicU64, } @@ -31,7 +30,7 @@ impl AtomicCounter { Self { //is_absolute: AtomicBool::new(false), //last: AtomicU64::new(0), - current: AtomicU64::new(0), + current: CachePadded::new(AtomicU64::new(0)), //updates: AtomicU64::new(0), } } @@ -57,7 +56,7 @@ impl CounterFn for AtomicCounter { //self.updates.fetch_add(1, Relaxed); } - fn absolute(&self, value: u64) { + fn absolute(&self, _value: u64) { // Ensure the counter is in absolute mode, and if it wasn't already, reset `last` to `value` to give ourselves a // consistent starting point when flushing. This ensures that we only start flushing deltas once we've gotten // two consecutive absolute values, since otherwise we might be calculating a delta between a `last` of 0 and a @@ -72,66 +71,65 @@ impl CounterFn for AtomicCounter { } pub(crate) struct AtomicGauge { - inner: AtomicU64, - //updates: AtomicU64, + inner: CachePadded, + updates: CachePadded, } impl AtomicGauge { /// Creates a new `AtomicGauge`. fn new() -> Self { Self { - inner: AtomicU64::new(0.0f64.to_bits()), - //updates: AtomicU64::new(0), + inner: CachePadded::new(AtomicU64::new(0.0f64.to_bits())), + updates: CachePadded::new(AtomicU64::new(0)), } } /// Flushes the current gauge value and the number of updates since the last flush. pub fn flush(&self) -> (f64, u64) { - let current = f64::from_bits(self.inner.load(Acquire)); - //let updates = self.updates.swap(0, AcqRel); + let current = f64::from_bits(self.inner.load(Relaxed)); + let updates = self.updates.swap(0, Relaxed); - //(current, updates) - (current, 0) + (current, updates) } } impl GaugeFn for AtomicGauge { fn increment(&self, value: f64) { self.inner - .fetch_update(AcqRel, Relaxed, |current| { + .fetch_update(Relaxed, Relaxed, |current| { let new = f64::from_bits(current) + value; Some(f64::to_bits(new)) }) .expect("should never fail to update gauge"); - //self.updates.fetch_add(1, Relaxed); + self.updates.fetch_add(1, Relaxed); } fn decrement(&self, value: f64) { self.inner - .fetch_update(AcqRel, Relaxed, |current| { + .fetch_update(Relaxed, Relaxed, |current| { let new = f64::from_bits(current) - value; Some(f64::to_bits(new)) }) .expect("should never fail to update gauge"); - //self.updates.fetch_add(1, Relaxed); + self.updates.fetch_add(1, Relaxed); } fn set(&self, value: f64) { - self.inner.store(value.to_bits(), Release); - //self.updates.fetch_add(1, Relaxed); + self.inner.store(value.to_bits(), Relaxed); + self.updates.fetch_add(1, Relaxed); } } pub(crate) enum AtomicHistogram { Raw(AtomicBucket), - Sampled(AtomicSamplingReservoir), + Sampled(CachePadded), } impl AtomicHistogram { /// Creates a new `AtomicHistogram` based on the given sampling configuration. fn new(sampling: bool, reservoir_size: usize) -> Self { if sampling { - AtomicHistogram::Sampled(AtomicSamplingReservoir::new(reservoir_size)) + AtomicHistogram::Sampled(CachePadded::new(AtomicSamplingReservoir::new(reservoir_size))) } else { AtomicHistogram::Raw(AtomicBucket::new()) } diff --git a/metrics-exporter-dogstatsd/src/util.rs b/metrics-exporter-dogstatsd/src/util.rs new file mode 100644 index 00000000..931b2f06 --- /dev/null +++ b/metrics-exporter-dogstatsd/src/util.rs @@ -0,0 +1,112 @@ +use core::fmt; +use core::ops::{Deref, DerefMut}; + +/// Prevent false sharing by padding and aligning to the length of a cache line. +/// +/// Implementation taken from [cache-padded](https://github.com/smol-rs/cache-padded) crate, licensed under +/// the [MIT License](https://github.com/smol-rs/cache-padded/blob/master/LICENSE-MIT). +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +pub struct CachePadded(T); + +impl CachePadded { + /// Pads and aligns a piece of data to the length of a cache line. + /// + /// # Examples + /// + /// ``` + /// use cache_padded::CachePadded; + /// + /// let padded = CachePadded::new(1); + /// ``` + pub const fn new(t: T) -> CachePadded { + CachePadded(t) + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.0 + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl fmt::Debug for CachePadded { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("CachePadded").field(&self.0).finish() + } +} + +impl From for CachePadded { + fn from(t: T) -> Self { + CachePadded::new(t) + } +}