From 807c9dbc02081138547692dd43a590c1add6aabc Mon Sep 17 00:00:00 2001 From: Marina Sahakyan Date: Mon, 24 Apr 2023 17:32:51 +0200 Subject: [PATCH] kafka:fix issue 7123 Kafka events fail to post Motivation After we upgraded to 8.2 we no longer are getting events into Kafka. We have 3 dCache instances. One 7.2 remaining still publishing to Kafka with no issue. (https://github.com/dCache/dcache/issues/7123). The issue is that according to https://github.com/spring-projects/spring-kafka/issues/2251, the kafka-clients provide no hooks to determine that a send failed because the broker is down (https://github.com/spring-projects/spring-kafka/discussions/2250). This is still not fixed so this should be fixed. Modification Change the LoggingProducerListener so that when TimeoutException will be catch, the error message will indicate that there is a connection issue or the broker is down. Result Log looks like this 24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused 24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused 24 Apr 2023 16:17:04 (NFS-localhost) [] Producer failed to send the message, the broker is down or the connection was refused or 24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] Producer failed to send the message, the broker is down or the connection was refused 24 Apr 2023 17:27:51 (pool_write) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag NFS-localhost PoolAcceptFile 0000C9CFA47686574B43B1EF9CF037A24780] Topic billing not present in metadata after 60000 ms. 24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] TEST Topic billing not present in metadata after 60000 ms. class org.springframework.kafka.KafkaException 24 Apr 2023 17:28:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqZqubA pool_write DoorTransferFinished 00002B30ED198C494F25A31F589AB91F903F] Producer failed to send the message, the broker is down or the co Target: master 8.2, 9.0 Require-book: no Require-notes: yes Patch: https://rb.dcache.org/r/13967/ Acked-by: Lea Morschel, Abert Rossi, Tigran Mkrtchyan --- .../org/dcache/kafka/LoggingProducerListener.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java b/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java index 69a8b8bbe3d..edbb749bba4 100644 --- a/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java +++ b/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java @@ -17,8 +17,10 @@ */ package org.dcache.kafka; +import com.google.common.base.Throwables; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.ProducerListener; @@ -35,7 +37,13 @@ public void onSuccess(ProducerRecord record, RecordMetadata recordMetadata @Override public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { - LOGGER.error("Producer exception occurred while publishing message : {}, exception : {}", - producerRecord, exception.toString()); + if (exception instanceof TimeoutException && exception.getCause() == null) { + LOGGER.error("Producer failed to send the message," + + " the broker is down or the connection was refused "); + } else { + LOGGER.error( + "Producer exception occurred while publishing message : {}, exception : {}", + producerRecord, Throwables.getRootCause(exception).getMessage()); + } } }