From d2320291161f09492160d087b54c7d9752d97f97 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 15 Oct 2024 19:51:39 -0400 Subject: [PATCH] Addressing PR review --- .../KafkaMessageListenerContainer.java | 5 ++--- .../MessagingMessageListenerAdapter.java | 22 +++++++++++++------ .../support/micrometer/ObservationTests.java | 1 + 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 42d879039e..a492dc9d61 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -106,7 +106,6 @@ import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.kafka.listener.adapter.AsyncRepliesAware; -import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; @@ -2720,7 +2719,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco catch (RuntimeException e) { failureTimer(sample, cRecord); recordInterceptAfter(cRecord, e); - if (!(this.listener instanceof MessagingMessageListenerAdapter)) { + if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { observation.error(e); } if (this.commonErrorHandler == null) { @@ -2748,7 +2747,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco } } finally { - if (!(this.listener instanceof MessagingMessageListenerAdapter)) { + if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { observation.stop(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9c83ee0691..8d73318ea8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -252,6 +252,15 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) { this.handlerMethod = handlerMethod; } + /** + * Set the {@link ObservationRegistry} to handle observability. + * @param observationRegistry {@link ObservationRegistry} instance. + * @since 3.3.0 + */ + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + public boolean isAsyncReplies() { return this.handlerMethod != null && this.handlerMethod.isAsyncReplies(); } @@ -499,24 +508,27 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle invocationResult.messageReturnType() : this.messageReturnType; + CompletableFuture completableFutureResult; + if (monoPresent && result instanceof Mono mono) { if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); } - result = mono.toFuture(); + completableFutureResult = mono.toFuture(); } else if (!(result instanceof CompletableFuture)) { - result = CompletableFuture.completedFuture(result); + completableFutureResult = CompletableFuture.completedFuture(result); } else { + completableFutureResult = (CompletableFuture) result; if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + "otherwise the container will ack the message immediately"); } } - ((CompletableFuture) result).whenComplete((r, t) -> { + completableFutureResult.whenComplete((r, t) -> { try { if (t == null) { asyncSuccess(r, replyTopic, source, messageReturnType); @@ -899,10 +911,6 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } - public void setObservationRegistry(ObservationRegistry observationRegistry) { - this.observationRegistry = observationRegistry; - } - /** * Root object for reply expression evaluation. * @param request the request. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 8819e196f1..d5e78c7dd8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -398,6 +398,7 @@ void observationErrorExceptionWhenCompletableFutureReturned(@Autowired Exception @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, @Autowired KafkaListenerEndpointRegistry endpointRegistry) throws ExecutionException, InterruptedException, TimeoutException { + errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS); Deque spans = tracer.getSpans(); await().untilAsserted(() -> assertThat(spans).hasSize(2));