From eb7fa488e4955b076030467a76b1187d8726115d Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 17 Dec 2024 11:35:09 -0500 Subject: [PATCH] GH-3686: Fix observation scope closure in the `KafkaMessageListenerContainer` Fixes: https://github.com/spring-projects/spring-kafka/issues/3686 According to our investigation around the `try-with-resource`, it looks like the resource is already closed when we reach the `catch` block. * Rework `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()` to `observation.openScope()` before the `try` and close it manually in the `finally` block --- .../kafka/listener/KafkaMessageListenerContainer.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index de258bac4..09947227b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2750,7 +2750,6 @@ private void pauseForNackSleep() { * @throws Error an error. */ @Nullable - @SuppressWarnings("try") private RuntimeException doInvokeRecordListener(final ConsumerRecord cRecord, // NOSONAR Iterator> iterator) { @@ -2763,7 +2762,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco this.observationRegistry); observation.start(); - try (Observation.Scope ignored = observation.openScope()) { + Observation.Scope observationScope = observation.openScope(); + // We cannot use 'try-with-resource' because the resource is closed just before catch block + try { invokeOnMessage(cRecord); successTimer(sample, cRecord); recordInterceptAfter(cRecord, null); @@ -2802,6 +2803,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { observation.stop(); } + observationScope.close(); } return null; } @@ -4020,6 +4022,6 @@ private static class StopAfterFenceException extends KafkaException { } - private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { }; + private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { } }