From 9c5ee3e2de1a9872aefc0d8b0eee84054d90533f Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 15 Oct 2024 16:31:54 -0400 Subject: [PATCH] GH-3528: Improving Observability in Asynchronous Processing Fixes: #3528 https://github.com/spring-projects/spring-kafka/issues/3528 - Improve spring-kafka observability for failures in async consumer tasks when listener methods return CompletableFuture or Mono and throw errors during async execution - Refactor code in KafkaMessageListenerContainer and MessagingMessageListenerAdapter around observability - Add tests to verify that errors from listeners returning CompletableFuture or Mono are recorded on the observation - Add @Nullable annotations to relevant methods for better null safety --- .../KafkaMessageListenerContainer.java | 80 +++++++++++-------- .../listener/adapter/HandlerAdapter.java | 1 + .../MessagingMessageListenerAdapter.java | 75 ++++++++++++----- .../support/micrometer/ObservationTests.java | 71 ++++++++++++++-- 4 files changed, 168 insertions(+), 59 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bbc8ae8ccb..42d879039e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -106,6 +106,8 @@ import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.kafka.listener.adapter.AsyncRepliesAware; +import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -948,6 
+950,10 @@ else if (listener instanceof MessageListener) { this.lastAlertPartition = new HashMap<>(); this.wasIdlePartition = new HashMap<>(); this.kafkaAdmin = obtainAdmin(); + + if (this.listener instanceof RecordMessagingMessageListenerAdapter rmmla) { + rmmla.setObservationRegistry(observationRegistry); + } } private AckMode determineAckMode() { @@ -2693,6 +2699,7 @@ private void pauseForNackSleep() { * @throws Error an error. */ @Nullable + @SuppressWarnings("try") private RuntimeException doInvokeRecordListener(final ConsumerRecord cRecord, // NOSONAR Iterator> iterator) { @@ -2703,42 +2710,49 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId, this::clusterId), this.observationRegistry); - return observation.observe(() -> { + + observation.start(); + try (Observation.Scope ignored = observation.openScope()) { + invokeOnMessage(cRecord); + successTimer(sample, cRecord); + recordInterceptAfter(cRecord, null); + } + catch (RuntimeException e) { + failureTimer(sample, cRecord); + recordInterceptAfter(cRecord, e); + if (!(this.listener instanceof MessagingMessageListenerAdapter)) { + observation.error(e); + } + if (this.commonErrorHandler == null) { + throw e; + } try { - invokeOnMessage(cRecord); - successTimer(sample, cRecord); - recordInterceptAfter(cRecord, null); + invokeErrorHandler(cRecord, iterator, e); + commitOffsetsIfNeededAfterHandlingError(cRecord); } - catch (RuntimeException e) { - failureTimer(sample, cRecord); - recordInterceptAfter(cRecord, e); - if (this.commonErrorHandler == null) { - throw e; - } - observation.error(e); - try { - invokeErrorHandler(cRecord, iterator, e); - commitOffsetsIfNeededAfterHandlingError(cRecord); - } - catch (RecordInRetryException rire) { - this.logger.info("Record in retry and not yet recovered"); - return rire; - } - catch (KafkaException ke) { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, 
this.logger); - return ke; - } - catch (RuntimeException ee) { - this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); - return ee; - } - catch (Error er) { // NOSONAR - this.logger.error(er, "Error handler threw an error"); - throw er; - } + catch (RecordInRetryException rire) { + this.logger.info("Record in retry and not yet recovered"); + return rire; } - return null; - }); + catch (KafkaException ke) { + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); + return ke; + } + catch (RuntimeException ee) { + this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); + return ee; + } + catch (Error er) { // NOSONAR + this.logger.error(er, "Error handler threw an error"); + throw er; + } + } + finally { + if (!(this.listener instanceof MessagingMessageListenerAdapter)) { + observation.stop(); + } + } + return null; } private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 82caa738fd..31f7743bb8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -65,6 +65,7 @@ public boolean isAsyncReplies() { return this.asyncReplies; } + @Nullable public Object invoke(Message message, Object... 
providedArgs) throws Exception { //NOSONAR if (this.invokerHandlerMethod != null) { return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index f92e5b823f..9c83ee0691 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; @@ -73,6 +74,8 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; import reactor.core.publisher.Mono; /** @@ -153,6 +156,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private String correlationHeaderName = KafkaHeaders.CORRELATION_ID; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + /** * Create an instance with the provided bean and method. * @param bean the bean. 
@@ -382,15 +387,34 @@ protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, final Message message) { + Throwable listenerError = null; + Object result = null; + Observation currentObservation = getCurrentObservation(); try { - Object result = invokeHandler(records, acknowledgment, message, consumer); + result = invokeHandler(records, acknowledgment, message, consumer); if (result != null) { handleResult(result, records, acknowledgment, consumer, message); } } - catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control + catch (ListenerExecutionFailedException e) { + listenerError = e; + currentObservation.error(e); handleException(records, acknowledgment, consumer, message, e); } + catch (Error e) { + listenerError = e; + currentObservation.error(e); + } + finally { + if (listenerError != null || result == null) { + currentObservation.stop(); + } + } + } + + private Observation getCurrentObservation() { + Observation currentObservation = this.observationRegistry.getCurrentObservation(); + return currentObservation == null ? Observation.NOOP : currentObservation; } /** @@ -402,6 +426,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C * @param consumer the consumer. * @return the result of invocation. 
*/ + @Nullable protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message message, Consumer consumer) { @@ -460,7 +485,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me */ protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, @Nullable Message source) { - + final Observation observation = getCurrentObservation(); this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it"); String replyTopic = evaluateReplyTopic(request, source, resultArg); @@ -474,35 +499,39 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle invocationResult.messageReturnType() : this.messageReturnType; - if (result instanceof CompletableFuture completable) { + if (monoPresent && result instanceof Mono mono) { + if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + + "(or Kotlin suspend function); otherwise the container will ack the message immediately"); + } + result = mono.toFuture(); + } + else if (!(result instanceof CompletableFuture)) { + result = CompletableFuture.completedFuture(result); + } + else { if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + "otherwise the container will ack the message immediately"); } - completable.whenComplete((r, t) -> { + } + + ((CompletableFuture) result).whenComplete((r, t) -> { + try { if (t == null) { asyncSuccess(r, replyTopic, source, messageReturnType); acknowledge(acknowledgment); } else { - asyncFailure(request, acknowledgment, consumer, t, source); + Throwable cause = t instanceof CompletionException ? 
t.getCause() : t; + observation.error(cause); + asyncFailure(request, acknowledgment, consumer, cause, source); } - }); - } - else if (monoPresent && result instanceof Mono mono) { - if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { - this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type " + - "(or Kotlin suspend function); otherwise the container will ack the message immediately"); } - mono.subscribe( - r -> asyncSuccess(r, replyTopic, source, messageReturnType), - t -> asyncFailure(request, acknowledgment, consumer, t, source), - () -> acknowledge(acknowledgment) - ); - } - else { - sendResponse(result, replyTopic, source, messageReturnType); - } + finally { + observation.stop(); + } + }); } @Nullable @@ -870,6 +899,10 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + /** * Root object for reply expression evaluation. * @param request the request. 
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 37f52c6b6e..8819e196f1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -25,6 +25,7 @@ import java.util.Deque; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -87,6 +88,7 @@ import io.micrometer.tracing.propagation.Propagator; import io.micrometer.tracing.test.simple.SimpleSpan; import io.micrometer.tracing.test.simple.SimpleTracer; +import reactor.core.publisher.Mono; /** * @author Gary Russell @@ -112,7 +114,11 @@ public class ObservationTests { public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception"; - public final static String OBSERVATION_ERROR = "observation.error"; + public final static String OBSERVATION_ERROR = "observation.error.sync"; + + public final static String OBSERVATION_ERROR_COMPLETABLE_FUTURE = "observation.error.completableFuture"; + + public final static String OBSERVATION_ERROR_MONO = "observation.error.mono"; @Test void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, @@ -387,6 +393,41 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired .hasMessage("obs5 error"); } + @Test + void observationErrorExceptionWhenCompletableFutureReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + 
errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS); + Deque spans = tracer.getSpans(); + await().untilAsserted(() -> assertThat(spans).hasSize(2)); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs6-0"); + assertThat(span.getError()) + .isInstanceOf(Error.class) + .hasMessage("Should report metric."); + } + + @Test + void observationErrorExceptionWhenMonoReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + errorTemplate.send(OBSERVATION_ERROR_MONO, "testError").get(10, TimeUnit.SECONDS); + Deque spans = tracer.getSpans(); + await().untilAsserted(() -> assertThat(spans).hasSize(2)); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs7-0"); + assertThat(span.getError()) + .isInstanceOf(Error.class) + .hasMessage("Should report metric."); + } + @Test void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( @Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate template, @@ -590,14 +631,34 @@ public static class ExceptionListener { @KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION) void listenRuntimeException(ConsumerRecord in) { - this.latch4.countDown(); - throw new IllegalStateException("obs4 run time exception"); + try { + throw new IllegalStateException("obs4 run time exception"); + } + finally { + this.latch4.countDown(); + } } @KafkaListener(id = "obs5", topics = OBSERVATION_ERROR) void 
listenError(ConsumerRecord in) { - this.latch5.countDown(); - throw new Error("obs5 error"); + try { + throw new Error("obs5 error"); + } + finally { + this.latch5.countDown(); + } + } + + @KafkaListener(id = "obs6", topics = OBSERVATION_ERROR_COMPLETABLE_FUTURE) + CompletableFuture receive(ConsumerRecord record) { + return CompletableFuture.supplyAsync(() -> { + throw new Error("Should report metric."); + }); + } + + @KafkaListener(id = "obs7", topics = OBSERVATION_ERROR_MONO) + Mono receive1(ConsumerRecord record) { + return Mono.error(new Error("Should report metric.")); } }