diff --git a/Cargo.toml b/Cargo.toml index 63d9a8f..5b82864 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kafka-settings" -version = "0.3.1" +version = "0.3.2" authors = ["Sebastian Rollen "] edition = "2018" @@ -9,6 +9,7 @@ edition = "2018" [dependencies] rdkafka = "0.26" serde = "1.0" +uuid = { version = "0.8.2", features = ["v4"] } [dev-dependencies] config = "0.11.0" diff --git a/README.md b/README.md index f8deaac..d4630f8 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Environment variables need to be prefixed with `KAFKA__`. The following settings - `SASL_USERNAME` - `SASL_PASSWORD` - `GROUP_ID` (optional): The group id for consumers. +- `UNIQUE_ID` (optional): Append a unique identifier to the end of `GROUP_ID`. - `INPUT_TOPICS` (optional): A CSV list of kafka topics for consumers to subscribe to. - `ACKS` (optional): The number of acknowledgements needed from the brokers before committing a message. - `RETRIES` (optional): The number of times to retry sending a message to the brokers. diff --git a/src/kafka.rs b/src/kafka.rs index aa9e18c..50cb05e 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -5,6 +5,7 @@ use rdkafka::{ producer::FutureProducer, ClientConfig, }; +use uuid::Uuid; pub fn consumer(settings: &KafkaSettings) -> Result { let mut config = ClientConfig::new(); @@ -13,8 +14,18 @@ pub fn consumer(settings: &KafkaSettings) -> Result .consumer .clone() .expect("Consumer settings not specified"); + let group_id = if consumer_settings.unique_id { + format!( + "{}_{}", + consumer_settings.group_id, + Uuid::new_v4().to_string(), + ) + } else { + consumer_settings.group_id + }; + let consumer: StreamConsumer = config - .set("group.id", &consumer_settings.group_id) + .set("group.id", &group_id) // TODO: Figure out how to remove this setting .set("enable.ssl.certificate.verification", "false") .create()?; diff --git a/src/settings.rs b/src/settings.rs index dbb450f..f1a9da1 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -14,6 +14,8 @@ pub enum SecurityProtocol { #[derive(Debug, Clone, Deserialize)] pub struct ConsumerSettings { pub group_id: String, + #[serde(default)] + pub unique_id: bool, #[serde(deserialize_with = "vec_from_str")] pub input_topics: Vec, } @@ -139,6 +141,7 @@ mod test { let settings: Test = s.try_into().unwrap(); let consumer_settings = settings.kafka.consumer.unwrap(); assert_eq!(consumer_settings.group_id, "group".to_string()); + assert!(!consumer_settings.unique_id); assert_eq!( consumer_settings.input_topics, vec!["A".to_string(), "B".to_string(), "C".to_string()]