Skip to content

Commit

Permalink
Merge pull request #6 from Overmuse/SR/unique_id
Browse files Browse the repository at this point in the history
create option for unique consumer id
  • Loading branch information
SebRollen authored Jun 21, 2021
2 parents 9dcca75 + a9401a7 commit 248af2a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kafka-settings"
version = "0.3.1"
version = "0.3.2"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

Expand All @@ -9,6 +9,7 @@ edition = "2018"
[dependencies]
rdkafka = "0.26"
serde = "1.0"
uuid = { version = "0.8.2", features = ["v4"] }

[dev-dependencies]
config = "0.11.0"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Environment variables need to be prefixed with `KAFKA__`. The following settings
- `SASL_USERNAME`
- `SASL_PASSWORD`
- `GROUP_ID` (optional): The group id for consumers.
- `UNIQUE_ID` (optional): Append a unique identifier to the end of `GROUP_ID`.
- `INPUT_TOPICS` (optional): A CSV list of kafka topics for consumers to subscribe to.
- `ACKS` (optional): The number of acknowledgements needed from the brokers before committing a message.
- `RETRIES` (optional): The number of times to retry sending a message to the brokers.
13 changes: 12 additions & 1 deletion src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use rdkafka::{
producer::FutureProducer,
ClientConfig,
};
use uuid::Uuid;

pub fn consumer(settings: &KafkaSettings) -> Result<StreamConsumer, KafkaError> {
let mut config = ClientConfig::new();
Expand All @@ -13,8 +14,18 @@ pub fn consumer(settings: &KafkaSettings) -> Result<StreamConsumer, KafkaError>
.consumer
.clone()
.expect("Consumer settings not specified");
let group_id = if consumer_settings.unique_id {
format!(
"{}_{}",
consumer_settings.group_id,
Uuid::new_v4().to_string(),
)
} else {
consumer_settings.group_id
};

let consumer: StreamConsumer = config
.set("group.id", &consumer_settings.group_id)
.set("group.id", &group_id)
// TODO: Figure out how to remove this setting
.set("enable.ssl.certificate.verification", "false")
.create()?;
Expand Down
3 changes: 3 additions & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub enum SecurityProtocol {
#[derive(Debug, Clone, Deserialize)]
pub struct ConsumerSettings {
pub group_id: String,
#[serde(default)]
pub unique_id: bool,
#[serde(deserialize_with = "vec_from_str")]
pub input_topics: Vec<String>,
}
Expand Down Expand Up @@ -139,6 +141,7 @@ mod test {
let settings: Test = s.try_into().unwrap();
let consumer_settings = settings.kafka.consumer.unwrap();
assert_eq!(consumer_settings.group_id, "group".to_string());
assert!(!consumer_settings.unique_id);
assert_eq!(
consumer_settings.input_topics,
vec!["A".to_string(), "B".to_string(), "C".to_string()]
Expand Down

0 comments on commit 248af2a

Please sign in to comment.