From 3146f9b5d20252ca45e9fb6263f1aec4af063d38 Mon Sep 17 00:00:00 2001 From: Tamas Kornai Date: Fri, 3 Nov 2023 10:06:58 +0100 Subject: [PATCH] [RDP-1913]Reduce metadata refresh interval (#76) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The goal with this change is to provide more time for https://github.com/transferwise/kafka-health-checker to demote unhealthy brokers. We assume that faulty broker is in a zombie state, so it won't return PARTITION_MIGRATED exception that would force the metadata update. Default metadata.max.age.ms is 5 min. Producer’s delivery timeout is 7 min. Let’s say trouble starts at 13:01, health checker reacts to this and demotes the broker at 13:04, if Kafka client’s metadata was refreshed at 13:03, then next metadata refresh will be at 13:08, by that time we would already hit delivery timeout, which would be at 13:08. If producers producing to a changelog topic fail, then it forces the whole Kafka streams task to migrate to another instance, hence the rebalancing. Problem will be that exception will be thrown when produce fails within the delivery timeout, leading the Kafka Streams thread to be moved to another instance. Producer metadata is refreshed periodically, or if there’re certain exceptions returned by the broker. (Source: According to https://wise.slack.com/archives/G01P8RBLGCC/p1693400446185769) --- CHANGELOG.md | 4 ++++ gradle.properties | 2 +- .../kafka/tkms/config/TkmsKafkaProducerProvider.java | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efb53d2..84e2dab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.25.1] - 2023-10-30 +### Added +- Setting METADATA_MAX_AGE_CONFIG to two minutes for producer + ## [0.25.0] - 2023-08-09 ### Added diff --git a/gradle.properties b/gradle.properties index 37a3805..b73a05f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.25.0 \ No newline at end of file +version=0.25.1 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index e9de4fc..2ce4f24 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -44,6 +44,7 @@ public KafkaProducer getKafkaProducer(int shard) { configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000"); configs.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + configs.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000"); configs.putAll(tkmsProperties.getKafka());