diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index c524238e6c..1db9cfc925 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -2768,37 +2768,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco
 				DefaultKafkaListenerObservationConvention.INSTANCE,
 				() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
 				this.observationRegistry);
-		return observation.observe(() -> {
-			try {
+		try {
+			observation.observe(() -> {
 				invokeOnMessage(cRecord);
 				successTimer(sample, cRecord);
 				recordInterceptAfter(cRecord, null);
+			});
+		}
+		catch (RuntimeException e) {
+			failureTimer(sample, cRecord);
+			recordInterceptAfter(cRecord, e);
+			if (this.commonErrorHandler == null) {
+				throw e;
 			}
-			catch (RuntimeException e) {
-				failureTimer(sample, cRecord);
-				recordInterceptAfter(cRecord, e);
-				if (this.commonErrorHandler == null) {
-					throw e;
-				}
-				try {
-					invokeErrorHandler(cRecord, iterator, e);
-					commitOffsetsIfNeededAfterHandlingError(cRecord);
-				}
-				catch (KafkaException ke) {
-					ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
-					return ke;
-				}
-				catch (RuntimeException ee) {
-					this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
-					return ee;
-				}
-				catch (Error er) { // NOSONAR
-					this.logger.error(er, "Error handler threw an error");
-					throw er;
-				}
+			try {
+				invokeErrorHandler(cRecord, iterator, e);
+				commitOffsetsIfNeededAfterHandlingError(cRecord);
 			}
-			return null;
-		});
+			catch (KafkaException ke) {
+				ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
+				return ke;
+			}
+			catch (RuntimeException ee) {
+				this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
+				return ee;
+			}
+			catch (Error er) { // NOSONAR
+				this.logger.error(er, "Error handler threw an error");
+				throw er;
+			}
+		}
+		return null;
 	}
 
 	private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
index d67396aa8b..b5caf4e51e 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2022-2023 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 
 import org.junit.jupiter.api.Test;
@@ -54,7 +55,6 @@
 import org.springframework.kafka.core.KafkaAdmin;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.AbstractMessageListenerContainer;
 import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
 import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -85,14 +85,20 @@
 /**
  * @author Gary Russell
  * @author Artem Bilan
+ * @author Wang Zhiyang
  *
  * @since 3.0
  */
 @SpringJUnitConfig
-@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
+@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
+		ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
 @DirtiesContext
 public class ObservationTests {
 
+	public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
+
+	public final static String OBSERVATION_ERROR = "observation.error";
+
 	@Test
 	void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
 			@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate
 		assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
 		assertThat(listener.record).isNotNull();
 		Headers headers = listener.record.headers();
-		assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
-		assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
+		assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
+		assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
 		Deque<SimpleSpan> spans = tracer.getSpans();
 		assertThat(spans).hasSize(4);
 		SimpleSpan span = spans.poll();
@@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
 				}
 
 			});
+		rler.getListenerContainer("obs1").stop();
 		rler.getListenerContainer("obs1").start();
 		template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
 		assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
 		assertThat(listener.record).isNotNull();
 		headers = listener.record.headers();
-		assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
-		assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
+		assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
+		assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
 		assertThat(spans).hasSize(4);
 		span = spans.poll();
 		assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
@@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
 				.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
 	}
 
+	@Test
+	void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
+			@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
@Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); + assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs4").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); + assertThat(span.getError().getCause()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("obs4 run time exception"); + } + + @Test + void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS); + assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs5").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0"); + assertThat(span.getError()) + .isInstanceOf(Error.class) + .hasMessage("obs5 error"); + } + @Configuration @EnableKafka public static class Config { @@ -276,6 +325,13 @@ KafkaTemplate customTemplate(ProducerFactory p return template; } + @Bean + KafkaTemplate throwableTemplate(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) { @@ -286,7 +342,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerF factory.getContainerProperties().setObservationEnabled(true); factory.setContainerCustomizer(container -> { if (container.getListenerId().equals("obs3")) { - ((AbstractMessageListenerContainer) container).setKafkaAdmin(this.mockAdmin); + container.setKafkaAdmin(this.mockAdmin); } }); return factory; @@ -352,6 +408,11 @@ Listener listener(KafkaTemplate template) { return new Listener(template); } + @Bean + ExceptionListener exceptionListener() { + return new ExceptionListener(); + } + } public static class Listener { @@ -387,4 +448,24 @@ void listen3(ConsumerRecord in) { } + public static class ExceptionListener { + + final CountDownLatch latch4 = new CountDownLatch(1); + + final CountDownLatch latch5 = new CountDownLatch(1); + + @KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION) + void listenRuntimeException(ConsumerRecord in) { + this.latch4.countDown(); + throw new IllegalStateException("obs4 run time exception"); + } + + @KafkaListener(id = "obs5", topics = OBSERVATION_ERROR) + void listenError(ConsumerRecord in) { + this.latch5.countDown(); + throw new Error("obs5 error"); + } + + } + }