Skip to content

Commit

Permalink
fix settings
Browse files Browse the repository at this point in the history
  • Loading branch information
SebRollen committed Apr 20, 2021
1 parent b8738fc commit 8fdf0ee
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,5 @@ jobs:
with:
command: clippy
args: -- -D warnings

# TODO: When adding in tests, remember to set `cargo test -- --test-threads=1`
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kafka-settings"
version = "0.3.0"
version = "0.3.1"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

Expand All @@ -9,3 +9,6 @@ edition = "2018"
[dependencies]
rdkafka = "0.26"
serde = "1.0"

[dev-dependencies]
config = "0.11.0"
8 changes: 2 additions & 6 deletions src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::settings::{Acks, KafkaSettings};
use crate::settings::KafkaSettings;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaError,
Expand Down Expand Up @@ -34,12 +34,8 @@ pub fn producer(settings: &KafkaSettings) -> Result<FutureProducer, KafkaError>
.producer
.clone()
.expect("Producer settings not specified");
let acks = match producer_settings.acks {
Acks::All => "all".to_string(),
Acks::Number(n) => format!("{}", n),
};
let producer: FutureProducer = config
.set("acks", acks)
.set("acks", format!("{}", producer_settings.acks))
.set("retries", format!("{}", producer_settings.retries))
// TODO: Figure out how to remove this setting
.set("enable.ssl.certificate.verification", "false")
Expand Down
112 changes: 94 additions & 18 deletions src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Deserializer};
use serde::{de, Deserialize, Deserializer};
use std::{fmt::Display, str::FromStr};

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "security_protocol")]
pub enum SecurityProtocol {
Plaintext,
Expand All @@ -25,26 +26,21 @@ where
Ok(s.split(',').map(From::from).collect())
}

fn default_acks() -> Acks {
Acks::Number(1)
}

#[derive(Debug, Clone, Deserialize)]
pub enum Acks {
#[serde(rename = "all")]
All,
Number(usize),
}

fn default_retries() -> usize {
2147483647
fn from_str<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: FromStr,
T::Err: Display,
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
T::from_str(&s).map_err(de::Error::custom)
}

#[derive(Debug, Clone, Deserialize)]
pub struct ProducerSettings {
#[serde(default = "default_acks")]
pub acks: Acks,
#[serde(default = "default_retries")]
#[serde(deserialize_with = "from_str")]
pub acks: usize,
#[serde(deserialize_with = "from_str")]
pub retries: usize,
}

Expand All @@ -55,6 +51,7 @@ pub struct KafkaSettings {
pub security_protocol: SecurityProtocol,
#[serde(flatten)]
pub consumer: Option<ConsumerSettings>,
// Doesn't technically need to be an option now, but future-proofing
#[serde(flatten)]
pub producer: Option<ProducerSettings>,
}
Expand Down Expand Up @@ -83,3 +80,82 @@ impl KafkaSettings {
config
}
}

#[cfg(test)]
mod test {
use super::*;
use config::{Config, Environment};

#[derive(Debug, Clone, Deserialize)]
struct Test {
kafka: KafkaSettings,
}

#[test]
fn bare_settings() {
std::env::set_var("KAFKA__BOOTSTRAP_SERVERS", "ADDRESS");
std::env::set_var("KAFKA__SECURITY_PROTOCOL", "PLAINTEXT");

let mut s = Config::new();
s.merge(Environment::new().separator("__")).unwrap();
let settings: Test = s.try_into().unwrap();
assert_eq!(settings.kafka.bootstrap_servers, "ADDRESS".to_string());
assert_eq!(
settings.kafka.security_protocol,
SecurityProtocol::Plaintext
);
assert!(settings.kafka.consumer.is_none());
assert!(settings.kafka.producer.is_none());
}

#[test]
fn sasl_ssl() {
std::env::set_var("KAFKA__BOOTSTRAP_SERVERS", "ADDRESS");
std::env::set_var("KAFKA__SECURITY_PROTOCOL", "SASL_SSL");
std::env::set_var("KAFKA__SASL_USERNAME", "USER");
std::env::set_var("KAFKA__SASL_PASSWORD", "PASS");

let mut s = Config::new();
s.merge(Environment::new().separator("__")).unwrap();
let settings: Test = s.try_into().unwrap();
assert_eq!(settings.kafka.bootstrap_servers, "ADDRESS".to_string());
assert_eq!(
settings.kafka.security_protocol,
SecurityProtocol::SaslSsl {
sasl_username: "USER".into(),
sasl_password: "PASS".into()
}
);
}

#[test]
fn consumer_settings() {
std::env::set_var("KAFKA__BOOTSTRAP_SERVERS", "ADDRESS");
std::env::set_var("KAFKA__SECURITY_PROTOCOL", "PLAINTEXT");
std::env::set_var("KAFKA__GROUP_ID", "group");
std::env::set_var("KAFKA__INPUT_TOPICS", "A,B,C");
let mut s = Config::new();
s.merge(Environment::new().separator("__")).unwrap();
let settings: Test = s.try_into().unwrap();
let consumer_settings = settings.kafka.consumer.unwrap();
assert_eq!(consumer_settings.group_id, "group".to_string());
assert_eq!(
consumer_settings.input_topics,
vec!["A".to_string(), "B".to_string(), "C".to_string()]
);
}

#[test]
fn producer_settings() {
std::env::set_var("KAFKA__BOOTSTRAP_SERVERS", "ADDRESS");
std::env::set_var("KAFKA__SECURITY_PROTOCOL", "PLAINTEXT");
std::env::set_var("KAFKA__ACKS", "0");
std::env::set_var("KAFKA__RETRIES", "0");
let mut s = Config::new();
s.merge(Environment::new().separator("__")).unwrap();
let settings: Test = s.try_into().unwrap();
let producer_settings = settings.kafka.producer.unwrap();
assert_eq!(producer_settings.acks, 0);
assert_eq!(producer_settings.retries, 0);
}
}

0 comments on commit 8fdf0ee

Please sign in to comment.