diff --git a/README.md b/README.md index d89c680..1e66164 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Some reasons why you might be interested: * Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition). * JSON output for easy consumption with tools like [kp](https://github.com/echojc/kp) or [jq](https://stedolan.github.io/jq/). * JSON input to facilitate automation via tools like [jsonify](https://github.com/fgeller/jsonify). -* Configure brokers, topic and authentication via environment variables `KT_BROKERS`, `KT_TOPIC` and `KT_AUTH`. +* Configure brokers, topic, Kafka version and authentication via environment variables `KT_BROKERS`, `KT_TOPIC`, `KT_KAFKA_VERSION` and `KT_AUTH`. * Fast start up time. * No buffering of output. * Binary keys and payloads can be passed and presented in base64 or hex encoding. diff --git a/common.go b/common.go index 13bbda4..e143a0c 100644 --- a/common.go +++ b/common.go @@ -26,6 +26,7 @@ const ( ENV_ADMIN_TIMEOUT = "KT_ADMIN_TIMEOUT" ENV_BROKERS = "KT_BROKERS" ENV_TOPIC = "KT_TOPIC" + ENV_KAFKA_VERSION = "KT_KAFKA_VERSION" ) var ( @@ -45,6 +46,11 @@ func listenForInterrupt(q chan struct{}) { } func kafkaVersion(s string) sarama.KafkaVersion { + ev := os.Getenv(ENV_KAFKA_VERSION) + if s == "" && ev != "" { + s = ev + } + if s == "" { return sarama.V2_0_0_0 } diff --git a/system_test.go b/system_test.go index 25f9bab..80017cb 100644 --- a/system_test.go +++ b/system_test.go @@ -32,6 +32,7 @@ func (c *cmd) run(name string, args ...string) (int, string, string) { cmd.Env = os.Environ() cmd.Env = append(cmd.Env, fmt.Sprintf("%s=localhost:9092", ENV_BROKERS)) cmd.Env = append(cmd.Env, fmt.Sprintf("%s=test-secrets/auth.json", ENV_AUTH)) + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=v3.0.0", ENV_KAFKA_VERSION)) if len(c.in) > 0 { cmd.Stdin = strings.NewReader(c.in)