diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..ded65b9 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,128 @@ +on: + push: + branches: + - main + paths: + - 'Cargo.toml' + - 'Cargo.lock' + - 'Dockerfile' + - 'src/**' + pull_request: + paths: + - 'Cargo.toml' + - 'Cargo.lock' + - 'Dockerfile' + - 'src/**' + +name: Continuous integration + +jobs: + cancel-previous: + name: Cancel Previous Runs + runs-on: ubuntu-latest + steps: + - name: Cancel actions + uses: styfle/cancel-workflow-action@0.8.0 + with: + access_token: ${{ github.token }} + + fmt: + name: Rustfmt + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Setup toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - name: Install rustfmt + run: rustup component add rustfmt + - name: Run check + uses: actions-rs/cargo@v1 + with: + command: fmt + args: --all -- --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + needs: [fmt] + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Setup toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - name: Install clippy + run: rustup component add clippy + - name: Run clippy + uses: actions-rs/cargo@v1 + with: + command: clippy + args: -- -D warnings + + coverage: + name: Test & Coverage + runs-on: ubuntu-latest + needs: [fmt, clippy] + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Cache + uses: actions/cache@v2 + id: cache + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Setup git credentials + uses: fusion-engineering/setup-git-credentials@v2 + with: + credentials: ${{secrets.GIT_USER_CREDENTIALS}} + - name: Install toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - name: Install tarpaulin + run: cargo install cargo-tarpaulin + - name: Generate coverage + run: cargo tarpaulin --out Xml + - name: Upload to codecov + uses: codecov/codecov-action@v1 + with: + token: ${{secrets.CODECOV_TOKEN}} + fail_ci_if_error: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..24e502a --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,27 @@ +name: Release +on: + push: + branches: + - main + +jobs: + deploy: + name: Tag if new release + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Read version number + id: read_toml + uses: outcome-co/action-read-toml@v2.0.10 + with: + path: Cargo.toml + key: package.version + - name: Set tag env variable + run: echo IMAGE_TAG=v${{steps.read_toml.outputs.package_version}} >> $GITHUB_ENV + - uses: ncipollo/release-action@v1 + continue-on-error: true + with: + allowUpdates: false + tag: ${{ env.IMAGE_TAG }} + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..b4b6a52 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "dogstatsd" +version = "0.1.0" +authors = ["Sebastian Rollen "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.0.1" +tokio = { version = "1.7.0", default-features = false, features = ["net"] } + +[dev-dependencies] +tokio = { version = "1.7.0", default-features = false, features = ["macros", "rt"] } diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..dd7f1cf --- /dev/null +++ b/src/client.rs @@ -0,0 +1,50 @@ +use crate::metric::Metric; +use std::io::Error; +use tokio::net::{ToSocketAddrs, UdpSocket}; + +pub struct Client { + socket: UdpSocket, +} + +impl Client { + pub async fn new( + host_address: T, + target_address: &str, + ) -> Result { + let socket = UdpSocket::bind(host_address).await?; + socket.connect(target_address).await?; + Ok(Self { socket }) + } + + pub async fn send(&self, metric: Metric<'_, I, T>) -> Result<(), Error> + where + I: IntoIterator, + T: AsRef, + { + let bytes = metric.into_bytes(); + self.socket.send(&bytes).await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_client() { + let udp_receiver = UdpSocket::bind("127.0.0.1:8125").await.unwrap(); + let v: &[&str] = &[]; + let client = Client::new("127.0.0.1:1234", "127.0.0.1:8125") + .await + .unwrap(); + client.send(Metric::increase("test", v)).await.unwrap(); + udp_receiver.connect("127.0.0.1:1234").await.unwrap(); + let mut bytes_received: usize = 0; + let mut buf = [0; 8]; + while bytes_received < 8 { + bytes_received += udp_receiver.recv_from(&mut buf).await.unwrap().0; + } + assert_eq!(&buf, b"test:1|c"); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e086fa3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,4 @@ +mod client; +mod metric; +pub use client::Client; +pub use metric::Metric; diff --git a/src/metric.rs b/src/metric.rs new file mode 100644 index 0000000..009ba63 --- /dev/null +++ b/src/metric.rs @@ -0,0 +1,195 @@ +use bytes::{Bytes, BytesMut}; + +enum Type<'a> { + Count(isize), + Increase, + Decrease, + Gauge(&'a str), + Histogram(&'a str), + Distribution(&'a str), + Set(&'a str), +} + +pub struct Metric<'a, I, T> +where + I: IntoIterator, + T: AsRef, +{ + frame_type: Type<'a>, + message: &'a str, + tags: I, +} + +impl<'a, I, T> Metric<'a, I, T> +where + I: IntoIterator, + T: AsRef, +{ + pub fn increase(message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Increase, + message, + tags, + } + } + + pub fn decrease(message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Decrease, + message, + tags, + } + } + + pub fn count(count: isize, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Count(count), + message, + tags, + } + } + + pub fn gauge(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Gauge(value), + message, + tags, + } + } + + pub fn histogram(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Histogram(value), + message, + tags, + } + } + + pub fn distribution(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Distribution(value), + message, + tags, + } + } + + pub fn set(value: &'a str, message: &'a str, tags: I) -> Self { + Self { + frame_type: Type::Set(value), + message, + tags, + } + } + + pub(crate) fn into_bytes(self) -> Bytes { + let mut buf; + match self.frame_type { + Type::Count(count) => { + buf = BytesMut::with_capacity(3 + self.message.len() + 8); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(count.to_string().as_bytes()); + buf.extend_from_slice(b"|c"); + } + Type::Increase => { + buf = BytesMut::with_capacity(self.message.len() + 4); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":1|c"); + } + Type::Decrease => { + buf = BytesMut::with_capacity(self.message.len() + 5); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":-1|c"); + } + Type::Gauge(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|g"); + } + Type::Histogram(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|h"); + } + Type::Distribution(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|d"); + } + Type::Set(val) => { + buf = BytesMut::with_capacity(3 + self.message.len() + val.len()); + buf.extend_from_slice(self.message.as_bytes()); + buf.extend_from_slice(b":"); + buf.extend_from_slice(val.as_bytes()); + buf.extend_from_slice(b"|s"); + } + } + let mut tags_iter = self.tags.into_iter(); + let mut next_tag = tags_iter.next(); + + if next_tag.is_some() { + buf.extend_from_slice(b"|#"); + } + + while next_tag.is_some() { + buf.extend_from_slice(next_tag.unwrap().as_ref().as_bytes()); + next_tag = tags_iter.next(); + + if next_tag.is_some() { + buf.extend_from_slice(b","); + } + } + + buf.freeze() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_into_bytes() { + let v: &[&str] = &[]; + assert_eq!( + Metric::increase("test", v).into_bytes().as_ref(), + b"test:1|c" + ); + assert_eq!( + Metric::decrease("test", v).into_bytes().as_ref(), + b"test:-1|c" + ); + assert_eq!( + Metric::count(2, "test", v).into_bytes().as_ref(), + b"test:2|c" + ); + assert_eq!( + Metric::gauge("1.2", "test", v).into_bytes().as_ref(), + b"test:1.2|g" + ); + assert_eq!( + Metric::histogram("1.2", "test", v).into_bytes().as_ref(), + b"test:1.2|h" + ); + assert_eq!( + Metric::distribution("1.2", "test", v).into_bytes().as_ref(), + b"test:1.2|d" + ); + assert_eq!( + Metric::set("1.2", "test", v).into_bytes().as_ref(), + b"test:1.2|s" + ); + assert_eq!( + Metric::increase("test", &["a:b", "c"]) + .into_bytes() + .as_ref(), + b"test:1|c|#a:b,c" + ); + } +}