diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0dee43..b4bee95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,39 +94,3 @@ jobs: with: command: clippy args: -- -D warnings - - coverage: - name: Test & Coverage - runs-on: ubuntu-latest - needs: [fmt, clippy] - steps: - - name: Checkout repository - uses: actions/checkout@v2 - - name: Cache - uses: actions/cache@v2 - id: cache - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - - name: Setup git credentials - uses: fusion-engineering/setup-git-credentials@v2 - with: - credentials: ${{secrets.GIT_USER_CREDENTIALS}} - - name: Install toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - - name: Initiate kafka - run: ./scripts/init_kafka.sh - - name: Install tarpaulin - run: cargo install cargo-tarpaulin - - name: Generate coverage - run: cargo tarpaulin --out Xml - - name: Upload to codecov - uses: codecov/codecov-action@v1 - with: - token: ${{secrets.CODECOV_TOKEN}} - fail_ci_if_error: true diff --git a/Cargo.toml b/Cargo.toml index 24e0461..a310bc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kafka-settings" -version = "0.2.2" +version = "0.3.0" authors = ["Sebastian Rollen "] edition = "2018" diff --git a/src/kafka.rs b/src/kafka.rs index 387e5b1..99c4a7f 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -1,4 +1,4 @@ -use crate::settings::KafkaSettings; +use crate::settings::{Acks, KafkaSettings}; use rdkafka::{ consumer::{Consumer, StreamConsumer}, error::KafkaError, @@ -30,7 +30,17 @@ pub fn consumer(settings: &KafkaSettings) -> Result pub fn producer(settings: &KafkaSettings) -> Result { let mut config = ClientConfig::new(); let config = settings.config(&mut config); + let producer_settings = settings + .producer + .clone() + .expect("Producer settings not specified"); + let acks = match producer_settings.acks { + Acks::All => "all".to_string(), + Acks::Number(n) => format!("{}", n), + }; let producer: FutureProducer = config + .set("acks", acks) + .set("retries", format!("{}", producer_settings.retries)) // TODO: Figure out how to remove this setting .set("enable.ssl.certificate.verification", "false") .create()?; diff --git a/src/settings.rs b/src/settings.rs index 73299e4..bc2e282 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -25,6 +25,29 @@ where Ok(s.split(',').map(From::from).collect()) } +fn default_acks() -> Acks { + Acks::Number(1) +} + +#[derive(Debug, Clone, Deserialize)] +pub enum Acks { + #[serde(rename = "all")] + All, + Number(usize), +} + +fn default_retries() -> usize { + 2147483647 +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ProducerSettings { + #[serde(default = "default_acks")] + pub acks: Acks, + #[serde(default = "default_retries")] + pub retries: usize, +} + #[derive(Debug, Clone, Deserialize)] pub struct KafkaSettings { pub bootstrap_servers: String, @@ -32,6 +55,8 @@ pub struct KafkaSettings { pub security_protocol: SecurityProtocol, #[serde(flatten)] pub consumer: Option, + #[serde(flatten)] + pub producer: Option, } impl KafkaSettings {