Skip to content

Commit

Permalink
spring-projectsGH-3690: Fix observation leak in the `KafkaMessageList…
Browse files Browse the repository at this point in the history
…enerContainer` (spring-projects#3694)

Fixes: spring-projects#3690 

Issue: spring-projects#3690

When `this.listener` is an instance of `RecordMessagingMessageListenerAdapter`,
we rely on its logic to call `invoke()` from super class to handle observation
lifecycle this or other way.
However, Spring Integration's `KafkaMessageDrivenChannelAdapter` use its own
`IntegrationRecordMessageListener` extension of the `RecordMessagingMessageListenerAdapter`
without calling super `invoke()`.
The problem apparent from Spring Cloud Stream Kafka Binder, where an observation is enabled.

* Fix `KafkaMessageListenerContainer` to check for exact type of `this.listener`
before making decision to close an observation here, or propagate it down to the `RecordMessagingMessageListenerAdapter`
  • Loading branch information
artembilan authored Dec 23, 2024
1 parent be612c5 commit c9e7edc
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,8 @@ else if (listener instanceof MessageListener) {
this.wasIdlePartition = new HashMap<>();
this.kafkaAdmin = obtainAdmin();

if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) {
rmmla.setObservationRegistry(observationRegistry);
if (isListenerAdapterObservationAware()) {
((RecordMessagingMessageListenerAdapter<?, ?>) this.listener).setObservationRegistry(observationRegistry);
}
}

Expand Down Expand Up @@ -1228,6 +1228,10 @@ else if (timeout instanceof String str) {
}
}

private boolean isListenerAdapterObservationAware() {
return this.listener != null && RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass());
}

private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
Expand Down Expand Up @@ -2772,7 +2776,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
if (!isListenerAdapterObservationAware()) {
observation.error(e);
}
if (this.commonErrorHandler == null) {
Expand Down Expand Up @@ -2800,7 +2804,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
}
}
finally {
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
if (!isListenerAdapterObservationAware()) {
observation.stop();
}
observationScope.close();
Expand Down

0 comments on commit c9e7edc

Please sign in to comment.