Skip to content

Commit

Permalink
GH-2817: KafkaTemplate: No double error timers (#2823)
Browse files Browse the repository at this point in the history
* GH-2817: KafkaTemplate: No double error timers

Fixes #2817

An observation in `KafkaTemplate` can be marked with error from a `Callback`.
Then `Future` is evaluated and its exception is thrown back to the `observeSend()`.
Here this exception is caught and reported to the observation again.
This creates a second timer in the Micrometer, but with different error tag

* Check for error presence in the `observeSend()` `catch` block and skip
second report

**Cherry-pick to `3.0.x`**

* * Fix import order for Checkstyle rule
  • Loading branch information
artembilan authored Sep 25, 2023
1 parent 1957a8c commit b212b4d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,11 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
return doSend(producerRecord, observation);
}
catch (RuntimeException ex) {
observation.error(ex);
observation.stop();
// The error is added from org.apache.kafka.clients.producer.Callback
if (observation.getContext().getError() == null) {
observation.error(ex);
observation.stop();
}
throw ex;
}
}
Expand All @@ -783,7 +786,7 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
}
Future<RecordMetadata> sendFuture =
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
// May be an immediate failure
// Maybe an immediate failure
if (sendFuture.isDone()) {
try {
sendFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.support.micrometer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

Expand All @@ -33,6 +34,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;

Expand All @@ -41,6 +43,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
Expand Down Expand Up @@ -81,8 +84,9 @@

/**
* @author Gary Russell
* @since 3.0
* @author Artem Bilan
*
* @since 3.0
*/
@SpringJUnitConfig
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
Expand Down Expand Up @@ -216,6 +220,14 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
.getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0);
cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
assertThat(cAdmin).isSameAs(config.mockAdmin);

assertThatExceptionOfType(KafkaException.class)
.isThrownBy(() -> template.send("wrong%Topic", "data"))
.withCauseExactlyInstanceOf(InvalidTopicException.class);

MeterRegistryAssert.assertThat(meterRegistry)
.hasTimerWithNameAndTags("spring.kafka.template", KeyValues.of("error", "InvalidTopicException"))
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
}

@Configuration
Expand All @@ -235,15 +247,15 @@ KafkaAdmin admin(EmbeddedKafkaBroker broker) {
@Bean
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString() + "," + broker.getBrokersAsString());
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
Expand Down

0 comments on commit b212b4d

Please sign in to comment.