Handling connection errors with KafkaTemplate when cluster not available #2250
-
Hi everyone, I'm really struggling with some basic functionality that I would like to achieve using KafkaTemplate, and I hope someone can help me out. Consider the following basic scenario: Why do I want to achieve that? Well, in a production environment, it can always happen that the Kafka cluster goes down for some reason. I want to be able to detect that while sending data. And the worst problem is that, as of now, my producer thread starves to death, because it goes into an infinite loop trying to connect to the broker. This is my producer config:
This is my service. My first approach, sendMessageWithCallback(), does not work, because the onFailure() method is not invoked if the KafkaProducer cannot establish a connection to the Kafka cluster. With my second service method, sendMessageWithProperErrorHandling(), I can at least catch the TimeoutException that the KafkaProducer throws when the metadata for the topic could not be fetched within MAX_BLOCK_MS_CONFIG, but I still can't stop the producer from going into an infinite loop after that first timeout. Below you can also find a picture of the infinite loop. The KafkaProducer essentially keeps trying to connect to the Kafka cluster for the rest of its life, starving the thread. It also seems to completely ignore my RETRIES_CONFIG, which is set to zero retries.
Now my simple question: what is the best practice for detecting connection errors during a send and stopping the send process when such an error occurs?
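A minimal sketch of the synchronous approach described above, assuming only that the send call returns a `java.util.concurrent.Future` (Spring Kafka's `ListenableFuture` in 2.x and `CompletableFuture` in 3.x both do). `BoundedSend` and `SendOutcome` are hypothetical names. The helper caps how long the caller waits and treats both an exception thrown by the send call itself and a failed or late future as a failure, since where the failure surfaces can depend on the version. It does not stop the producer's background reconnect attempts.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Outcome of one bounded send attempt (hypothetical type for this sketch). */
enum SendOutcome { SENT, FAILED, TIMED_OUT }

final class BoundedSend {

    private BoundedSend() { }

    /**
     * Runs a send that returns a Future, e.g. () -> kafkaTemplate.send(topic, value),
     * and waits at most timeoutMs for the result instead of blocking indefinitely.
     */
    static <T> SendOutcome sendAndWait(Supplier<? extends Future<T>> send, long timeoutMs) {
        Future<T> future;
        try {
            // The send call itself may throw, for example when topic metadata
            // cannot be fetched within max.block.ms.
            future = send.get();
        } catch (RuntimeException e) {
            return SendOutcome.FAILED;
        }
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return SendOutcome.SENT;
        } catch (TimeoutException e) {
            // No result in time; the record may still be delivered later.
            return SendOutcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The send completed exceptionally; e.getCause() holds the real error.
            return SendOutcome.FAILED;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return SendOutcome.FAILED;
        }
    }
}
```

Usage might look like `BoundedSend.sendAndWait(() -> kafkaTemplate.send("my-topic", payload), 10_000)`, where the topic name and `payload` are placeholders.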
Replies: 3 comments 8 replies
-
Unfortunately, the kafka-clients library provides no hooks to determine that a send failed because the broker is down. You can configure reconnect.backoff.max.ms (https://kafka.apache.org/documentation/#producerconfigs_reconnect.backoff.max.ms) to increase the time between connection attempts; by default it caps the backoff at 1 second. Eventually (after 60 seconds by default) the send times out. You can then use, say, the admin client to check whether it was a connectivity problem.
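The settings mentioned above live in the producer properties. A minimal sketch with illustrative values (not tuned recommendations), using the literal property names so it needs only the JDK; `ProducerTimeoutConfig` is a hypothetical helper, and the map is assumed to be passed to Spring's `DefaultKafkaProducerFactory`. It also shows why `RETRIES_CONFIG = 0` does not stop the reconnect loop: `retries` governs re-sending records whose produce requests failed, while reconnect attempts are paced by the backoff settings.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerTimeoutConfig {

    private ProducerTimeoutConfig() { }

    /** Producer properties that bound blocking and slow down reconnects (illustrative values). */
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // How long send() may block waiting for topic metadata (default 60000 ms).
        props.put("max.block.ms", 5_000);
        // Initial and maximum backoff between reconnect attempts (max defaults to 1000 ms).
        props.put("reconnect.backoff.ms", 1_000);
        props.put("reconnect.backoff.max.ms", 30_000);
        // Bound on reporting success or failure after send() returns;
        // delivery.timeout.ms must be >= linger.ms + request.timeout.ms.
        props.put("request.timeout.ms", 5_000);
        props.put("delivery.timeout.ms", 10_000);
        // Only affects re-sending failed produce requests, not reconnect attempts.
        props.put("retries", 0);
        return props;
    }
}
```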
-
I do not believe the synchronous call you defined in …
As I said, there are no hooks; I was suggesting that if you get a timeout, use the admin client to see if it was a connectivity problem.
I meant stop calling kafkaTemplate.send(). You can call reset() on the producer factory to close the producer(s).
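A sketch of that advice as a small guard: after a send timeout, check connectivity, stop calling `kafkaTemplate.send()`, and close the producer(s) so their network threads stop reconnecting. `SendGuard` is hypothetical, and the connectivity check and close action are passed in so the sketch needs only the JDK; in an application they might be an `AdminClient` `describeCluster()` call with a short timeout and `producerFactory::reset`, respectively (both assumptions about how you wire it).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical guard: stop sending after a timeout while the cluster is unreachable. */
final class SendGuard {

    private final BooleanSupplier brokerReachable; // assumed: AdminClient describeCluster() check
    private final Runnable closeProducers;         // assumed: producerFactory::reset
    private final AtomicBoolean open = new AtomicBoolean(true);

    SendGuard(BooleanSupplier brokerReachable, Runnable closeProducers) {
        this.brokerReachable = brokerReachable;
        this.closeProducers = closeProducers;
    }

    /** Returns true if the caller may call kafkaTemplate.send() right now. */
    boolean maySend() {
        if (open.get()) {
            return true;
        }
        // Sending is stopped: resume only once the cluster is reachable again.
        if (brokerReachable.getAsBoolean()) {
            open.set(true);
            return true;
        }
        return false;
    }

    /** Call when a send times out. Returns true if sending is now stopped. */
    boolean onSendTimeout() {
        if (brokerReachable.getAsBoolean()) {
            return false; // cluster is up, so the timeout had another cause
        }
        // Close the producer(s) only on the first transition to "stopped".
        if (open.compareAndSet(true, false)) {
            closeProducers.run();
        }
        return true;
    }
}
```

The check runs only after a timeout or while sending is stopped, so a healthy cluster pays no extra cost per send.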