From 60164661aadf2881591b694725e0c24a162f3bf3 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Tue, 20 Apr 2021 16:49:34 -0400 Subject: [PATCH 1/3] add producer settings --- Cargo.toml | 2 +- src/kafka.rs | 11 ++++++++++- src/settings.rs | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 24e0461..a310bc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kafka-settings" -version = "0.2.2" +version = "0.3.0" authors = ["Sebastian Rollen "] edition = "2018" diff --git a/src/kafka.rs b/src/kafka.rs index 387e5b1..e8c0c79 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -1,4 +1,4 @@ -use crate::settings::KafkaSettings; +use crate::settings::{Acks, KafkaSettings}; use rdkafka::{ consumer::{Consumer, StreamConsumer}, error::KafkaError, @@ -30,7 +30,16 @@ pub fn consumer(settings: &KafkaSettings) -> Result pub fn producer(settings: &KafkaSettings) -> Result { let mut config = ClientConfig::new(); let config = settings.config(&mut config); + let producer_settings = settings + .producer + .clone() + .expect("Producer settings not specified"); + let acks = match producer_settings.acks { + Acks::All => "all".to_string(), + Acks::Number(n) => format!("{}", n), + }; let producer: FutureProducer = config + .set("acks", acks) // TODO: Figure out how to remove this setting .set("enable.ssl.certificate.verification", "false") .create()?; diff --git a/src/settings.rs b/src/settings.rs index 73299e4..fc073b2 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -25,6 +25,23 @@ where Ok(s.split(',').map(From::from).collect()) } +fn default_acks() -> Acks { + Acks::Number(1) +} + +#[derive(Debug, Clone, Deserialize)] +pub enum Acks { + #[serde(rename = "all")] + All, + Number(usize), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ProducerSettings { + #[serde(default = "default_acks")] + pub acks: Acks, +} + #[derive(Debug, Clone, Deserialize)] pub struct KafkaSettings { pub bootstrap_servers: String, @@ -32,6 +49,8 @@ pub struct KafkaSettings { pub security_protocol: SecurityProtocol, #[serde(flatten)] pub consumer: Option, + #[serde(flatten)] + pub producer: Option, } impl KafkaSettings { From e0d88cbdfc97962dd52b0654853ac6bc2ff46bd5 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Tue, 20 Apr 2021 16:54:47 -0400 Subject: [PATCH 2/3] also add retries --- src/kafka.rs | 1 + src/settings.rs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/src/kafka.rs b/src/kafka.rs index e8c0c79..99c4a7f 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -40,6 +40,7 @@ pub fn producer(settings: &KafkaSettings) -> Result }; let producer: FutureProducer = config .set("acks", acks) + .set("retries", format!("{}", producer_settings.retries)) // TODO: Figure out how to remove this setting .set("enable.ssl.certificate.verification", "false") .create()?; diff --git a/src/settings.rs b/src/settings.rs index fc073b2..bc2e282 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -36,10 +36,16 @@ pub enum Acks { Number(usize), } +fn default_retries() -> usize { + 2147483647 +} + #[derive(Debug, Clone, Deserialize)] pub struct ProducerSettings { #[serde(default = "default_acks")] pub acks: Acks, + #[serde(default = "default_retries")] + pub retries: usize, } #[derive(Debug, Clone, Deserialize)] From b8738fcee0a4a663370deaa25fbea2088c22c483 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Tue, 20 Apr 2021 16:55:08 -0400 Subject: [PATCH 3/3] remove test step --- .github/workflows/ci.yml | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0dee43..b4bee95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,39 +94,3 @@ jobs: with: command: clippy args: -- -D warnings - - coverage: - name: Test & Coverage - runs-on: ubuntu-latest - needs: [fmt, clippy] - steps: - - name: Checkout repository - uses: actions/checkout@v2 - - name: Cache - uses: actions/cache@v2 - id: cache - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - - name: Setup git credentials - uses: fusion-engineering/setup-git-credentials@v2 - with: - credentials: ${{secrets.GIT_USER_CREDENTIALS}} - - name: Install toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - - name: Initiate kafka - run: ./scripts/init_kafka.sh - - name: Install tarpaulin - run: cargo install cargo-tarpaulin - - name: Generate coverage - run: cargo tarpaulin --out Xml - - name: Upload to codecov - uses: codecov/codecov-action@v1 - with: - token: ${{secrets.CODECOV_TOKEN}} - fail_ci_if_error: true