From 814085fd1ffdfc0de1eb360c0abe578a76466a90 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Wed, 16 Jun 2021 23:04:52 -0400 Subject: [PATCH 1/4] initial development --- .gitignore | 2 + Cargo.toml | 10 +++ src/client.rs | 34 ++++++++++ src/lib.rs | 4 ++ src/metric.rs | 183 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 233 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/client.rs create mode 100644 src/lib.rs create mode 100644 src/metric.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..cd63b64 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "dogstatsd" +version = "0.1.0" +authors = ["Sebastian Rollen "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.0.1" diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..901e0bd --- /dev/null +++ b/src/client.rs @@ -0,0 +1,34 @@ +use crate::metric::Metric; +use std::io::Error; +use std::net::{ToSocketAddrs, UdpSocket}; + +pub struct Client<'a> { + socket: UdpSocket, + target: &'a str, +} + +impl<'a> Client<'a> { + pub fn new(host_address: &T, target_address: &'a str) -> Result { + Ok(Self { + socket: UdpSocket::bind(host_address)?, + target: target_address, + }) + } + + pub fn send(&self, metric: Metric) -> Result + where + I: IntoIterator, + T: AsRef, + { + self.socket.send_to(metric.to_bytes().as_ref(), self.target) + } +} + +impl Default for Client<'_> { + fn default() -> Self { + Self { + socket: UdpSocket::bind("127.0.0.1:0").unwrap(), + target: "127.0.0.1:8125", + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e086fa3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,4 @@ +mod client; +mod metric; +pub use client::Client; +pub use metric::Metric; diff --git a/src/metric.rs b/src/metric.rs new file mode 100644 index 0000000..39df64a --- /dev/null +++ b/src/metric.rs @@ -0,0 +1,183 @@ +use bytes::{Bytes, BytesMut}; + +enum Type<'a> { + Count(isize), + Increase, + Decrease, + Gauge(&'a str), + Histogram(&'a str), + Distribution(&'a str), + Set(&'a str), +} + +pub struct Metric<'a, I, T> +where + I: IntoIterator, + T: AsRef, +{ + frame_type: Type<'a>, + message: &'a str, + tags: I, +} + +impl<'a, I, T> Metric<'a, I, T> +where + I: IntoIterator, + T: AsRef, +{ + pub fn increase(message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Increase, + message, + tags, + } + } + + pub fn decrease(message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Decrease, + message, + tags, + } + } + + pub fn count(count: isize, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Count(count), + message, + tags, + } + } + + pub fn gauge(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Gauge(value), + message, + tags, + } + } + + pub fn histogram(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Histogram(value), + message, + tags, + } + } + + pub fn distribution(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Distribution(value), + message, + tags, + } + } + + pub fn set(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Set(value), + message, + tags, + } + } + + pub(crate) fn to_bytes(self) -> Bytes { + let mut buf; + match self.frame_type { + Type::Count(count) => { + buf = BytesMut::with_capacity(3 + self.message.len() + 8); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(count.to_string().as_bytes()); + buf.extend_from_slice(b"|c"); + } + Type::Increase => { + buf = BytesMut::with_capacity(self.message.len() + 4); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":1|c"); + } + Type::Decrease => { + buf = BytesMut::with_capacity(self.message.len() + 5); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":-1|c"); + } + Type::Gauge(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|g"); + } + Type::Histogram(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|h"); + } + Type::Distribution(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|d"); + } + Type::Set(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|s"); + } + } + let mut tags_iter = self.tags.into_iter(); + let mut next_tag = tags_iter.next(); + + if next_tag.is_some() { + buf.extend_from_slice(b"|#"); + } + + while next_tag.is_some() { + buf.extend_from_slice(next_tag.unwrap().as_ref().as_bytes()); + next_tag = tags_iter.next(); + + if next_tag.is_some() { + buf.extend_from_slice(b","); + } + } + + buf.freeze() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_to_bytes() { + let v: &[&str] = &[]; + assert_eq!(Metric::increase("test", v).to_bytes().as_ref(), b"test:1|c"); + assert_eq!( + Metric::decrease("test", v).to_bytes().as_ref(), + b"test:-1|c" + ); + assert_eq!(Metric::count(2, "test", v).to_bytes().as_ref(), b"test:2|c"); + assert_eq!( + Metric::gauge("1.2", "test", v).to_bytes().as_ref(), + b"test:1.2|g" + ); + assert_eq!( + Metric::histogram("1.2", "test", v).to_bytes().as_ref(), + b"test:1.2|h" + ); + assert_eq!( + Metric::distribution("1.2", "test", v).to_bytes().as_ref(), + b"test:1.2|d" + ); + assert_eq!( + Metric::set("1.2", "test", v).to_bytes().as_ref(), + b"test:1.2|s" + ); + } +} From c03385a1c32653a6d38d22df3c39360a4bd878d6 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Wed, 16 Jun 2021 23:09:39 -0400 Subject: [PATCH 2/4] make client async --- Cargo.toml | 1 + src/client.rs | 24 ++++++++++-------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cd63b64..2ed41ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,4 @@ edition = "2018" [dependencies] bytes = "1.0.1" +tokio = { version = "1.7.0", default-features = false, features = ["net"] } diff --git a/src/client.rs b/src/client.rs index 901e0bd..9671cac 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ use crate::metric::Metric; use std::io::Error; -use std::net::{ToSocketAddrs, UdpSocket}; +use tokio::net::{ToSocketAddrs, UdpSocket}; pub struct Client<'a> { socket: UdpSocket, @@ -8,27 +8,23 @@ pub struct Client<'a> { } impl<'a> Client<'a> { - pub fn new(host_address: &T, target_address: &'a str) -> Result { + pub async fn new( + host_address: &T, + target_address: &'a str, + ) -> Result, Error> { Ok(Self { - socket: UdpSocket::bind(host_address)?, + socket: UdpSocket::bind(host_address).await?, target: target_address, }) } - pub fn send(&self, metric: Metric) -> Result + pub async fn send(&self, metric: Metric<'_, I, T>) -> Result where I: IntoIterator, T: AsRef, { - self.socket.send_to(metric.to_bytes().as_ref(), self.target) - } -} - -impl Default for Client<'_> { - fn default() -> Self { - Self { - socket: UdpSocket::bind("127.0.0.1:0").unwrap(), - target: "127.0.0.1:8125", - } + self.socket + .send_to(metric.to_bytes().as_ref(), self.target) + .await } } From 8488da84c33dfcc7b98df9b7e29eb0b0f54849c4 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Wed, 16 Jun 2021 23:13:09 -0400 Subject: [PATCH 3/4] actions --- .github/workflows/ci.yml | 128 ++++++++++++++++++++++++++++++++++ .github/workflows/release.yml | 27 +++++++ 2 files changed, 155 insertions(+) create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..ded65b9 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,128 @@ +on: + push: + branches: + - main + paths: + - 'Cargo.toml' + - 'Cargo.lock' + - 'Dockerfile' + - 'src/**' + pull_request: + paths: + - 'Cargo.toml' + - 'Cargo.lock' + - 'Dockerfile' + - 'src/**' + +name: Continuous integration + +jobs: + cancel-previous: + name: Cancel Previous Runs + runs-on: ubuntu-latest + steps: + - name: Cancel actions + uses: styfle/cancel-workflow-action@0.8.0 + with: + access_token: ${{ github.token }} + + fmt: + name: Rustfmt + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Setup toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - name: Install rustfmt + run: rustup component add rustfmt + - name: Run check + uses: actions-rs/cargo@v1 + with: + command: fmt + args: --all -- --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + needs: [fmt] + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Setup toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - name: Install clippy + run: rustup component add clippy + - name: Run clippy + uses: actions-rs/cargo@v1 + with: + command: clippy + args: -- -D warnings + + coverage: + name: Test & Coverage + runs-on: ubuntu-latest + needs: [fmt, clippy] + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Install toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - name: Install tarpaulin + run: cargo install cargo-tarpaulin + - name: Generate coverage + run: cargo tarpaulin --out Xml + - name: Upload to codecov + uses: codecov/codecov-action@v1 + with: + token: ${{secrets.CODECOV_TOKEN}} + fail_ci_if_error: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..24e502a --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,27 @@ +name: Release +on: + push: + branches: + - main + +jobs: + deploy: + name: Tag if new release + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Read version number + id: read_toml + uses: outcome-co/action-read-toml@v2.0.10 + with: + path: Cargo.toml + key: package.version + - name: Set tag env variable + run: echo IMAGE_TAG=v${{steps.read_toml.outputs.package_version}} >> $GITHUB_ENV + - uses: ncipollo/release-action@v1 + continue-on-error: true + with: + allowUpdates: false + tag: ${{ env.IMAGE_TAG }} + token: ${{ secrets.GITHUB_TOKEN }} From 630f997a5df6dded43d51621eb6d21bb3fa1341d Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Wed, 16 Jun 2021 23:59:04 -0400 Subject: [PATCH 4/4] updates --- Cargo.toml | 3 +++ src/client.rs | 46 +++++++++++++++++++++++++++++++++------------- src/metric.rs | 30 +++++++++++++++++++++--------- 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ed41ff..b4b6a52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,6 @@ edition = "2018" [dependencies] bytes = "1.0.1" tokio = { version = "1.7.0", default-features = false, features = ["net"] } + +[dev-dependencies] +tokio = { version = "1.7.0", default-features = false, features = ["macros", "rt"] } diff --git a/src/client.rs b/src/client.rs index 9671cac..dd7f1cf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,29 +2,49 @@ use crate::metric::Metric; use std::io::Error; use tokio::net::{ToSocketAddrs, UdpSocket}; -pub struct Client<'a> { +pub struct Client { socket: UdpSocket, - target: &'a str, } -impl<'a> Client<'a> { +impl Client { pub async fn new( - host_address: &T, - target_address: &'a str, - ) -> Result, Error> { - Ok(Self { - socket: UdpSocket::bind(host_address).await?, - target: target_address, - }) + host_address: T, + target_address: &str, + ) -> Result { + let socket = UdpSocket::bind(host_address).await?; + socket.connect(target_address).await?; + Ok(Self { socket }) } - pub async fn send(&self, metric: Metric<'_, I, T>) -> Result + pub async fn send(&self, metric: Metric<'_, I, T>) -> Result<(), Error> where I: IntoIterator, T: AsRef, { - self.socket - .send_to(metric.to_bytes().as_ref(), self.target) + let bytes = metric.into_bytes(); + self.socket.send(&bytes).await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_client() { + let udp_receiver = UdpSocket::bind("127.0.0.1:8125").await.unwrap(); + let v: &[&str] = &[]; + let client = Client::new("127.0.0.1:1234", "127.0.0.1:8125") .await + .unwrap(); + client.send(Metric::increase("test", v)).await.unwrap(); + udp_receiver.connect("127.0.0.1:1234").await.unwrap(); + let mut bytes_received: usize = 0; + let mut buf = [0; 8]; + while bytes_received < 8 { + bytes_received += udp_receiver.recv_from(&mut buf).await.unwrap().0; + } + assert_eq!(&buf, b"test:1|c"); } } diff --git a/src/metric.rs b/src/metric.rs index 39df64a..009ba63 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -81,7 +81,7 @@ where } } - pub(crate) fn to_bytes(self) -> Bytes { + pub(crate) fn into_bytes(self) -> Bytes { let mut buf; match self.frame_type { Type::Count(count) => { @@ -155,29 +155,41 @@ mod test { use super::*; #[test] - fn test_to_bytes() { + fn test_into_bytes() { let v: &[&str] = &[]; - assert_eq!(Metric::increase("test", v).to_bytes().as_ref(), b"test:1|c"); assert_eq!( - Metric::decrease("test", v).to_bytes().as_ref(), + Metric::increase("test", v).into_bytes().as_ref(), + b"test:1|c" + ); + assert_eq!( + Metric::decrease("test", v).into_bytes().as_ref(), b"test:-1|c" ); - assert_eq!(Metric::count(2, "test", v).to_bytes().as_ref(), b"test:2|c"); assert_eq!( - Metric::gauge("1.2", "test", v).to_bytes().as_ref(), + Metric::count(2, "test", v).into_bytes().as_ref(), + b"test:2|c" + ); + assert_eq!( + Metric::gauge("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|g" ); assert_eq!( - Metric::histogram("1.2", "test", v).to_bytes().as_ref(), + Metric::histogram("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|h" ); assert_eq!( - Metric::distribution("1.2", "test", v).to_bytes().as_ref(), + Metric::distribution("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|d" ); assert_eq!( - Metric::set("1.2", "test", v).to_bytes().as_ref(), + Metric::set("1.2", "test", v).into_bytes().as_ref(), b"test:1.2|s" ); + assert_eq!( + Metric::increase("test", &["a:b", "c"]) + .into_bytes() + .as_ref(), + b"test:1|c|#a:b,c" + ); } }