diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 09947227b..fe18836a6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -966,8 +966,8 @@ else if (listener instanceof MessageListener) { this.wasIdlePartition = new HashMap<>(); this.kafkaAdmin = obtainAdmin(); - if (this.listener instanceof RecordMessagingMessageListenerAdapter rmmla) { - rmmla.setObservationRegistry(observationRegistry); + if (isListenerAdapterObservationAware()) { + ((RecordMessagingMessageListenerAdapter) this.listener).setObservationRegistry(observationRegistry); } } @@ -1228,6 +1228,10 @@ else if (timeout instanceof String str) { } } + private boolean isListenerAdapterObservationAware() { + return this.listener != null && RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass()); + } + private void subscribeOrAssignTopics(final Consumer subscribingConsumer) { if (KafkaMessageListenerContainer.this.topicPartitions == null) { ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener(); @@ -2772,7 +2776,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco catch (RuntimeException e) { failureTimer(sample, cRecord); recordInterceptAfter(cRecord, e); - if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { + if (!isListenerAdapterObservationAware()) { observation.error(e); } if (this.commonErrorHandler == null) { @@ -2800,7 +2804,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco } } finally { - if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { + if (!isListenerAdapterObservationAware()) { observation.stop(); } observationScope.close();