From 2e7b3696bb69529ea3f4d96f8fe2e371ee9e60fd Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 26 Sep 2022 19:48:25 -0500 Subject: [PATCH] Add Observations for send/receive * Observes sends on PulsarTemplate * Observes receives on PulsarListener * Adds auto-generated adocs Closes #29 --- spring-pulsar-dependencies/build.gradle | 4 + spring-pulsar-docs/build.gradle | 36 +- .../src/main/asciidoc/pulsar.adoc | 21 ++ spring-pulsar/build.gradle | 5 + ...bstractPulsarListenerContainerFactory.java | 4 +- ...currentPulsarListenerContainerFactory.java | 1 - .../pulsar/core/PulsarTemplate.java | 122 ++++++- ...bstractPulsarMessageListenerContainer.java | 2 +- ...DefaultPulsarMessageListenerContainer.java | 86 +++-- .../listener/PulsarContainerProperties.java | 29 ++ ...ltPulsarListenerObservationConvention.java | 51 +++ ...ltPulsarTemplateObservationConvention.java | 51 +++ .../PulsarListenerObservation.java | 75 +++++ .../PulsarListenerObservationConvention.java | 39 +++ .../PulsarMessageReceiverContext.java | 58 ++++ .../PulsarMessageSenderContext.java | 91 +++++ .../PulsarTemplateObservation.java | 75 +++++ .../PulsarTemplateObservationConvention.java | 39 +++ ...ntPulsarMessageListenerContainerTests.java | 19 ++ .../ObservationIntegrationTests.java | 185 ++++++++++ .../pulsar/observation/ObservationTests.java | 316 ++++++++++++++++++ 21 files changed, 1260 insertions(+), 49 deletions(-) create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarListenerObservationConvention.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarTemplateObservationConvention.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservation.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservationConvention.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageReceiverContext.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageSenderContext.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservation.java create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservationConvention.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java diff --git a/spring-pulsar-dependencies/build.gradle b/spring-pulsar-dependencies/build.gradle index d095d347..5d4b044f 100644 --- a/spring-pulsar-dependencies/build.gradle +++ b/spring-pulsar-dependencies/build.gradle @@ -17,6 +17,8 @@ ext { jaywayJsonPathVersion = '2.6.0' junitJupiterVersion = '5.9.0' log4jVersion = '2.18.0' + micrometerVersion = '1.10.0-SNAPSHOT' + micrometerTracingVersion = '1.0.0-SNAPSHOT' mockitoVersion = '4.6.1' protobufJavaVersion = '3.21.5' pulsarTestcontainersVersion = '1.17.3' @@ -35,6 +37,8 @@ dependencies { api platform("org.junit:junit-bom:$junitJupiterVersion") api platform("org.mockito:mockito-bom:$mockitoVersion") api platform("org.springframework:spring-framework-bom:$springVersion") + api platform("io.micrometer:micrometer-bom:$micrometerVersion") + api platform("io.micrometer:micrometer-tracing-bom:$micrometerTracingVersion") constraints { api "com.github.ben-manes.caffeine:caffeine:$caffeineVersion" diff --git a/spring-pulsar-docs/build.gradle b/spring-pulsar-docs/build.gradle index 11071bd9..9d0cd39f 100644 --- a/spring-pulsar-docs/build.gradle +++ b/spring-pulsar-docs/build.gradle @@ -5,14 +5,21 @@ plugins { description = 'Spring Pulsar Docs' +ext { + micrometerDocsVersion="1.0.0-SNAPSHOT" +} + configurations { configurationProperties + observationDocs } dependencies { api project (':spring-pulsar') api 'org.springframework.boot:spring-boot-starter' configurationProperties(project(path: ":spring-pulsar-spring-boot-autoconfigure", configuration: "configurationPropertiesMetadata")) + observationDocs "io.micrometer:micrometer-docs-generator-spans:$micrometerDocsVersion" + observationDocs "io.micrometer:micrometer-docs-generator-metrics:$micrometerDocsVersion" } task aggregatedJavadoc(type: Javadoc) { @@ -53,6 +60,29 @@ task documentConfigurationProperties(type: org.springframework.pulsar.gradle.doc outputDir = file("${buildDir}/docs/generated/") } +def observationsInputDir = file("${rootDir}/spring-pulsar/src/main/java/org/springframework/pulsar/observation").absolutePath +def observationsOutputDir = file("${buildDir}/docs/generated/observation/").absolutePath + +task generateObservabilityMetricsDocs(type: JavaExec) { + mainClass = 'io.micrometer.docs.metrics.DocsFromSources' + inputs.dir(observationsInputDir) + outputs.dir(observationsOutputDir) + classpath configurations.observationDocs + args observationsInputDir, '.*', observationsOutputDir +} + +task generateObservabilitySpansDocs(type: JavaExec) { + mainClass = 'io.micrometer.docs.spans.DocsFromSources' + inputs.dir(observationsInputDir) + outputs.dir(observationsOutputDir) + classpath configurations.observationDocs + args observationsInputDir, '.*', observationsOutputDir +} + +task generateObservabilityDocs { + dependsOn generateObservabilityMetricsDocs, generateObservabilitySpansDocs +} + tasks.withType(org.asciidoctor.gradle.jvm.AbstractAsciidoctorTask) { asciidoctorj { fatalWarnings = ['^((?!successfully validated).)*$'] @@ -81,7 +111,7 @@ task asciidoctorMultipage(type: org.asciidoctor.gradle.jvm.AsciidoctorTask) { } syncDocumentationSourceForAsciidoctor { - dependsOn documentConfigurationProperties + dependsOn documentConfigurationProperties, generateObservabilityDocs from("${buildDir}/docs/generated") { into "asciidoc" } @@ -91,7 +121,7 @@ syncDocumentationSourceForAsciidoctor { } syncDocumentationSourceForAsciidoctorMultipage { - dependsOn documentConfigurationProperties + dependsOn documentConfigurationProperties, generateObservabilityDocs from("${buildDir}/docs/generated") { into "asciidoc" } @@ -101,7 +131,7 @@ syncDocumentationSourceForAsciidoctorMultipage { } syncDocumentationSourceForAsciidoctorPdf { - dependsOn documentConfigurationProperties + dependsOn documentConfigurationProperties, generateObservabilityDocs from("${buildDir}/docs/generated") { into "asciidoc" } diff --git a/spring-pulsar-docs/src/main/asciidoc/pulsar.adoc b/spring-pulsar-docs/src/main/asciidoc/pulsar.adoc index c3dd5d1f..bbd80648 100644 --- a/spring-pulsar-docs/src/main/asciidoc/pulsar.adoc +++ b/spring-pulsar-docs/src/main/asciidoc/pulsar.adoc @@ -1222,6 +1222,27 @@ PulsarTopic partitionedTopic { ---- ==== +[[micrometer]] +=== Observability + +[[observation]] +==== Micrometer Observation +The `PulsarTemplate` and `PulsarListener` are instrumented with the Micrometer observations API. +When enabled, send and receive operations are traced and timed. + +To enable, set `observationEnabled` on each component. + +===== Custom tags +The default implementation adds the `bean.name` tag for template observations and `listener.id` tag for listener observations. +To add other tags to timers/traces, configure a custom `PulsarTemplateObservationConvention` or `PulsarListenerObservationConvention` to the template or listener container, respectively. + +TIP: You can either subclass `DefaultPulsarTemplateObservationConvention` or `DefaultPulsarListenerObservationConvention` or provide completely new implementations. + +include::observation/_metrics.adoc[leveloffset=+2] + +include::observation/_spans.adoc[leveloffset=+2] + +Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information. ==== Appendix The reference documentation has the following appendices: diff --git a/spring-pulsar/build.gradle b/spring-pulsar/build.gradle index c0f14110..3ac9c9d0 100644 --- a/spring-pulsar/build.gradle +++ b/spring-pulsar/build.gradle @@ -7,6 +7,7 @@ description = 'Spring Pulsar Support' dependencies { api 'com.github.ben-manes.caffeine:caffeine' api 'com.google.protobuf:protobuf-java' + api 'io.micrometer:micrometer-observation' api 'org.apache.pulsar:pulsar-client-all' api 'org.springframework:spring-context' api 'org.springframework:spring-messaging' @@ -26,6 +27,10 @@ dependencies { testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'org.apache.logging.log4j:log4j-core' testRuntimeOnly 'org.apache.logging.log4j:log4j-jcl' + testImplementation 'io.micrometer:micrometer-observation-test' + testImplementation 'io.micrometer:micrometer-tracing-bridge-brave' + testImplementation 'io.micrometer:micrometer-tracing-test' + testImplementation 'io.micrometer:micrometer-tracing-integration-test' testImplementation 'org.assertj:assertj-core' testImplementation 'org.awaitility:awaitility' testImplementation 'org.hamcrest:hamcrest' diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java index cf4adf08..853539f3 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java @@ -118,7 +118,7 @@ public void afterPropertiesSet() { @Override public C createListenerContainer(PulsarListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint); - JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getSubscriptionName(), instance::setBeanName); + JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName); if (endpoint instanceof AbstractPulsarListenerEndpoint) { configureEndpoint((AbstractPulsarListenerEndpoint) endpoint); } @@ -171,6 +171,8 @@ else if (this.autoStartup != null) { instanceProperties.setMaxNumMessages(this.containerProperties.getMaxNumMessages()); instanceProperties.setMaxNumBytes(this.containerProperties.getMaxNumBytes()); instanceProperties.setBatchTimeoutMillis(this.containerProperties.getBatchTimeoutMillis()); + instanceProperties.setObservationEnabled(this.containerProperties.isObservationEnabled()); + instanceProperties.setObservationConvention(this.containerProperties.getObservationConvention()); JavaUtils.INSTANCE.acceptIfNotNull(this.phase, instance::setPhase) .acceptIfNotNull(this.applicationContext, instance::setApplicationContext) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index ed5526f9..0372a514 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -73,7 +73,6 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu } properties.setSchemaType(endpoint.getSchemaType()); - return new ConcurrentPulsarMessageListenerContainer(getPulsarConsumerFactory(), properties); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 09abc98d..105d8530 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -29,7 +29,19 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.log.LogAccessor; +import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention; +import org.springframework.pulsar.observation.PulsarMessageSenderContext; +import org.springframework.pulsar.observation.PulsarTemplateObservation; +import org.springframework.pulsar.observation.PulsarTemplateObservationConvention; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; /** * A thread-safe template for executing high-level Pulsar operations. @@ -39,7 +51,8 @@ * @author Chris Bono * @author Alexander Preuß */ -public class PulsarTemplate implements PulsarOperations { +public class PulsarTemplate + implements PulsarOperations, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton { private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); @@ -47,8 +60,18 @@ public class PulsarTemplate implements PulsarOperations { private final List interceptors; + private ApplicationContext applicationContext; + + private String beanName; + private Schema schema; + private boolean observationEnabled; + + private PulsarTemplateObservationConvention observationConvention; + + private ObservationRegistry observationRegistry; + /** * Construct a template instance. * @param producerFactory the factory used to create the backing Pulsar producers. @@ -92,14 +115,52 @@ public SendMessageBuilder newMessage(T message) { return new SendMessageBuilderImpl<>(this, message); } + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + @Override + public void setBeanName(String beanName) { + this.beanName = beanName; + } + /** - * Setter for schema. + * Set the schema to use on this template. * @param schema provides the {@link Schema} used on this template */ public void setSchema(Schema schema) { this.schema = schema; } + /** + * Set to true to enable observation via Micrometer. + * @param observationEnabled true to enable. + */ + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + + /** + * Set a custom observation convention. + * @param observationConvention the convention. + */ + public void setObservationConvention(PulsarTemplateObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + + @Override + public void afterSingletonsInstantiated() { + // TODO is this how we want to do this? What about SBAC? + // TODO when would AC be null? Should we assert or at least log the fact if it + // happens? + if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) { + ObjectProvider registry = this.applicationContext + .getBeanProvider(ObservationRegistry.class); + this.observationRegistry = registry.getIfUnique(); + } + } + private MessageId doSend(String topic, T message, TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, MessageRouter messageRouter, ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { try { @@ -115,22 +176,49 @@ private CompletableFuture doSendAsync(String topic, T message, ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { final String topicName = ProducerUtils.resolveTopicName(topic, this.producerFactory); this.logger.trace(() -> String.format("Sending msg to '%s' topic", topicName)); - final Producer producer = prepareProducerForSend(topic, message, messageRouter, producerCustomizer); - TypedMessageBuilder messageBuilder = producer.newMessage().value(message); - if (typedMessageBuilderCustomizer != null) { - typedMessageBuilderCustomizer.customize(messageBuilder); - } - return messageBuilder.sendAsync().whenComplete((msgId, ex) -> { - if (ex == null) { - this.logger.trace(() -> String.format("Sent msg to '%s' topic", topicName)); - // TODO success metrics - } - else { - this.logger.error(ex, () -> String.format("Failed to send msg to '%s' topic", topicName)); - // TODO fail metrics + + PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName); + Observation observation = newObservation(senderContext); + try { + observation.start(); + final Producer producer = prepareProducerForSend(topic, message, messageRouter, producerCustomizer); + TypedMessageBuilder messageBuilder = producer.newMessage().value(message); + if (typedMessageBuilderCustomizer != null) { + typedMessageBuilderCustomizer.customize(messageBuilder); } - ProducerUtils.closeProducerAsync(producer, this.logger); - }); + senderContext.properties().forEach(messageBuilder::property); // propagate + // props to + // message + return messageBuilder.sendAsync().whenComplete((msgId, ex) -> { + if (ex == null) { + this.logger.trace(() -> String.format("Sent msg to '%s' topic", topicName)); + observation.stop(); + } + else { + this.logger.error(ex, () -> String.format("Failed to send msg to '%s' topic", topicName)); + observation.error(ex); + observation.stop(); + } + ProducerUtils.closeProducerAsync(producer, this.logger); + }); + } + catch (RuntimeException ex) { + observation.error(ex); + observation.stop(); + throw ex; + } + } + + private Observation newObservation(PulsarMessageSenderContext senderContext) { + Observation observation; + if (!this.observationEnabled || this.observationRegistry == null) { + observation = Observation.NOOP; + } + else { + observation = PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention, + DefaultPulsarTemplateObservationConvention.INSTANCE, senderContext, this.observationRegistry); + } + return observation; } private Producer prepareProducerForSend(String topic, T message, MessageRouter messageRouter, diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java index 85dd0f84..9be102e5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java @@ -111,7 +111,7 @@ public void setBeanName(String name) { */ @Nullable public String getBeanName() { - return this.beanName; + return this.beanName; // the container factory sets this to the listener id } @Override diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index e462d5ed..aa61fed8 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -45,18 +45,27 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.lang.Nullable; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.event.ConsumerFailedToStartEvent; import org.springframework.pulsar.event.ConsumerStartedEvent; import org.springframework.pulsar.event.ConsumerStartingEvent; +import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention; +import org.springframework.pulsar.observation.PulsarListenerObservation; +import org.springframework.pulsar.observation.PulsarMessageReceiverContext; import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + /** * Default implementation for {@link PulsarMessageListenerContainer}. * @@ -97,7 +106,15 @@ protected void doStart() { containerProperties.setConsumerTaskExecutor(consumerExecutor); } - this.listenerConsumer = new Listener(messageListener); + ObservationRegistry observationRegistry = null; + ApplicationContext applicationContext = getApplicationContext(); + if (applicationContext != null) { + ObjectProvider registry = applicationContext + .getBeanProvider(ObservationRegistry.class); + observationRegistry = registry.getIfUnique(); + } + + this.listenerConsumer = new Listener(messageListener, observationRegistry); setRunning(true); this.startLatch = new CountDownLatch(1); this.listenerConsumerFuture = consumerExecutor.submit(this.listenerConsumer); @@ -162,6 +179,8 @@ private final class Listener implements SchedulingAwareRunnable { private final PulsarBatchMessageListener batchMessageListener; + private final ObservationRegistry observationRegistry; + private Consumer consumer; private final Set nackableMessages = new HashSet<>(); @@ -179,7 +198,7 @@ private final class Listener implements SchedulingAwareRunnable { private final SubscriptionType subscriptionType = this.containerProperties.getSubscriptionType(); @SuppressWarnings({ "unchecked", "rawtypes" }) - Listener(MessageListener messageListener) { + Listener(MessageListener messageListener, @Nullable ObservationRegistry observationRegistry) { if (messageListener instanceof PulsarBatchMessageListener) { this.batchMessageListener = (PulsarBatchMessageListener) messageListener; this.listener = null; @@ -192,6 +211,7 @@ else if (messageListener != null) { this.listener = null; this.batchMessageListener = null; } + this.observationRegistry = observationRegistry; this.pulsarConsumerErrorHandler = getPulsarConsumerErrorHandler(); try { final PulsarContainerProperties pulsarContainerProperties = getPulsarContainerProperties(); @@ -341,37 +361,51 @@ public void run() { else { for (Message message : messages) { do { - try { - if (this.listener instanceof PulsarAcknowledgingMessageListener) { - this.listener.received(this.consumer, message, this.ackMode.equals(AckMode.MANUAL) - ? new ConsumerAcknowledgment(this.consumer, message) : null); - } - else if (this.listener != null) { - this.listener.received(this.consumer, message); - } - if (this.ackMode.equals(AckMode.RECORD)) { - handleAck(message); - } - inRetryMode.compareAndSet(true, false); + Observation observation; + if (!this.containerProperties.isObservationEnabled() || this.observationRegistry == null) { + observation = Observation.NOOP; } - catch (Exception e) { - if (this.pulsarConsumerErrorHandler != null) { - invokeRecordListenerErrorHandler(inRetryMode, message, e); - } - else { + else { + observation = PulsarListenerObservation.LISTENER_OBSERVATION.observation( + this.containerProperties.getObservationConvention(), + DefaultPulsarListenerObservationConvention.INSTANCE, + new PulsarMessageReceiverContext(message, getBeanName()), + this.observationRegistry); + } + observation.observe(() -> { + try { + if (this.listener instanceof PulsarAcknowledgingMessageListener) { + this.listener.received(this.consumer, message, + this.ackMode.equals(AckMode.MANUAL) + ? new ConsumerAcknowledgment(this.consumer, message) : null); + } + else if (this.listener != null) { + this.listener.received(this.consumer, message); + } if (this.ackMode.equals(AckMode.RECORD)) { - this.consumer.negativeAcknowledge(message); + handleAck(message); } - else if (this.ackMode.equals(AckMode.BATCH)) { - this.nackableMessages.add(message.getMessageId()); + inRetryMode.compareAndSet(true, false); + } + catch (Exception e) { + if (this.pulsarConsumerErrorHandler != null) { + invokeRecordListenerErrorHandler(inRetryMode, message, e); } else { - throw new IllegalStateException(String.format( - "Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks", - message.getMessageId()), e); + if (this.ackMode.equals(AckMode.RECORD)) { + this.consumer.negativeAcknowledge(message); + } + else if (this.ackMode.equals(AckMode.BATCH)) { + this.nackableMessages.add(message.getMessageId()); + } + else { + throw new IllegalStateException(String.format( + "Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks", + message.getMessageId()), e); + } } } - } + }); } while (inRetryMode.get()); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java index b41f5966..709885a5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java @@ -24,6 +24,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.pulsar.observation.PulsarListenerObservationConvention; import org.springframework.util.Assert; /** @@ -64,6 +65,10 @@ public class PulsarContainerProperties { private AckMode ackMode = AckMode.BATCH; + private boolean observationEnabled; + + private PulsarListenerObservationConvention observationConvention; + private Properties pulsarConsumerProperties = new Properties(); public PulsarContainerProperties(String... topics) { @@ -140,6 +145,30 @@ public void setAckMode(AckMode ackMode) { this.ackMode = ackMode; } + public boolean isObservationEnabled() { + return this.observationEnabled; + } + + /** + * Set to true to enable observations. + * @param observationEnabled true to enable. + */ + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + + public PulsarListenerObservationConvention getObservationConvention() { + return this.observationConvention; + } + + /** + * Set a custom observation convention. + * @param observationConvention the convention. + */ + public void setObservationConvention(PulsarListenerObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + public Duration getConsumerStartTimeout() { return this.consumerStartTimeout; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarListenerObservationConvention.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarListenerObservationConvention.java new file mode 100644 index 00000000..ec0de193 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarListenerObservationConvention.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import io.micrometer.common.KeyValues; + +/** + * Default {@link PulsarListenerObservationConvention} for Pulsar listener key values. + * + * @author Chris Bono + */ +public class DefaultPulsarListenerObservationConvention implements PulsarListenerObservationConvention { + + /** + * A singleton instance of the convention. + */ + public static final DefaultPulsarListenerObservationConvention INSTANCE = new DefaultPulsarListenerObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageReceiverContext context) { + return KeyValues.of(PulsarListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(), + context.getListenerId()); + } + + // Remove once addressed: + // https://github.com/micrometer-metrics/micrometer-docs-generator/issues/30 + @Override + public String getName() { + return "spring.pulsar.listener"; + } + + @Override + public String getContextualName(PulsarMessageReceiverContext context) { + return context.getSource() + " receive"; + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarTemplateObservationConvention.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarTemplateObservationConvention.java new file mode 100644 index 00000000..3045539d --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/DefaultPulsarTemplateObservationConvention.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import io.micrometer.common.KeyValues; + +/** + * Default {@link PulsarTemplateObservationConvention} for Pulsar template key values. + * + * @author Chris Bono + */ +public class DefaultPulsarTemplateObservationConvention implements PulsarTemplateObservationConvention { + + /** + * A singleton instance of the convention. + */ + public static final DefaultPulsarTemplateObservationConvention INSTANCE = new DefaultPulsarTemplateObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageSenderContext context) { + return KeyValues.of(PulsarTemplateObservation.TemplateLowCardinalityTags.BEAN_NAME.asString(), + context.getBeanName()); + } + + // Remove once addressed: + // https://github.com/micrometer-metrics/micrometer-docs-generator/issues/30 + @Override + public String getName() { + return "spring.pulsar.template"; + } + + @Override + public String getContextualName(PulsarMessageSenderContext context) { + return context.getDestination() + " send"; + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservation.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservation.java new file mode 100644 index 00000000..19f3282f --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservation.java @@ -0,0 +1,75 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import org.springframework.pulsar.annotation.PulsarListener; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +/** + * An {@link Observation} for a {@link PulsarListener}. + * + * @author Chris Bono + */ +public enum PulsarListenerObservation implements ObservationDocumentation { + + /** + * Observation created when a Pulsar listener receives a message. + */ + LISTENER_OBSERVATION { + + @Override + public Class> getDefaultConvention() { + return DefaultPulsarListenerObservationConvention.class; + } + + @Override + public String getPrefix() { + return "spring.pulsar.listener"; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return ListenerLowCardinalityTags.values(); + } + + }; + + /** + * Low cardinality tags. + */ + public enum ListenerLowCardinalityTags implements KeyName { + + /** + * Id of the listener container that received the message. + */ + LISTENER_ID { + + @Override + public String asString() { + return "spring.pulsar.listener.id"; + } + + } + + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservationConvention.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservationConvention.java new file mode 100644 index 00000000..a3b73a04 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarListenerObservationConvention.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for Pulsar listener key values. + * + * @author Chris Bono + */ +public interface PulsarListenerObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof PulsarMessageReceiverContext; + } + + @Override + default String getName() { + return "spring.pulsar.listener"; + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageReceiverContext.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageReceiverContext.java new file mode 100644 index 00000000..43ca5615 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageReceiverContext.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import org.apache.pulsar.client.api.Message; + +import io.micrometer.observation.transport.ReceiverContext; + +/** + * {@link ReceiverContext} for Pulsar messages. + * + * @author Chris Bono + */ +public class PulsarMessageReceiverContext extends ReceiverContext> { + + private final String listenerId; + + private final Message message; + + public PulsarMessageReceiverContext(Message message, String listenerId) { + super((carrier, key) -> carrier.getProperty(key)); + setCarrier(message); + this.message = message; + this.listenerId = listenerId; + } + + /** + * The identifier of the listener receiving the message (typically a + * {@code PulsarListener}). + * @return the identifier of the listener receiving the message + */ + public String getListenerId() { + return this.listenerId; + } + + /** + * The name of the topic the message came from. + * @return the name of the topic the message came from + */ + public String getSource() { + return this.message.getTopicName(); + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageSenderContext.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageSenderContext.java new file mode 100644 index 00000000..8e6f5996 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarMessageSenderContext.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.pulsar.observation.PulsarMessageSenderContext.MessageHolder; + +import io.micrometer.observation.transport.SenderContext; + +/** + * {@link SenderContext} for Pulsar messages. + * + * @author Chris Bono + */ +public final class PulsarMessageSenderContext extends SenderContext { + + private final String beanName; + + private final String destination; + + private PulsarMessageSenderContext(MessageHolder messageHolder, String topic, String beanName) { + super((carrier, key, value) -> messageHolder.property(key, value)); + setCarrier(messageHolder); + this.beanName = beanName; + this.destination = topic; + } + + public static PulsarMessageSenderContext newContext(String topic, String beanName) { + MessageHolder messageHolder = new MessageHolder(); + PulsarMessageSenderContext senderContext = new PulsarMessageSenderContext(messageHolder, topic, beanName); + return senderContext; + } + + public Map properties() { + return getCarrier().properties(); + } + + /** + * The name of the bean sending the message (typically a {@code PulsarTemplate}). + * @return the name of the bean sending the message + */ + public String getBeanName() { + return this.beanName; + } + + /** + * The destination topic for the message. + * @return the topic the message is being sent to + */ + public String getDestination() { + return this.destination; + } + + /** + * Acts as a carrier for a Pulsar message and records the propagated properties for + * later access by the Pulsar message builder. + */ + public static final class MessageHolder { + + private final Map properties = new HashMap<>(); + + private MessageHolder() { + } + + public void property(String key, String value) { + this.properties.put(key, value); + } + + public Map properties() { + return this.properties; + } + + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservation.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservation.java new file mode 100644 index 00000000..e4e7a05c --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservation.java @@ -0,0 +1,75 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import org.springframework.pulsar.core.PulsarTemplate; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +/** + * An {@link Observation} for {@link PulsarTemplate}. + * + * @author Chris Bono + */ +public enum PulsarTemplateObservation implements ObservationDocumentation { + + /** + * Observation created when a Pulsar template sends a message. + */ + TEMPLATE_OBSERVATION { + + @Override + public Class> getDefaultConvention() { + return DefaultPulsarTemplateObservationConvention.class; + } + + @Override + public String getPrefix() { + return "spring.pulsar.template"; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return TemplateLowCardinalityTags.values(); + } + + }; + + /** + * Low cardinality tags. + */ + public enum TemplateLowCardinalityTags implements KeyName { + + /** + * Bean name of the template that sent the message. + */ + BEAN_NAME { + + @Override + public String asString() { + return "spring.pulsar.template.name"; + } + + } + + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservationConvention.java b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservationConvention.java new file mode 100644 index 00000000..e941d6c2 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/observation/PulsarTemplateObservationConvention.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for Pulsar template key values. + * + * @author Chris Bono + */ +public interface PulsarTemplateObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof PulsarMessageSenderContext; + } + + @Override + default String getName() { + return "spring.pulsar.template"; + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java index 1627da6b..9a36c1d9 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java @@ -41,6 +41,7 @@ import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerEndpoint; import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.observation.PulsarListenerObservationConvention; import org.springframework.util.backoff.BackOff; /** @@ -132,6 +133,24 @@ void pulsarConsumerErrorHandlerAppliedOnChildContainer() throws Exception { assertThat(childContainer.getPulsarConsumerErrorHandler()).isEqualTo(pulsarConsumerErrorHandler); } + @Test + @SuppressWarnings({ "unchecked", "rawtypes" }) + void observationConfigAppliedOnChildContainer() throws Exception { + PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Shared); + ConcurrentPulsarMessageListenerContainer concurrentContainer = env.concurrentContainer(); + + PulsarListenerObservationConvention customObservationConvention = mock( + PulsarListenerObservationConvention.class); + concurrentContainer.getContainerProperties().setObservationEnabled(true); + concurrentContainer.getContainerProperties().setObservationConvention(customObservationConvention); + concurrentContainer.start(); + + final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + assertThat(childContainer.getContainerProperties().getObservationConvention()) + .isSameAs(customObservationConvention); + assertThat(childContainer.getContainerProperties().isObservationEnabled()).isTrue(); + } + @Test @SuppressWarnings("unchecked") void basicConcurrencyTesting() throws Exception { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java new file mode 100644 index 00000000..00db09be --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java @@ -0,0 +1,185 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; +import org.springframework.pulsar.config.PulsarClientConfiguration; +import org.springframework.pulsar.config.PulsarClientFactoryBean; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.AbstractContainerBaseTests; +import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.core.PulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; + +import io.micrometer.common.KeyValues; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.Span.Kind; +import io.micrometer.tracing.exporter.FinishedSpan; +import io.micrometer.tracing.test.SampleTestRunner; +import io.micrometer.tracing.test.simple.SpanAssert; +import io.micrometer.tracing.test.simple.SpansAssert; + +/** + * Integration tests for {@link PulsarTemplateObservation send} and + * {@link PulsarListenerObservation receive} observations in Spring Pulsar against all + * supported Tracing runtimes. + * + * @author Chris Bono + * @see SampleTestRunner + */ +public class ObservationIntegrationTests extends SampleTestRunner { + + @SuppressWarnings("unchecked") + @Override + public SampleTestRunnerConsumer yourCode() { + // template -> listener -> template -> listener + return (bb, meterRegistry) -> { + ObservationRegistry observationRegistry = getObservationRegistry(); + try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) { + applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry); + applicationContext.register(ObservationIntegrationTestAppConfig.class); + applicationContext.refresh(); + applicationContext.getBean(PulsarTemplate.class).send("obs1-topic", "hello"); + CountDownLatch latch = applicationContext.getBean(ObservationIntegrationTestAppListeners.class).latch; + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + List finishedSpans = bb.getFinishedSpans(); + SpansAssert.assertThat(finishedSpans).haveSameTraceId().hasSize(4); + + List producerSpans = finishedSpans.stream() + .filter(span -> span.getKind().equals(Kind.PRODUCER)).toList(); + SpanAssert.assertThat(producerSpans.get(0)).hasTag("spring.pulsar.template.name", "pulsarTemplate"); + SpanAssert.assertThat(producerSpans.get(1)).hasTag("spring.pulsar.template.name", "pulsarTemplate"); + + List consumerSpans = finishedSpans.stream() + .filter(span -> span.getKind().equals(Kind.CONSUMER)).toList(); + SpanAssert.assertThat(consumerSpans.get(0)).hasTagWithKey("spring.pulsar.listener.id"); + assertThat(consumerSpans.get(0).getTags().get("spring.pulsar.listener.id")).isIn("obs1-id-0", "obs2-id-0"); + SpanAssert.assertThat(consumerSpans.get(1)).hasTagWithKey("spring.pulsar.listener.id"); + assertThat(consumerSpans.get(1).getTags().get("spring.pulsar.listener.id")).isIn("obs1-id-0", "obs2-id-0"); + assertThat(consumerSpans.get(0).getTags().get("spring.pulsar.listener.id")) + .isNotEqualTo(consumerSpans.get(1).getTags().get("spring.pulsar.listener.id")); + + MeterRegistryAssert.assertThat(getMeterRegistry()) + .hasTimerWithNameAndTags("spring.pulsar.template", + KeyValues.of("spring.pulsar.template.name", "pulsarTemplate")) + .hasTimerWithNameAndTags("spring.pulsar.template", + KeyValues.of("spring.pulsar.template.name", "pulsarTemplate")) + .hasTimerWithNameAndTags("spring.pulsar.listener", + KeyValues.of("spring.pulsar.listener.id", "obs1-id-0")) + .hasTimerWithNameAndTags("spring.pulsar.listener", + KeyValues.of("spring.pulsar.listener.id", "obs2-id-0")); + }; + } + + @Configuration(proxyBeanMethods = false) + @EnablePulsar + static class ObservationIntegrationTestAppConfig extends AbstractContainerBaseTests { + + @Bean + public PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { + return new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); + } + + @Bean + public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) { + return new PulsarClientFactoryBean(pulsarClientConfiguration); + } + + @Bean + public PulsarClientConfiguration pulsarClientConfiguration() { + return new PulsarClientConfiguration(Map.of("serviceUrl", getPulsarBrokerUrl())); + } + + @Bean + public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { + PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); + template.setObservationEnabled(true); + return template; + } + + @Bean + public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) { + return new DefaultPulsarConsumerFactory<>(pulsarClient, Collections.emptyMap()); + } + + @Bean + PulsarListenerContainerFactory pulsarListenerContainerFactory( + PulsarConsumerFactory pulsarConsumerFactory) { + final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(); + pulsarListenerContainerFactory.setPulsarConsumerFactory(pulsarConsumerFactory); + pulsarListenerContainerFactory.getContainerProperties().setObservationEnabled(true); + return pulsarListenerContainerFactory; + } + + @Bean + PulsarAdministration pulsarAdministration() { + return new PulsarAdministration(PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl())); + } + + @Bean + ObservationIntegrationTestAppListeners observationTestAppListeners(PulsarTemplate pulsarTemplate) { + return new ObservationIntegrationTestAppListeners(pulsarTemplate); + } + + } + + static class ObservationIntegrationTestAppListeners { + + private PulsarTemplate template; + + CountDownLatch latch = new CountDownLatch(1); + + ObservationIntegrationTestAppListeners(PulsarTemplate template) { + this.template = template; + } + + @PulsarListener(id = "obs1-id", properties = { "subscriptionName=obs1-sub", "topicNames=obs1-topic" }) + void listen1(String message) throws PulsarClientException { + this.template.send("obs2-topic", message); + } + + @PulsarListener(id = "obs2-id", properties = { "subscriptionName=obs2-sub", "topicNames=obs2-topic" }) + void listen2(String message) { + latch.countDown(); + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java new file mode 100644 index 00000000..686928c2 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java @@ -0,0 +1,316 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.observation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.lang.Nullable; +import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; +import org.springframework.pulsar.config.PulsarClientConfiguration; +import org.springframework.pulsar.config.PulsarClientFactoryBean; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.AbstractContainerBaseTests; +import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.core.PulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SimpleSpan; +import io.micrometer.tracing.test.simple.SimpleTracer; + +/** + * Tests for {@link PulsarTemplateObservation send} and {@link PulsarListenerObservation + * receive} observations in Spring Pulsar. + *

