Skip to content

Commit

Permalink
Merge pull request #4 from Overmuse/SR/producer_settings
Browse files Browse the repository at this point in the history
add producer settings
  • Loading branch information
SebRollen authored Apr 20, 2021
2 parents 33e2cee + b8738fc commit f62fd50
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 38 deletions.
36 changes: 0 additions & 36 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,39 +94,3 @@ jobs:
with:
command: clippy
args: -- -D warnings

coverage:
name: Test & Coverage
runs-on: ubuntu-latest
needs: [fmt, clippy]
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Cache
uses: actions/cache@v2
id: cache
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- name: Setup git credentials
uses: fusion-engineering/setup-git-credentials@v2
with:
credentials: ${{secrets.GIT_USER_CREDENTIALS}}
- name: Install toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
- name: Initiate kafka
run: ./scripts/init_kafka.sh
- name: Install tarpaulin
run: cargo install cargo-tarpaulin
- name: Generate coverage
run: cargo tarpaulin --out Xml
- name: Upload to codecov
uses: codecov/codecov-action@v1
with:
token: ${{secrets.CODECOV_TOKEN}}
fail_ci_if_error: true
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kafka-settings"
version = "0.2.2"
version = "0.3.0"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

Expand Down
12 changes: 11 additions & 1 deletion src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::settings::KafkaSettings;
use crate::settings::{Acks, KafkaSettings};
use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaError,
Expand Down Expand Up @@ -30,7 +30,17 @@ pub fn consumer(settings: &KafkaSettings) -> Result<StreamConsumer, KafkaError>
pub fn producer(settings: &KafkaSettings) -> Result<FutureProducer, KafkaError> {
let mut config = ClientConfig::new();
let config = settings.config(&mut config);
let producer_settings = settings
.producer
.clone()
.expect("Producer settings not specified");
let acks = match producer_settings.acks {
Acks::All => "all".to_string(),
Acks::Number(n) => format!("{}", n),
};
let producer: FutureProducer = config
.set("acks", acks)
.set("retries", format!("{}", producer_settings.retries))
// TODO: Figure out how to remove this setting
.set("enable.ssl.certificate.verification", "false")
.create()?;
Expand Down
25 changes: 25 additions & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,38 @@ where
Ok(s.split(',').map(From::from).collect())
}

fn default_acks() -> Acks {
Acks::Number(1)
}

#[derive(Debug, Clone, Deserialize)]
pub enum Acks {
#[serde(rename = "all")]
All,
Number(usize),
}

fn default_retries() -> usize {
2147483647
}

#[derive(Debug, Clone, Deserialize)]
pub struct ProducerSettings {
#[serde(default = "default_acks")]
pub acks: Acks,
#[serde(default = "default_retries")]
pub retries: usize,
}

#[derive(Debug, Clone, Deserialize)]
pub struct KafkaSettings {
pub bootstrap_servers: String,
#[serde(flatten)]
pub security_protocol: SecurityProtocol,
#[serde(flatten)]
pub consumer: Option<ConsumerSettings>,
#[serde(flatten)]
pub producer: Option<ProducerSettings>,
}

impl KafkaSettings {
Expand Down

0 comments on commit f62fd50

Please sign in to comment.