diff --git a/metrics-exporter-dogstatsd/src/forwarder/mod.rs b/metrics-exporter-dogstatsd/src/forwarder/mod.rs index 513476a6..8ba93437 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/mod.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/mod.rs @@ -110,19 +110,6 @@ pub(crate) struct ForwarderConfiguration { pub write_timeout: Duration, } -impl ForwarderConfiguration { - /// Returns `true` if the remote address requires a length prefix to be sent before each payload. - pub fn is_length_prefixed(&self) -> bool { - match self.remote_addr { - RemoteAddr::Udp(_) => false, - #[cfg(target_os = "linux")] - RemoteAddr::Unix(_) => true, - #[cfg(target_os = "linux")] - RemoteAddr::Unixgram(_) => true, - } - } -} - #[cfg(test)] mod tests { use std::net::SocketAddrV4; diff --git a/metrics-exporter-dogstatsd/src/forwarder/sync.rs b/metrics-exporter-dogstatsd/src/forwarder/sync.rs index e81f6d95..deb3c6d4 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/sync.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/sync.rs @@ -60,10 +60,17 @@ impl Client { Client::Unixgram(socket) => socket.send(buf), #[cfg(target_os = "linux")] - Client::Unix(socket) => match socket.write_all(buf) { - Ok(()) => Ok(buf.len()), - Err(e) => Err(e), - }, + Client::Unix(socket) => { + match u32::try_from(buf.len()) { + Ok(len) => socket.write_all(&len.to_be_bytes())?, + Err(e) => { + use std::io::{Error, ErrorKind}; + return Err(Error::new(ErrorKind::InvalidData, e)); + } + } + + socket.write_all(buf).map(|()| buf.len()) + } } } } @@ -142,8 +149,7 @@ impl Forwarder { /// Run the forwarder, sending out payloads to the configured remote address at the configured interval. pub fn run(mut self) { let mut flush_state = FlushState::default(); - let mut writer = - PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed()); + let mut writer = PayloadWriter::new(self.config.max_payload_len); let mut telemetry_update = TelemetryUpdate::default(); let mut next_flush = Instant::now() + self.config.flush_interval; diff --git a/metrics-exporter-dogstatsd/src/writer.rs b/metrics-exporter-dogstatsd/src/writer.rs index 44812179..e59dde73 100644 --- a/metrics-exporter-dogstatsd/src/writer.rs +++ b/metrics-exporter-dogstatsd/src/writer.rs @@ -46,12 +46,11 @@ pub(super) struct PayloadWriter { buf: Vec, trailer_buf: Vec, offsets: Vec, - with_length_prefix: bool, } impl PayloadWriter { /// Creates a new `PayloadWriter` with the given maximum payload length. - pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self { + pub fn new(max_payload_len: usize) -> Self { // NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a // properly sanitized value. assert!( @@ -59,16 +58,7 @@ impl PayloadWriter { "maximum payload length must be less than 2^32 bytes" ); - let mut writer = Self { - max_payload_len, - buf: Vec::new(), - trailer_buf: Vec::new(), - offsets: Vec::new(), - with_length_prefix, - }; - - writer.prepare_for_write(); - writer + Self { max_payload_len, buf: Vec::new(), trailer_buf: Vec::new(), offsets: Vec::new() } } fn last_offset(&self) -> usize { @@ -80,21 +70,10 @@ impl PayloadWriter { // // If there aren't any committed metrics, then the last offset is simply zero. let last_offset = self.last_offset(); - let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 }; - self.buf.len() - last_offset - maybe_length_prefix_len - } - - fn prepare_for_write(&mut self) { - if self.with_length_prefix { - // If we're adding length prefixes, we need to write the length of the payload first. - // - // We write a dummy length of zero for now, and then we'll go back and fill it in later. - self.buf.extend_from_slice(&[0, 0, 0, 0]); - } + self.buf.len() - last_offset } fn commit(&mut self) -> bool { - let current_last_offset = self.last_offset(); let current_len = self.current_len(); if current_len > self.max_payload_len { // If the current metric is too long, we need to truncate everything we just wrote to get us back to the end @@ -104,22 +83,20 @@ impl PayloadWriter { return false; } - // Track the new offset. - self.offsets.push(self.buf.len()); - - // If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the - // length of it. - if self.with_length_prefix { - // NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we - // check above that `current_len` is less than or equal to `self.max_payload_len`. - let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes(); - self.buf[current_last_offset..current_last_offset + 4] - .copy_from_slice(¤t_len_buf[..]); + // Offset update + if current_len + self.last_offset() <= self.max_payload_len { + // If the current metric can be written within the max_payload_len + // replace the last offset (if there is valid offset) + if let Some(last_offset) = self.offsets.last_mut() { + *last_offset = self.buf.len(); + } else { + self.offsets.push(self.buf.len()); + } + } else { + // - else add a new offset to send current metric in a new Packet + self.offsets.push(self.buf.len()); } - // Initialize the buffer for the next payload. - self.prepare_for_write(); - true } @@ -542,7 +519,7 @@ mod tests { ]; for (key, value, ts, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_counter(&key, value, ts, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -607,7 +584,7 @@ mod tests { ]; for (key, value, ts, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_gauge(&key, value, ts, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -666,7 +643,7 @@ mod tests { ]; for (key, values, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_histogram(&key, values.iter().copied(), None, prefix, global_labels); assert_eq!(result.payloads_written(), 1); @@ -726,7 +703,7 @@ mod tests { ]; for (key, values, prefix, global_labels, expected) in cases { - let mut writer = PayloadWriter::new(8192, false); + let mut writer = PayloadWriter::new(8192); let result = writer.write_distribution( &key, values.iter().copied(), @@ -741,51 +718,12 @@ mod tests { } } - #[test] - fn length_prefix() { - let prefixed = |buf: &str| { - let mut prefixed_buf = Vec::with_capacity(buf.len() + 4); - prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes()); - prefixed_buf.extend_from_slice(buf.as_bytes()); - prefixed_buf - }; - - // Cases are defined as: metric key, metric values, metric timestamp, expected output. - let cases = [ - (Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")), - ( - Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]), - &[88.0][..], - prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"), - ), - ( - Key::from("test_distribution"), - &[22.22, 33.33, 44.44][..], - prefixed("test_distribution:22.22:33.33:44.44|d\n"), - ), - ( - Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]), - &[88.0, 66.6, 123.4][..], - prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"), - ), - ]; - - for (key, values, expected) in cases { - let mut writer = PayloadWriter::new(8192, true); - let result = writer.write_distribution(&key, values.iter().copied(), None, None, &[]); - assert_eq!(result.payloads_written(), 1); - - let actual = buf_from_writer(&mut writer); - assert_eq!(actual, expected); - } - } - proptest! { #[test] fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) { // TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[] - let mut writer = PayloadWriter::new(payload_limit, false); + let mut writer = PayloadWriter::new(payload_limit); let mut total_input_points: u64 = 0; let mut payloads_written = 0; let mut points_dropped = 0;