+ * Differs from {@link ObservationIntegrationTests} in that it uses a test observation + * registry and verifies more details such as propagation and message content. + * + * @author Chris Bono + */ +@SpringJUnitConfig +@DirtiesContext +public class ObservationTests extends AbstractContainerBaseTests { + + private static final String LISTENER_ID_TAG = "spring.pulsar.listener.id"; + + private static final String OBS1_ID = "obs1-id-0"; + + private static final String OBS2_ID = "obs2-id-0"; + + private static final String RECEIVER_EXTRA_TAG = "receiver-extra-tag-bean-name"; + + private static final String SENDER_EXTRA_TAG = "sender-extra-tag-bean-name"; + + private static final String TAG1 = "tag1"; + + private static final String TAG1_VALUE = "tag1-value"; + + private static final String TAG2 = "tag2"; + + private static final String TAG2_VALUE = "tag2-value"; + + private static final String TEMPLATE_NAME_TAG = "spring.pulsar.template.name"; + + private static final String TEMPLATE_NAME = "observationTestsTemplate"; + + @Test + void sendAndReceiveCreatesExpectedSpansAndMetrics(@Autowired ObservationTestAppListeners appListeners, + @Autowired PulsarTemplate template, @Autowired SimpleTracer tracer, + @Autowired MeterRegistry meterRegistry) throws Exception { + + template.send("obs1-topic", "hello"); + + // The final message should have tags propagated through + assertThat(appListeners.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(appListeners.message).isNotNull(); + assertThat(appListeners.message.getProperty(TAG1)).isEqualTo(TAG1_VALUE); + assertThat(appListeners.message.getProperty(TAG2)).isEqualTo(TAG2_VALUE); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(4); + + // Span1: The initial template.send to obs1-topic + SimpleSpan span = spans.pollFirst(); + assertThat(span.getName()).isEqualTo("obs1-topic send"); + assertThat(span.getTags()).containsEntry(TEMPLATE_NAME_TAG, TEMPLATE_NAME); + assertThat(span.getTags()).containsEntry(SENDER_EXTRA_TAG, TEMPLATE_NAME); + + // Span2: The listen1.receive from obs1-topic + await().until(() -> spans.peekFirst().getTags().size() == 4); + span = spans.pollFirst(); + assertThat(span.getName()).isEqualTo("persistent://public/default/obs1-topic receive"); + assertThat(span.getTags()).containsEntry(LISTENER_ID_TAG, OBS1_ID); + assertThat(span.getTags()).containsEntry(RECEIVER_EXTRA_TAG, OBS1_ID); + assertThat(span.getTags()).containsEntry(TAG1, TAG1_VALUE); + assertThat(span.getTags()).containsEntry(TAG2, TAG2_VALUE); + + // Span3: The template.send from listen1 to obs2-topic + await().until(() -> spans.peekFirst().getTags().size() == 2); + span = spans.pollFirst(); + assertThat(span.getName()).isEqualTo("obs2-topic send"); + assertThat(span.getTags()).containsEntry(TEMPLATE_NAME_TAG, TEMPLATE_NAME); + assertThat(span.getTags()).containsEntry(SENDER_EXTRA_TAG, TEMPLATE_NAME); + + // Span4: The final listen2.receive from obs2-topic - does not get extra tag + // (hence count = 3) + await().until(() -> spans.peekFirst().getTags().size() == 3); + span = spans.pollFirst(); + assertThat(span.getName()).isEqualTo("persistent://public/default/obs2-topic receive"); + assertThat(span.getTags()).containsEntry(LISTENER_ID_TAG, OBS2_ID); + assertThat(span.getTags()).containsEntry(TAG1, TAG1_VALUE); + assertThat(span.getTags()).containsEntry(TAG2, TAG2_VALUE); + + MeterRegistryAssert.assertThat(meterRegistry) + .hasTimerWithNameAndTags("spring.pulsar.listener", + KeyValues.of(LISTENER_ID_TAG, OBS1_ID, RECEIVER_EXTRA_TAG, OBS1_ID)) + .hasTimerWithNameAndTags("spring.pulsar.listener", KeyValues.of(LISTENER_ID_TAG, OBS2_ID)); + assertThat(meterRegistry.find("spring.pulsar.template") + .tags(TEMPLATE_NAME_TAG, TEMPLATE_NAME, SENDER_EXTRA_TAG, TEMPLATE_NAME).timer()).isNotNull() + .extracting(Timer::count).isEqualTo(2L); + } + + @Configuration(proxyBeanMethods = false) + @EnablePulsar + static class ObservationTestAppConfig { + + @Bean + public PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { + return new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); + } + + @Bean + public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) { + return new PulsarClientFactoryBean(pulsarClientConfiguration); + } + + @Bean + public PulsarClientConfiguration pulsarClientConfiguration() { + return new PulsarClientConfiguration(Map.of("serviceUrl", getPulsarBrokerUrl())); + } + + @Bean(name = "observationTestsTemplate") + public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { + PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); + template.setObservationEnabled(true); + template.setObservationConvention(new DefaultPulsarTemplateObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageSenderContext context) { + return super.getLowCardinalityKeyValues(context).and(SENDER_EXTRA_TAG, context.getBeanName()); + } + }); + return template; + } + + @Bean + public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) { + return new DefaultPulsarConsumerFactory<>(pulsarClient, Collections.emptyMap()); + } + + @Bean + PulsarListenerContainerFactory pulsarListenerContainerFactory( + PulsarConsumerFactory pulsarConsumerFactory) { + final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(); + pulsarListenerContainerFactory.setPulsarConsumerFactory(pulsarConsumerFactory); + pulsarListenerContainerFactory.getContainerProperties().setObservationEnabled(true); + pulsarListenerContainerFactory.getContainerProperties() + .setObservationConvention(new DefaultPulsarListenerObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageReceiverContext context) { + // Only add the extra tag for the 1st listener + if (context.getListenerId().equals(OBS2_ID)) { + return super.getLowCardinalityKeyValues(context); + } + return super.getLowCardinalityKeyValues(context).and(RECEIVER_EXTRA_TAG, + context.getListenerId()); + } + }); + return pulsarListenerContainerFactory; + } + + @Bean + PulsarAdministration pulsarAdministration() { + return new PulsarAdministration(PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl())); + } + + @Bean + SimpleTracer simpleTracer() { + return new SimpleTracer(); + } + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) { + TestObservationRegistry observationRegistry = TestObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler( + // Composite will pick the first matching handler + new ObservationHandler.FirstMatchingCompositeObservationHandler( + // This is responsible for creating a child span on the sender + // side + new PropagatingSenderTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a span on the receiver + // side + new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a default span + new DefaultTracingObservationHandler(tracer))) + .observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + return observationRegistry; + } + + @Bean + Propagator propagator(Tracer tracer) { + return new Propagator() { + + // Headers required for tracing propagation + @Override + public List fields() { + return Arrays.asList(TAG1, TAG2); + } + + // Called on the producer side when the message is being sent + @Override + public void inject(TraceContext context, @Nullable C carrier, Setter setter) { + setter.set(carrier, TAG1, TAG1_VALUE); + setter.set(carrier, TAG2, TAG2_VALUE); + } + + // Called on the consumer side when the message is consumed + @Override + public Span.Builder extract(C carrier, Getter getter) { + String tag1Value = getter.get(carrier, TAG1); + String tag2Value = getter.get(carrier, TAG2); + return tracer.spanBuilder().tag(TAG1, tag1Value).tag(TAG2, tag2Value); + } + }; + } + + @Bean + ObservationTestAppListeners observationTestAppListeners(PulsarTemplate pulsarTemplate) { + return new ObservationTestAppListeners(pulsarTemplate); + } + + } + + static class ObservationTestAppListeners { + + private PulsarTemplate template; + + private Message message; + + CountDownLatch latch = new CountDownLatch(1); + + ObservationTestAppListeners(PulsarTemplate template) { + this.template = template; + } + + @PulsarListener(id = "obs1-id", properties = { "subscriptionName=obs1-sub", "topicNames=obs1-topic" }) + void listen1(Message message) throws PulsarClientException { + this.template.send("obs2-topic", message.getValue()); + } + + @PulsarListener(id = "obs2-id", properties = { "subscriptionName=obs2-sub", "topicNames=obs2-topic" }) + void listen2(Message message) { + this.message = message; + this.latch.countDown(); + } + + } + +}