Skip to content
This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

Commit

Permalink
initial development
Browse files Browse the repository at this point in the history
  • Loading branch information
SebRollen committed Jun 17, 2021
1 parent ab5cd6f commit 814085f
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
Cargo.lock
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "dogstatsd"
version = "0.1.0"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "1.0.1"
34 changes: 34 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::metric::Metric;
use std::io::Error;
use std::net::{ToSocketAddrs, UdpSocket};

pub struct Client<'a> {
socket: UdpSocket,
target: &'a str,
}

impl<'a> Client<'a> {
pub fn new<T: ToSocketAddrs>(host_address: &T, target_address: &'a str) -> Result<Self, Error> {
Ok(Self {
socket: UdpSocket::bind(host_address)?,
target: target_address,
})
}

pub fn send<I, T>(&self, metric: Metric<I, T>) -> Result<usize, Error>
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
self.socket.send_to(metric.to_bytes().as_ref(), self.target)
}
}

impl Default for Client<'_> {
fn default() -> Self {
Self {
socket: UdpSocket::bind("127.0.0.1:0").unwrap(),
target: "127.0.0.1:8125",
}
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod client;
mod metric;
pub use client::Client;
pub use metric::Metric;
183 changes: 183 additions & 0 deletions src/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use bytes::{Bytes, BytesMut};

enum Type<'a> {
Count(isize),
Increase,
Decrease,
Gauge(&'a str),
Histogram(&'a str),
Distribution(&'a str),
Set(&'a str),
}

pub struct Metric<'a, I, T>
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
frame_type: Type<'a>,
message: &'a str,
tags: I,
}

impl<'a, I, T> Metric<'a, I, T>
where
I: IntoIterator<Item = T>,
T: AsRef<str>,
{
pub fn increase(message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Increase,
message,
tags,
}
}

pub fn decrease(message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Decrease,
message,
tags,
}
}

pub fn count(count: isize, message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Count(count),
message,
tags,
}
}

pub fn gauge(value: &'a str, message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Gauge(value),
message,
tags,
}
}

pub fn histogram(value: &'a str, message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Histogram(value),
message,
tags,
}
}

pub fn distribution(value: &'a str, message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Distribution(value),
message,
tags,
}
}

pub fn set(value: &'a str, message: &'a str, tags: I) -> Self {
Self {
frame_type: Type::Set(value),
message,
tags,
}
}

pub(crate) fn to_bytes(self) -> Bytes {
let mut buf;
match self.frame_type {
Type::Count(count) => {
buf = BytesMut::with_capacity(3 + self.message.len() + 8);
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":");
buf.extend_from_slice(count.to_string().as_bytes());
buf.extend_from_slice(b"|c");
}
Type::Increase => {
buf = BytesMut::with_capacity(self.message.len() + 4);
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":1|c");
}
Type::Decrease => {
buf = BytesMut::with_capacity(self.message.len() + 5);
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":-1|c");
}
Type::Gauge(val) => {
buf = BytesMut::with_capacity(3 + self.message.len() + val.len());
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":");
buf.extend_from_slice(val.as_bytes());
buf.extend_from_slice(b"|g");
}
Type::Histogram(val) => {
buf = BytesMut::with_capacity(3 + self.message.len() + val.len());
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":");
buf.extend_from_slice(val.as_bytes());
buf.extend_from_slice(b"|h");
}
Type::Distribution(val) => {
buf = BytesMut::with_capacity(3 + self.message.len() + val.len());
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":");
buf.extend_from_slice(val.as_bytes());
buf.extend_from_slice(b"|d");
}
Type::Set(val) => {
buf = BytesMut::with_capacity(3 + self.message.len() + val.len());
buf.extend_from_slice(self.message.as_bytes());
buf.extend_from_slice(b":");
buf.extend_from_slice(val.as_bytes());
buf.extend_from_slice(b"|s");
}
}
let mut tags_iter = self.tags.into_iter();
let mut next_tag = tags_iter.next();

if next_tag.is_some() {
buf.extend_from_slice(b"|#");
}

while next_tag.is_some() {
buf.extend_from_slice(next_tag.unwrap().as_ref().as_bytes());
next_tag = tags_iter.next();

if next_tag.is_some() {
buf.extend_from_slice(b",");
}
}

buf.freeze()
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_to_bytes() {
let v: &[&str] = &[];
assert_eq!(Metric::increase("test", v).to_bytes().as_ref(), b"test:1|c");
assert_eq!(
Metric::decrease("test", v).to_bytes().as_ref(),
b"test:-1|c"
);
assert_eq!(Metric::count(2, "test", v).to_bytes().as_ref(), b"test:2|c");
assert_eq!(
Metric::gauge("1.2", "test", v).to_bytes().as_ref(),
b"test:1.2|g"
);
assert_eq!(
Metric::histogram("1.2", "test", v).to_bytes().as_ref(),
b"test:1.2|h"
);
assert_eq!(
Metric::distribution("1.2", "test", v).to_bytes().as_ref(),
b"test:1.2|d"
);
assert_eq!(
Metric::set("1.2", "test", v).to_bytes().as_ref(),
b"test:1.2|s"
);
}
}

0 comments on commit 814085f

Please sign in to comment.