From 630f997a5df6dded43d51621eb6d21bb3fa1341d Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Wed, 16 Jun 2021 23:59:04 -0400 Subject: [PATCH] updates --- Cargo.toml | 3 +++ src/client.rs | 46 +++++++++++++++++++++++++++++++++------------- src/metric.rs | 30 +++++++++++++++++++++--------- 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ed41ff..b4b6a52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,6 @@ edition = "2018" [dependencies] bytes = "1.0.1" tokio = { version = "1.7.0", default-features = false, features = ["net"] } + +[dev-dependencies] +tokio = { version = "1.7.0", default-features = false, features = ["macros", "rt"] } diff --git a/src/client.rs b/src/client.rs index 9671cac..dd7f1cf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,29 +2,49 @@ use crate::metric::Metric; use std::io::Error; use tokio::net::{ToSocketAddrs, UdpSocket}; -pub struct Client<'a> { +pub struct Client { socket: UdpSocket, - target: &'a str, } -impl<'a> Client<'a> { +impl Client { pub async fn new( - host_address: &T, - target_address: &'a str, - ) -> Result, Error> { - Ok(Self { - socket: UdpSocket::bind(host_address).await?, - target: target_address, - }) + host_address: T, + target_address: &str, + ) -> Result { + let socket = UdpSocket::bind(host_address).await?; + socket.connect(target_address).await?; + Ok(Self { socket }) } - pub async fn send(&self, metric: Metric<'_, I, T>) -> Result + pub async fn send(&self, metric: Metric<'_, I, T>) -> Result<(), Error> where I: IntoIterator, T: AsRef, { - self.socket - .send_to(metric.to_bytes().as_ref(), self.target) + let bytes = metric.into_bytes(); + self.socket.send(&bytes).await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_client() { + let udp_receiver = UdpSocket::bind("127.0.0.1:8125").await.unwrap(); + let v: &[&str] = &[]; + let client = Client::new("127.0.0.1:1234", "127.0.0.1:8125") .await + .unwrap(); + client.send(Metric::increase("test", v)).await.unwrap(); + udp_receiver.connect("127.0.0.1:1234").await.unwrap(); + let mut bytes_received: usize = 0; + let mut buf = [0; 8]; + while bytes_received < 8 { + bytes_received += udp_receiver.recv_from(&mut buf).await.unwrap().0; + } + assert_eq!(&buf, b"test:1|c"); } } diff --git a/src/metric.rs b/src/metric.rs index 39df64a..009ba63 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -81,7 +81,7 @@ where } } - pub(crate) fn to_bytes(self) -> Bytes { + pub(crate) fn into_bytes(self) -> Bytes { let mut buf; match self.frame_type { Type::Count(count) => { @@ -155,29 +155,41 @@ mod test { use super::*; #[test] - fn test_to_bytes() { + fn test_into_bytes() { let v: &[&str] = &[]; - assert_eq!(Metric::increase("test", v).to_bytes().as_ref(), b"test:1|c"); assert_eq!( - Metric::decrease("test", v).to_bytes().as_ref(), + Metric::increase("test", v).into_bytes().as_ref(), + b"test:1|c" + ); + assert_eq!( + Metric::decrease("test", v).into_bytes().as_ref(), b"test:-1|c" ); - assert_eq!(Metric::count(2, "test", v).to_bytes().as_ref(), b"test:2|c"); assert_eq!( - Metric::gauge("1.2", "test", v).to_bytes().as_ref(), + Metric::count(2, "test", v).into_bytes().as_ref(), + b"test:2|c" + ); + assert_eq!( + Metric::gauge("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|g" ); assert_eq!( - Metric::histogram("1.2", "test", v).to_bytes().as_ref(), + Metric::histogram("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|h" ); assert_eq!( - Metric::distribution("1.2", "test", v).to_bytes().as_ref(), + Metric::distribution("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|d" ); assert_eq!( - Metric::set("1.2", "test", v).to_bytes().as_ref(), + Metric::set("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|s" ); + assert_eq!( + Metric::increase("test", &["a:b", "c"]) + .into_bytes() + .as_ref(), + b"test:1|c|#a:b,c" + ); } }