From 7e7ac48c27690d951ae78a093d82ba25def1a52e Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Fri, 7 Oct 2022 15:50:54 -0500 Subject: [PATCH] Add observations auto-configuration See #147 --- .../DocumentConfigurationProperties.java | 5 +- .../PulsarAnnotationDrivenConfiguration.java | 31 ++-- .../PulsarAutoConfiguration.java | 12 +- .../autoconfigure/PulsarProperties.java | 38 +++++ .../PulsarAutoConfigurationTests.java | 69 +++++++++ ...bstractPulsarListenerContainerFactory.java | 59 ++++--- ...currentPulsarListenerContainerFactory.java | 45 +++--- .../config/MethodPulsarListenerEndpoint.java | 2 +- .../pulsar/core/PulsarTemplate.java | 92 ++++------- ...bstractPulsarMessageListenerContainer.java | 43 +++--- ...currentPulsarMessageListenerContainer.java | 14 +- ...DefaultPulsarMessageListenerContainer.java | 146 +++++++++--------- .../listener/PulsarContainerProperties.java | 14 -- ...ntPulsarMessageListenerContainerTests.java | 43 ++---- .../pulsar/listener/PulsarListenerTests.java | 4 +- .../ObservationIntegrationTests.java | 16 +- .../pulsar/observation/ObservationTests.java | 62 ++++---- 17 files changed, 386 insertions(+), 309 deletions(-) diff --git a/buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java b/buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java index 86a9a838..2fee9254 100644 --- a/buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java +++ b/buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java @@ -65,7 +65,10 @@ public void setOutputDir(File outputDir) { void documentConfigurationProperties() throws IOException { Snippets snippets = new Snippets(this.configurationPropertyMetadata); snippets.add("application-properties.pulsar-client", "Pulsar Client Properties", (c) -> c.accept("spring.pulsar.client")); - snippets.add("application-properties.pulsar-producer", "Pulsar Producer Properties", (c) -> c.accept("spring.pulsar.producer")); + snippets.add("application-properties.pulsar-producer", "Pulsar Producer Properties", (c) -> { + c.accept("spring.pulsar.producer"); + c.accept("spring.pulsar.template"); + }); snippets.add("application-properties.pulsar-consumer", "Pulsar Consumer Properties", (c) -> { c.accept("spring.pulsar.consumer"); c.accept("spring.pulsar.listener"); diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAnnotationDrivenConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAnnotationDrivenConfiguration.java index c49f4599..cf975e34 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAnnotationDrivenConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAnnotationDrivenConfiguration.java @@ -27,6 +27,9 @@ import org.springframework.pulsar.config.PulsarListenerBeanNames; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.observation.PulsarListenerObservationConvention; + +import io.micrometer.observation.ObservationRegistry; /** * Configuration for Pulsar annotation-driven support. @@ -47,25 +50,25 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) { @Bean @ConditionalOnMissingBean(name = "pulsarListenerContainerFactory") ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( - ObjectProvider> pulsarConsumerFactory) { - ConcurrentPulsarListenerContainerFactory factory = new ConcurrentPulsarListenerContainerFactory<>(); - - final PulsarConsumerFactory pulsarConsumerFactory1 = pulsarConsumerFactory.getIfAvailable(); - factory.setPulsarConsumerFactory(pulsarConsumerFactory1); + ObjectProvider> consumerFactoryProvider, + ObjectProvider observationRegistryProvider, + ObjectProvider observationConventionProvider) { - final PulsarContainerProperties containerProperties = factory.getContainerProperties(); + PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSubscriptionType(this.pulsarProperties.getConsumer().getSubscriptionType()); + containerProperties.setObservationConvention(observationConventionProvider.getIfUnique()); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - PulsarProperties.Listener properties = this.pulsarProperties.getListener(); - - map.from(properties::getSchemaType).to(containerProperties::setSchemaType); - map.from(properties::getAckMode).to(containerProperties::setAckMode); - map.from(properties::getBatchTimeoutMillis).to(containerProperties::setBatchTimeoutMillis); - map.from(properties::getMaxNumBytes).to(containerProperties::setMaxNumBytes); - map.from(properties::getMaxNumMessages).to(containerProperties::setMaxNumMessages); + PulsarProperties.Listener listenerProperties = this.pulsarProperties.getListener(); + map.from(listenerProperties::getSchemaType).to(containerProperties::setSchemaType); + map.from(listenerProperties::getAckMode).to(containerProperties::setAckMode); + map.from(listenerProperties::getBatchTimeoutMillis).to(containerProperties::setBatchTimeoutMillis); + map.from(listenerProperties::getMaxNumBytes).to(containerProperties::setMaxNumBytes); + map.from(listenerProperties::getMaxNumMessages).to(containerProperties::setMaxNumMessages); - return factory; + return new ConcurrentPulsarListenerContainerFactory<>(consumerFactoryProvider.getIfAvailable(), + containerProperties, this.pulsarProperties.getListener().isObservationsEnabled() + ? observationRegistryProvider.getIfUnique() : null); } @Configuration(proxyBeanMethods = false) diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java index 543c037d..5e4bb1d8 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -37,6 +37,9 @@ import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.observation.PulsarTemplateObservationConvention; + +import io.micrometer.observation.ObservationRegistry; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar. @@ -89,8 +92,13 @@ public PulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsar @Bean @ConditionalOnMissingBean(PulsarTemplate.class) public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, - ObjectProvider interceptors) { - return new PulsarTemplate<>(pulsarProducerFactory, interceptors.orderedStream().toList()); + ObjectProvider interceptorsProvider, + ObjectProvider observationRegistryProvider, + ObjectProvider observationConventionProvider) { + return new PulsarTemplate<>(pulsarProducerFactory, interceptorsProvider.orderedStream().toList(), + this.properties.getTemplate().isObservationsEnabled() ? observationRegistryProvider.getIfUnique() + : null, + observationConventionProvider.getIfUnique()); } @Bean diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java index a712b937..c96aaf27 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java @@ -62,6 +62,8 @@ public class PulsarProperties { private final Producer producer = new Producer(); + private final Template template = new Template(); + private final Admin admin = new Admin(); public Consumer getConsumer() { @@ -80,6 +82,10 @@ public Producer getProducer() { return this.producer; } + public Template getTemplate() { + return this.template; + } + public Admin getAdministration() { return this.admin; } @@ -691,6 +697,24 @@ public Map buildProperties() { } + public static class Template { + + /** + * Whether to record observations for send operations when the Observations API is + * available. + */ + private boolean observationsEnabled = true; + + public boolean isObservationsEnabled() { + return this.observationsEnabled; + } + + public void setObservationsEnabled(boolean observationsEnabled) { + this.observationsEnabled = observationsEnabled; + } + + } + public static class Cache { /** Time period to expire unused entries in the cache. */ @@ -1339,6 +1363,12 @@ public static class Listener { */ private int batchTimeoutMillis = 100; + /** + * Whether to record observations for receive operations when the Observations API + * is available. + */ + private boolean observationsEnabled = true; + public AckMode getAckMode() { return this.ackMode; } @@ -1379,6 +1409,14 @@ public void setBatchTimeoutMillis(int batchTimeoutMillis) { this.batchTimeoutMillis = batchTimeoutMillis; } + public boolean isObservationsEnabled() { + return this.observationsEnabled; + } + + public void setObservationsEnabled(boolean observationsEnabled) { + this.observationsEnabled = observationsEnabled; + } + } public static class Admin { diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index b8410745..e6df1e36 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -49,6 +49,11 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer; +import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.observation.PulsarListenerObservationConvention; +import org.springframework.pulsar.observation.PulsarTemplateObservationConvention; + +import io.micrometer.observation.ObservationRegistry; /** * Autoconfiguration tests for {@link PulsarAutoConfiguration}. @@ -230,6 +235,70 @@ void authParamMapConvertedToEncodedParamString() { } + @Nested + class ObservationAutoConfigurationTests { + + @Test + void templateObservationsEnabledByDefault() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class) + .extracting("observationRegistry").isSameAs(observationRegistry))); + } + + @Test + void templateObservationsCanBeDisabled() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + contextRunner.withPropertyValues("spring.pulsar.template.observations-enabled=false") + .withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class) + .extracting("observationRegistry").isNull())); + } + + @Test + void templateObservationsWithCustomConvention() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + PulsarTemplateObservationConvention customConvention = mock(PulsarTemplateObservationConvention.class); + contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .withBean("customConvention", PulsarTemplateObservationConvention.class, () -> customConvention) + .run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class) + .extracting("observationConvention").isSameAs(customConvention))); + } + + @Test + void listenerObservationsEnabledByDefault() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .run((context -> assertThat(context).hasNotFailed() + .getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry") + .isSameAs(observationRegistry))); + } + + @Test + void listenerObservationsCanBeDisabled() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + contextRunner.withPropertyValues("spring.pulsar.listener.observations-enabled=false") + .withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .run((context -> assertThat(context).hasNotFailed() + .getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry") + .isNull())); + } + + @Test + void listenerObservationsWithCustomConvention() { + ObservationRegistry observationRegistry = mock(ObservationRegistry.class); + PulsarListenerObservationConvention customConvention = mock(PulsarListenerObservationConvention.class); + contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry) + .withBean("customConvention", PulsarListenerObservationConvention.class, () -> customConvention) + .run((context -> assertThat(context).hasNotFailed() + .getBean(ConcurrentPulsarListenerContainerFactory.class) + .extracting(ConcurrentPulsarListenerContainerFactory::getContainerProperties) + .extracting(PulsarContainerProperties::getObservationConvention) + .isSameAs(customConvention))); + } + + } + @Nested class ProducerFactoryAutoConfigurationTests { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java index 853539f3..038423d2 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java @@ -16,16 +16,15 @@ package org.springframework.pulsar.config; -import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.Schema; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer; import org.springframework.pulsar.listener.AckMode; @@ -33,23 +32,26 @@ import org.springframework.pulsar.support.JavaUtils; import org.springframework.pulsar.support.MessageConverter; +import io.micrometer.observation.ObservationRegistry; + /** * Base {@link PulsarListenerContainerFactory} implementation. * * @param the {@link AbstractPulsarMessageListenerContainer} implementation type. * @param Message payload type. * @author Soby Chacko + * @author Chris Bono */ public abstract class AbstractPulsarListenerContainerFactory, T> - implements PulsarListenerContainerFactory, ApplicationEventPublisherAware, InitializingBean, - ApplicationContextAware { + implements PulsarListenerContainerFactory, ApplicationEventPublisherAware, ApplicationContextAware { + + protected final LogAccessor logger = new LogAccessor(this.getClass()); - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR - // protected + private final PulsarConsumerFactory consumerFactory; - private final PulsarContainerProperties containerProperties = new PulsarContainerProperties(); + private final PulsarContainerProperties containerProperties; - private PulsarConsumerFactory consumerFactory; + private final ObservationRegistry observationRegistry; private Boolean autoStartup; @@ -63,19 +65,30 @@ public abstract class AbstractPulsarListenerContainerFactory consumerFactory) { + protected AbstractPulsarListenerContainerFactory(PulsarConsumerFactory consumerFactory, + PulsarContainerProperties containerProperties, @Nullable ObservationRegistry observationRegistry) { this.consumerFactory = consumerFactory; + this.containerProperties = containerProperties; + this.observationRegistry = observationRegistry; } - public PulsarConsumerFactory getPulsarConsumerFactory() { + protected PulsarConsumerFactory getConsumerFactory() { return this.consumerFactory; } + protected ObservationRegistry getObservationRegistry() { + return this.observationRegistry; + } + + public PulsarContainerProperties getContainerProperties() { + return this.containerProperties; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + public void setAutoStartup(Boolean autoStartup) { this.autoStartup = autoStartup; } @@ -92,10 +105,6 @@ public void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; } - public Boolean isBatchListener() { - return this.batchListener; - } - public void setBatchListener(Boolean batchListener) { this.batchListener = batchListener; } @@ -105,15 +114,6 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv this.applicationEventPublisher = applicationEventPublisher; } - public PulsarContainerProperties getContainerProperties() { - return this.containerProperties; - } - - @Override - public void afterPropertiesSet() { - - } - @SuppressWarnings("unchecked") @Override public C createListenerContainer(PulsarListenerEndpoint endpoint) { @@ -138,7 +138,7 @@ private void configureEndpoint(AbstractPulsarListenerEndpoint aplEndpoint) { } protected void initializeContainer(C instance, PulsarListenerEndpoint endpoint) { - PulsarContainerProperties instanceProperties = instance.getPulsarContainerProperties(); + PulsarContainerProperties instanceProperties = instance.getContainerProperties(); if (instanceProperties.getSchemaType() == null) { JavaUtils.INSTANCE.acceptIfNotNull(this.containerProperties.getSchemaType(), @@ -171,7 +171,6 @@ else if (this.autoStartup != null) { instanceProperties.setMaxNumMessages(this.containerProperties.getMaxNumMessages()); instanceProperties.setMaxNumBytes(this.containerProperties.getMaxNumBytes()); instanceProperties.setBatchTimeoutMillis(this.containerProperties.getBatchTimeoutMillis()); - instanceProperties.setObservationEnabled(this.containerProperties.isObservationEnabled()); instanceProperties.setObservationConvention(this.containerProperties.getObservationConvention()); JavaUtils.INSTANCE.acceptIfNotNull(this.phase, instance::setPhase) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index 0372a514..1108ee3d 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -19,12 +19,15 @@ import java.util.Arrays; import java.util.Collection; -import org.apache.pulsar.client.api.SubscriptionType; - +import org.springframework.lang.Nullable; +import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer; import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import io.micrometer.observation.ObservationRegistry; + /** * Concrete implementation for {@link PulsarListenerContainerFactory}. * @@ -38,6 +41,11 @@ public class ConcurrentPulsarListenerContainerFactory private Integer concurrency; + public ConcurrentPulsarListenerContainerFactory(PulsarConsumerFactory consumerFactory, + PulsarContainerProperties containerProperties, @Nullable ObservationRegistry observationRegistry) { + super(consumerFactory, containerProperties, observationRegistry); + } + /** * Specify the container concurrency. * @param concurrency the number of consumers to create. @@ -48,42 +56,41 @@ public void setConcurrency(Integer concurrency) { @Override protected ConcurrentPulsarMessageListenerContainer createContainerInstance(PulsarListenerEndpoint endpoint) { + PulsarContainerProperties properties = new PulsarContainerProperties(); - Collection topics = endpoint.getTopics(); - String topicPattern = endpoint.getTopicPattern(); - if (!topics.isEmpty()) { - final String[] topics1 = topics.toArray(new String[0]); - properties.setTopics(topics1); + if (!CollectionUtils.isEmpty(endpoint.getTopics())) { + properties.setTopics(endpoint.getTopics().toArray(new String[0])); } - if (StringUtils.hasText(topicPattern)) { - properties.setTopicsPattern(topicPattern); + + if (StringUtils.hasText(endpoint.getTopicPattern())) { + properties.setTopicsPattern(endpoint.getTopicPattern()); } - final String subscriptionName = endpoint.getSubscriptionName(); - if (StringUtils.hasText(subscriptionName)) { + if (StringUtils.hasText(endpoint.getSubscriptionName())) { properties.setSubscriptionName(endpoint.getSubscriptionName()); } + if (endpoint.isBatchListener()) { properties.setBatchListener(endpoint.isBatchListener()); } - final SubscriptionType subscriptionType = endpoint.getSubscriptionType(); - if (subscriptionType != null) { - properties.setSubscriptionType(subscriptionType); + + if (endpoint.getSubscriptionType() != null) { + properties.setSubscriptionType(endpoint.getSubscriptionType()); } properties.setSchemaType(endpoint.getSchemaType()); - return new ConcurrentPulsarMessageListenerContainer(getPulsarConsumerFactory(), properties); + + return new ConcurrentPulsarMessageListenerContainer<>(this.getConsumerFactory(), properties, + this.getObservationRegistry()); } @Override protected void initializeContainer(ConcurrentPulsarMessageListenerContainer instance, PulsarListenerEndpoint endpoint) { - super.initializeContainer(instance, endpoint); - Integer conc = endpoint.getConcurrency(); - if (conc != null) { - instance.setConcurrency(conc); + if (endpoint.getConcurrency() != null) { + instance.setConcurrency(endpoint.getConcurrency()); } else if (this.concurrency != null) { instance.setConcurrency(this.concurrency); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index dc096997..bf17e6ba 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -146,7 +146,7 @@ protected PulsarMessagingMessageListenerAdapter createMessageListener(PulsarM } final ConcurrentPulsarMessageListenerContainer containerInstance = (ConcurrentPulsarMessageListenerContainer) container; - final PulsarContainerProperties pulsarContainerProperties = containerInstance.getPulsarContainerProperties(); + final PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); final SchemaType schemaType = pulsarContainerProperties.getSchemaType(); if (schemaType != SchemaType.NONE) { switch (schemaType) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 4595c069..01eeda30 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.Producer; @@ -30,11 +29,8 @@ import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.springframework.beans.factory.BeanNameAware; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention; import org.springframework.pulsar.observation.PulsarMessageSenderContext; import org.springframework.pulsar.observation.PulsarTemplateObservation; @@ -51,27 +47,22 @@ * @author Chris Bono * @author Alexander Preuß */ -public class PulsarTemplate - implements PulsarOperations, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton { +public class PulsarTemplate implements PulsarOperations, BeanNameAware { - private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); + private final LogAccessor logger = new LogAccessor(this.getClass()); private final PulsarProducerFactory producerFactory; private final List interceptors; - private ApplicationContext applicationContext; + private final ObservationRegistry observationRegistry; + + private final PulsarTemplateObservationConvention observationConvention; private String beanName; private Schema schema; - private boolean observationEnabled; - - private PulsarTemplateObservationConvention observationConvention; - - private ObservationRegistry observationRegistry; - /** * Construct a template instance. * @param producerFactory the factory used to create the backing Pulsar producers. @@ -81,13 +72,31 @@ public PulsarTemplate(PulsarProducerFactory producerFactory) { } /** - * Construct a template instance. + * Construct a template instance with optional interceptors. * @param producerFactory the factory used to create the backing Pulsar producers. * @param interceptors the interceptors to add to the producer. */ public PulsarTemplate(PulsarProducerFactory producerFactory, List interceptors) { + this(producerFactory, interceptors, null, null); + } + + /** + * Construct a template instance with optional interceptors and observation + * configuration. + * @param producerFactory the factory used to create the backing Pulsar producers + * @param interceptors the optional list of interceptors to add to the producer + * @param observationRegistry the registry to record observations with or {@code null} + * to not record observations + * @param observationConvention the optional custom observation convention to use when + * recording observations + */ + public PulsarTemplate(PulsarProducerFactory producerFactory, @Nullable List interceptors, + @Nullable ObservationRegistry observationRegistry, + @Nullable PulsarTemplateObservationConvention observationConvention) { this.producerFactory = producerFactory; this.interceptors = interceptors; + this.observationRegistry = observationRegistry; + this.observationConvention = observationConvention; } @Override @@ -115,11 +124,6 @@ public SendMessageBuilder newMessage(T message) { return new SendMessageBuilderImpl<>(this, message); } - @Override - public void setApplicationContext(ApplicationContext applicationContext) { - this.applicationContext = applicationContext; - } - @Override public void setBeanName(String beanName) { this.beanName = beanName; @@ -133,34 +137,6 @@ public void setSchema(Schema schema) { this.schema = schema; } - /** - * Set to true to enable observation via Micrometer. - * @param observationEnabled true to enable. - */ - public void setObservationEnabled(boolean observationEnabled) { - this.observationEnabled = observationEnabled; - } - - /** - * Set a custom observation convention. - * @param observationConvention the convention. - */ - public void setObservationConvention(PulsarTemplateObservationConvention observationConvention) { - this.observationConvention = observationConvention; - } - - @Override - public void afterSingletonsInstantiated() { - // TODO is this how we want to do this? What about SBAC? - // TODO when would AC be null? Should we assert or at least log the fact if it - // happens? - if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) { - ObjectProvider registry = this.applicationContext - .getBeanProvider(ObservationRegistry.class); - this.observationRegistry = registry.getIfUnique(); - } - } - private MessageId doSend(String topic, T message, TypedMessageBuilderCustomizer typedMessageBuilderCustomizer, MessageRouter messageRouter, ProducerBuilderCustomizer producerCustomizer) throws PulsarClientException { try { @@ -186,9 +162,9 @@ private CompletableFuture doSendAsync(String topic, T message, if (typedMessageBuilderCustomizer != null) { typedMessageBuilderCustomizer.customize(messageBuilder); } - senderContext.properties().forEach(messageBuilder::property); // propagate - // props to - // message + // propagate props to message + senderContext.properties().forEach(messageBuilder::property); + return messageBuilder.sendAsync().whenComplete((msgId, ex) -> { if (ex == null) { this.logger.trace(() -> String.format("Sent msg to '%s' topic", topicName)); @@ -210,15 +186,11 @@ private CompletableFuture doSendAsync(String topic, T message, } private Observation newObservation(PulsarMessageSenderContext senderContext) { - Observation observation; - if (!this.observationEnabled || this.observationRegistry == null) { - observation = Observation.NOOP; - } - else { - observation = PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention, - DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext, this.observationRegistry); + if (this.observationRegistry == null) { + return Observation.NOOP; } - return observation; + return PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention, + DefaultPulsarTemplateObservationConvention.INSTANCE, () -> senderContext, this.observationRegistry); } private Producer prepareProducerForSend(String topic, T message, MessageRouter messageRouter, diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java index 9be102e5..0c9fb38b 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java @@ -16,7 +16,6 @@ package org.springframework.pulsar.listener; -import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.RedeliveryBackoff; @@ -31,6 +30,8 @@ import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.util.Assert; +import io.micrometer.observation.ObservationRegistry; + /** * Base implementation for the {@link PulsarMessageListenerContainer}. * @@ -41,7 +42,13 @@ public abstract class AbstractPulsarMessageListenerContainer implements PulsarMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware { - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR + protected final LogAccessor logger = new LogAccessor(this.getClass()); + + private final PulsarConsumerFactory pulsarConsumerFactory; + + private final PulsarContainerProperties pulsarContainerProperties; + + private final ObservationRegistry observationRegistry; private ApplicationEventPublisher applicationEventPublisher; @@ -49,10 +56,6 @@ public abstract class AbstractPulsarMessageListenerContainer implements Pulsa private ApplicationContext applicationContext; - private final PulsarContainerProperties pulsarContainerProperties; - - protected final PulsarConsumerFactory pulsarConsumerFactory; - private boolean autoStartup = true; private int phase; @@ -71,10 +74,22 @@ public abstract class AbstractPulsarMessageListenerContainer implements Pulsa @SuppressWarnings("unchecked") protected AbstractPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, - PulsarContainerProperties pulsarContainerProperties) { - this.pulsarContainerProperties = pulsarContainerProperties; + PulsarContainerProperties pulsarContainerProperties, @Nullable ObservationRegistry observationRegistry) { this.pulsarConsumerFactory = (PulsarConsumerFactory) pulsarConsumerFactory; + this.pulsarContainerProperties = pulsarContainerProperties; + this.observationRegistry = observationRegistry; + } + + public PulsarConsumerFactory getPulsarConsumerFactory() { + return this.pulsarConsumerFactory; + } + public PulsarContainerProperties getContainerProperties() { + return this.pulsarContainerProperties; + } + + public ObservationRegistry getObservationRegistry() { + return this.observationRegistry; } @Override @@ -129,14 +144,6 @@ public void setupMessageListener(Object messageListener) { this.pulsarContainerProperties.setMessageListener(messageListener); } - public PulsarContainerProperties getPulsarContainerProperties() { - return this.pulsarContainerProperties; - } - - public PulsarConsumerFactory getPulsarConsumerFactory() { - return this.pulsarConsumerFactory; - } - @Override public boolean isAutoStartup() { return this.autoStartup; @@ -156,10 +163,6 @@ public int getPhase() { return this.phase; } - public PulsarContainerProperties getContainerProperties() { - return this.pulsarContainerProperties; - } - protected abstract void doStart(); protected abstract void doStop(); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java index ac274470..c4829677 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java @@ -25,9 +25,12 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.lang.Nullable; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.util.Assert; +import io.micrometer.observation.ObservationRegistry; + /** * Creates a concurrent execution context of {@link DefaultPulsarMessageListenerContainer} * instances based on the {@link #setConcurrency(int) concurrency}. Concurrency > 1 is not @@ -36,6 +39,7 @@ * @param the payload type. * @author Soby Chacko * @author Alexander Preuß + * @author Chris Bono */ public class ConcurrentPulsarMessageListenerContainer extends AbstractPulsarMessageListenerContainer { @@ -46,8 +50,8 @@ public class ConcurrentPulsarMessageListenerContainer extends AbstractPulsarM private final List executors = new ArrayList<>(); public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, - PulsarContainerProperties pulsarContainerProperties) { - super(pulsarConsumerFactory, pulsarContainerProperties); + PulsarContainerProperties pulsarContainerProperties, @Nullable ObservationRegistry observationRegistry) { + super(pulsarConsumerFactory, pulsarContainerProperties, observationRegistry); } public int getConcurrency() { @@ -68,8 +72,8 @@ public void setConcurrency(int concurrency) { @Override public void doStart() { if (!isRunning()) { - PulsarContainerProperties containerProperties = getContainerProperties(); + PulsarContainerProperties containerProperties = getContainerProperties(); if (containerProperties.getSubscriptionType() == SubscriptionType.Exclusive && this.concurrency > 1) { throw new IllegalStateException("concurrency > 1 is not allowed on Exclusive subscription type"); } @@ -79,7 +83,6 @@ public void doStart() { for (int i = 0; i < this.concurrency; i++) { DefaultPulsarMessageListenerContainer container = constructContainer(containerProperties); configureChildContainer(i, container); - container.start(); this.containers.add(container); } @@ -87,7 +90,8 @@ public void doStart() { } private DefaultPulsarMessageListenerContainer constructContainer(PulsarContainerProperties containerProperties) { - return new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties); + return new DefaultPulsarMessageListenerContainer<>(this.getPulsarConsumerFactory(), containerProperties, + this.getObservationRegistry()); } private void configureChildContainer(int index, DefaultPulsarMessageListenerContainer container) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 26df8f2f..db30a948 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -45,8 +45,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; import org.springframework.core.task.AsyncTaskExecutor; @@ -86,14 +84,19 @@ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMess public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) { - super(pulsarConsumerFactory, pulsarContainerProperties); + this(pulsarConsumerFactory, pulsarContainerProperties, null); + } + + public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory pulsarConsumerFactory, + PulsarContainerProperties pulsarContainerProperties, @Nullable ObservationRegistry observationRegistry) { + super(pulsarConsumerFactory, pulsarContainerProperties, observationRegistry); this.thisOrParentContainer = this; } @Override protected void doStart() { - PulsarContainerProperties containerProperties = getPulsarContainerProperties(); + PulsarContainerProperties containerProperties = getContainerProperties(); Object messageListenerObject = containerProperties.getMessageListener(); AsyncTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor(); @@ -106,15 +109,8 @@ protected void doStart() { containerProperties.setConsumerTaskExecutor(consumerExecutor); } - ObservationRegistry observationRegistry = null; - ApplicationContext applicationContext = getApplicationContext(); - if (applicationContext != null) { - ObjectProvider registry = applicationContext - .getBeanProvider(ObservationRegistry.class); - observationRegistry = registry.getIfUnique(); - } - - this.listenerConsumer = new Listener(messageListener, observationRegistry); + this.listenerConsumer = new Listener(messageListener, this.getContainerProperties(), + this.getObservationRegistry()); setRunning(true); this.startLatch = new CountDownLatch(1); this.listenerConsumerFuture = consumerExecutor.submit(this.listenerConsumer); @@ -179,26 +175,31 @@ private final class Listener implements SchedulingAwareRunnable { private final PulsarBatchMessageListener batchMessageListener; + private final PulsarContainerProperties containerProperties; + private final ObservationRegistry observationRegistry; private Consumer consumer; private final Set nackableMessages = new HashSet<>(); - private final PulsarContainerProperties containerProperties = getPulsarContainerProperties(); - - private volatile Thread consumerThread; - private final PulsarConsumerErrorHandler pulsarConsumerErrorHandler; - private final boolean isBatchListener = this.containerProperties.isBatchListener(); + private final boolean isBatchListener; - private final AckMode ackMode = this.containerProperties.getAckMode(); + private final AckMode ackMode; - private final SubscriptionType subscriptionType = this.containerProperties.getSubscriptionType(); + private final SubscriptionType subscriptionType; @SuppressWarnings({ "unchecked", "rawtypes" }) - Listener(MessageListener messageListener, @Nullable ObservationRegistry observationRegistry) { + Listener(MessageListener messageListener, PulsarContainerProperties containerProperties, + @Nullable ObservationRegistry observationRegistry) { + + this.containerProperties = containerProperties; + this.isBatchListener = this.containerProperties.isBatchListener(); + this.ackMode = this.containerProperties.getAckMode(); + this.subscriptionType = this.containerProperties.getSubscriptionType(); + if (messageListener instanceof PulsarBatchMessageListener) { this.batchMessageListener = (PulsarBatchMessageListener) messageListener; this.listener = null; @@ -214,16 +215,15 @@ else if (messageListener != null) { this.observationRegistry = observationRegistry; this.pulsarConsumerErrorHandler = getPulsarConsumerErrorHandler(); try { - final PulsarContainerProperties pulsarContainerProperties = getPulsarContainerProperties(); Map propertiesToConsumer = extractDirectConsumerProperties(); populateAllNecessaryPropertiesIfNeedBe(propertiesToConsumer); final BatchReceivePolicy batchReceivePolicy = new BatchReceivePolicy.Builder() - .maxNumMessages(pulsarContainerProperties.getMaxNumMessages()) - .maxNumBytes(pulsarContainerProperties.getMaxNumBytes()) - .timeout(pulsarContainerProperties.getBatchTimeoutMillis(), TimeUnit.MILLISECONDS).build(); - this.consumer = getPulsarConsumerFactory().createConsumer( - (Schema) pulsarContainerProperties.getSchema(), batchReceivePolicy, propertiesToConsumer); + .maxNumMessages(containerProperties.getMaxNumMessages()) + .maxNumBytes(containerProperties.getMaxNumBytes()) + .timeout(containerProperties.getBatchTimeoutMillis(), TimeUnit.MILLISECONDS).build(); + this.consumer = getPulsarConsumerFactory().createConsumer((Schema) containerProperties.getSchema(), + batchReceivePolicy, propertiesToConsumer); Assert.state(this.consumer != null, "Unable to create a consumer"); } catch (PulsarClientException e) { @@ -292,8 +292,6 @@ public boolean isLongLived() { @Override public void run() { publishConsumerStartingEvent(); - this.consumerThread = Thread.currentThread(); - publishConsumerStartedEvent(); AtomicBoolean inRetryMode = new AtomicBoolean(false); AtomicBoolean messagesPendingInBatch = new AtomicBoolean(false); @@ -361,51 +359,7 @@ public void run() { else { for (Message message : messages) { do { - Observation observation; - if (!this.containerProperties.isObservationEnabled() || this.observationRegistry == null) { - observation = Observation.NOOP; - } - else { - observation = PulsarListenerObservation.LISTENER_OBSERVATION.observation( - this.containerProperties.getObservationConvention(), - DefaultPulsarListenerObservationConvention.INSTANCE, - () -> new PulsarMessageReceiverContext(message, getBeanName()), - this.observationRegistry); - } - observation.observe(() -> { - try { - if (this.listener instanceof PulsarAcknowledgingMessageListener) { - this.listener.received(this.consumer, message, - this.ackMode.equals(AckMode.MANUAL) - ? new ConsumerAcknowledgment(this.consumer, message) : null); - } - else if (this.listener != null) { - this.listener.received(this.consumer, message); - } - if (this.ackMode.equals(AckMode.RECORD)) { - handleAck(message); - } - inRetryMode.compareAndSet(true, false); - } - catch (Exception e) { - if (this.pulsarConsumerErrorHandler != null) { - invokeRecordListenerErrorHandler(inRetryMode, message, e); - } - else { - if (this.ackMode.equals(AckMode.RECORD)) { - this.consumer.negativeAcknowledge(message); - } - else if (this.ackMode.equals(AckMode.BATCH)) { - this.nackableMessages.add(message.getMessageId()); - } - else { - throw new IllegalStateException(String.format( - "Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks", - message.getMessageId()), e); - } - } - } - }); + newObservation(message).observe(() -> this.dispatchMessageToListener(message, inRetryMode)); } while (inRetryMode.get()); } @@ -417,6 +371,50 @@ else if (this.ackMode.equals(AckMode.BATCH)) { } } + private Observation newObservation(Message message) { + if (this.observationRegistry == null) { + return Observation.NOOP; + } + return PulsarListenerObservation.LISTENER_OBSERVATION.observation( + this.containerProperties.getObservationConvention(), + DefaultPulsarListenerObservationConvention.INSTANCE, + () -> new PulsarMessageReceiverContext(message, getBeanName()), this.observationRegistry); + } + + private void dispatchMessageToListener(Message message, AtomicBoolean inRetryMode) { + try { + if (this.listener instanceof PulsarAcknowledgingMessageListener) { + this.listener.received(this.consumer, message, this.ackMode.equals(AckMode.MANUAL) + ? new ConsumerAcknowledgment(this.consumer, message) : null); + } + else if (this.listener != null) { + this.listener.received(this.consumer, message); + } + if (this.ackMode.equals(AckMode.RECORD)) { + handleAck(message); + } + inRetryMode.compareAndSet(true, false); + } + catch (Exception e) { + if (this.pulsarConsumerErrorHandler != null) { + invokeRecordListenerErrorHandler(inRetryMode, message, e); + } + else { + if (this.ackMode.equals(AckMode.RECORD)) { + this.consumer.negativeAcknowledge(message); + } + else if (this.ackMode.equals(AckMode.BATCH)) { + this.nackableMessages.add(message.getMessageId()); + } + else { + throw new IllegalStateException(String.format( + "Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks", + message.getMessageId()), e); + } + } + } + } + /** * Special scenario for batch error handling round1: messages m1,m2,...m10 are * received batch listener throws error on m3 goes through error handle flow and diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java index 709885a5..d8819e31 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java @@ -65,8 +65,6 @@ public class PulsarContainerProperties { private AckMode ackMode = AckMode.BATCH; - private boolean observationEnabled; - private PulsarListenerObservationConvention observationConvention; private Properties pulsarConsumerProperties = new Properties(); @@ -145,18 +143,6 @@ public void setAckMode(AckMode ackMode) { this.ackMode = ackMode; } - public boolean isObservationEnabled() { - return this.observationEnabled; - } - - /** - * Set to true to enable observations. - * @param observationEnabled true to enable. - */ - public void setObservationEnabled(boolean observationEnabled) { - this.observationEnabled = observationEnabled; - } - public PulsarListenerObservationConvention getObservationConvention() { return this.observationConvention; } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java index 9a36c1d9..02ce951b 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java @@ -53,23 +53,21 @@ public class ConcurrentPulsarMessageListenerContainerTests { @Test @SuppressWarnings("unchecked") void createConcurrentContainerFromFactoryAndVerifyBatchReceivePolicy() { - ConcurrentPulsarListenerContainerFactory factory = new ConcurrentPulsarListenerContainerFactory<>(); - final PulsarConsumerFactory pulsarConsumerFactory = mock(PulsarConsumerFactory.class); - factory.setPulsarConsumerFactory(pulsarConsumerFactory); - - PulsarContainerProperties containerProperties = factory.getContainerProperties(); + PulsarConsumerFactory consumerFactory = mock(PulsarConsumerFactory.class); + PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setBatchTimeoutMillis(60_000); containerProperties.setMaxNumMessages(120); containerProperties.setMaxNumBytes(32000); - - factory.setConcurrency(1); - + ConcurrentPulsarListenerContainerFactory containerFactory = new ConcurrentPulsarListenerContainerFactory<>( + consumerFactory, containerProperties, null); + containerFactory.setConcurrency(1); PulsarListenerEndpoint pulsarListenerEndpoint = mock(PulsarListenerEndpoint.class); when(pulsarListenerEndpoint.getConcurrency()).thenReturn(1); - final ConcurrentPulsarMessageListenerContainer concurrentContainer = factory + ConcurrentPulsarMessageListenerContainer concurrentContainer = containerFactory .createListenerContainer(pulsarListenerEndpoint); - final PulsarContainerProperties pulsarContainerProperties = concurrentContainer.getContainerProperties(); + + PulsarContainerProperties pulsarContainerProperties = concurrentContainer.getContainerProperties(); assertThat(pulsarContainerProperties.getBatchTimeoutMillis()).isEqualTo(60_000); assertThat(pulsarContainerProperties.getMaxNumMessages()).isEqualTo(120); assertThat(pulsarContainerProperties.getMaxNumBytes()).isEqualTo(32_000); @@ -85,7 +83,7 @@ void deadLetterPolicyAppliedOnChildContainer() throws Exception { concurrentContainer.start(); - final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); assertThat(childContainer.getDeadLetterPolicy()).isEqualTo(deadLetterPolicy); } @@ -99,7 +97,7 @@ void nackRedeliveryBackoffAppliedOnChildContainer() throws Exception { concurrentContainer.start(); - final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); assertThat(childContainer.getNegativeAckRedeliveryBackoff()).isEqualTo(redeliveryBackoff); } @@ -113,7 +111,7 @@ void ackTimeoutRedeliveryBackoffAppliedOnChildContainer() throws Exception { concurrentContainer.start(); - final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); assertThat(childContainer.getAckTimeoutkRedeliveryBackoff()).isEqualTo(redeliveryBackoff); } @@ -122,14 +120,13 @@ void ackTimeoutRedeliveryBackoffAppliedOnChildContainer() throws Exception { void pulsarConsumerErrorHandlerAppliedOnChildContainer() throws Exception { PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Shared); ConcurrentPulsarMessageListenerContainer concurrentContainer = env.concurrentContainer(); - PulsarConsumerErrorHandler pulsarConsumerErrorHandler = new DefaultPulsarConsumerErrorHandler( mock(PulsarMessageRecovererFactory.class), mock(BackOff.class)); concurrentContainer.setPulsarConsumerErrorHandler(pulsarConsumerErrorHandler); concurrentContainer.start(); - final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); assertThat(childContainer.getPulsarConsumerErrorHandler()).isEqualTo(pulsarConsumerErrorHandler); } @@ -138,17 +135,14 @@ void pulsarConsumerErrorHandlerAppliedOnChildContainer() throws Exception { void observationConfigAppliedOnChildContainer() throws Exception { PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Shared); ConcurrentPulsarMessageListenerContainer concurrentContainer = env.concurrentContainer(); - PulsarListenerObservationConvention customObservationConvention = mock( PulsarListenerObservationConvention.class); - concurrentContainer.getContainerProperties().setObservationEnabled(true); concurrentContainer.getContainerProperties().setObservationConvention(customObservationConvention); concurrentContainer.start(); - final DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); + DefaultPulsarMessageListenerContainer childContainer = concurrentContainer.getContainers().get(0); assertThat(childContainer.getContainerProperties().getObservationConvention()) .isSameAs(customObservationConvention); - assertThat(childContainer.getContainerProperties().isObservationEnabled()).isTrue(); } @Test @@ -158,7 +152,6 @@ void basicConcurrencyTesting() throws Exception { PulsarConsumerFactory pulsarConsumerFactory = env.consumerFactory(); Consumer consumer = env.consumer(); ConcurrentPulsarMessageListenerContainer concurrentContainer = env.concurrentContainer(); - concurrentContainer.setConcurrency(3); concurrentContainer.start(); @@ -182,12 +175,10 @@ void exclusiveSubscriptionMustUseSingleThread() throws Exception { @SuppressWarnings("unchecked") private PulsarListenerMockComponents setupListenerMockComponents(SubscriptionType subscriptionType) throws Exception { - PulsarConsumerFactory pulsarConsumerFactory = mock(PulsarConsumerFactory.class); + PulsarConsumerFactory consumerFactory = mock(PulsarConsumerFactory.class); Consumer consumer = mock(Consumer.class); - - when(pulsarConsumerFactory.createConsumer(any(Schema.class), any(BatchReceivePolicy.class), any(Map.class))) + when(consumerFactory.createConsumer(any(Schema.class), any(BatchReceivePolicy.class), any(Map.class))) .thenReturn(consumer); - when(consumer.batchReceive()).thenReturn(mock(Messages.class)); PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); @@ -197,9 +188,9 @@ private PulsarListenerMockComponents setupListenerMockComponents(SubscriptionTyp }); ConcurrentPulsarMessageListenerContainer concurrentContainer = new ConcurrentPulsarMessageListenerContainer<>( - pulsarConsumerFactory, pulsarContainerProperties); + consumerFactory, pulsarContainerProperties, null); - return new PulsarListenerMockComponents(pulsarConsumerFactory, consumer, concurrentContainer); + return new PulsarListenerMockComponents(consumerFactory, consumer, concurrentContainer); } private record PulsarListenerMockComponents(PulsarConsumerFactory consumerFactory, diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 821bfd17..5262aa75 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -129,8 +129,8 @@ public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) @Bean PulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory) { - final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(); - pulsarListenerContainerFactory.setPulsarConsumerFactory(pulsarConsumerFactory); + final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( + pulsarConsumerFactory, new PulsarContainerProperties(), null); return pulsarListenerContainerFactory; } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java index eee680c0..bf211e44 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java @@ -45,6 +45,7 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.PulsarTestContainerSupport; +import org.springframework.pulsar.listener.PulsarContainerProperties; import io.micrometer.common.KeyValues; import io.micrometer.core.tck.MeterRegistryAssert; @@ -130,10 +131,9 @@ public PulsarClientConfiguration pulsarClientConfiguration() { } @Bean - public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { - PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setObservationEnabled(true); - return template; + public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, + ObservationRegistry observationRegistry) { + return new PulsarTemplate<>(pulsarProducerFactory, null, observationRegistry, null); } @Bean @@ -143,11 +143,9 @@ public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) @Bean PulsarListenerContainerFactory pulsarListenerContainerFactory( - PulsarConsumerFactory pulsarConsumerFactory) { - final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(); - pulsarListenerContainerFactory.setPulsarConsumerFactory(pulsarConsumerFactory); - pulsarListenerContainerFactory.getContainerProperties().setObservationEnabled(true); - return pulsarListenerContainerFactory; + PulsarConsumerFactory pulsarConsumerFactory, ObservationRegistry observationRegistry) { + return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, + new PulsarContainerProperties(), observationRegistry); } @Bean diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java index f9405468..6570016f 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java @@ -51,6 +51,7 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.PulsarTestContainerSupport; +import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -82,9 +83,9 @@ * * @author Chris Bono */ +@Disabled @SpringJUnitConfig @DirtiesContext -@Disabled public class ObservationTests implements PulsarTestContainerSupport { private static final String LISTENER_ID_TAG = "spring.pulsar.listener.id"; @@ -170,57 +171,54 @@ void sendAndReceiveCreatesExpectedSpansAndMetrics(@Autowired ObservationTestAppL static class ObservationTestAppConfig { @Bean - public PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { + PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { return new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap()); } @Bean - public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) { + PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) { return new PulsarClientFactoryBean(pulsarClientConfiguration); } @Bean - public PulsarClientConfiguration pulsarClientConfiguration() { + PulsarClientConfiguration pulsarClientConfiguration() { return new PulsarClientConfiguration(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); } @Bean(name = "observationTestsTemplate") - public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { - PulsarTemplate template = new PulsarTemplate<>(pulsarProducerFactory); - template.setObservationEnabled(true); - template.setObservationConvention(new DefaultPulsarTemplateObservationConvention() { - @Override - public KeyValues getLowCardinalityKeyValues(PulsarMessageSenderContext context) { - return super.getLowCardinalityKeyValues(context).and(SENDER_EXTRA_TAG, context.getBeanName()); - } - }); - return template; + PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, + ObservationRegistry observationRegistry) { + return new PulsarTemplate<>(pulsarProducerFactory, null, observationRegistry, + new DefaultPulsarTemplateObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageSenderContext context) { + return super.getLowCardinalityKeyValues(context).and(SENDER_EXTRA_TAG, + context.getBeanName()); + } + }); } @Bean - public PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) { + PulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient) { return new DefaultPulsarConsumerFactory<>(pulsarClient, Collections.emptyMap()); } @Bean PulsarListenerContainerFactory pulsarListenerContainerFactory( - PulsarConsumerFactory pulsarConsumerFactory) { - final ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(); - pulsarListenerContainerFactory.setPulsarConsumerFactory(pulsarConsumerFactory); - pulsarListenerContainerFactory.getContainerProperties().setObservationEnabled(true); - pulsarListenerContainerFactory.getContainerProperties() - .setObservationConvention(new DefaultPulsarListenerObservationConvention() { - @Override - public KeyValues getLowCardinalityKeyValues(PulsarMessageReceiverContext context) { - // Only add the extra tag for the 1st listener - if (context.getListenerId().equals(OBS2_ID)) { - return super.getLowCardinalityKeyValues(context); - } - return super.getLowCardinalityKeyValues(context).and(RECEIVER_EXTRA_TAG, - context.getListenerId()); - } - }); - return pulsarListenerContainerFactory; + PulsarConsumerFactory pulsarConsumerFactory, ObservationRegistry observationRegistry) { + PulsarContainerProperties containerProperties = new PulsarContainerProperties(); + containerProperties.setObservationConvention(new DefaultPulsarListenerObservationConvention() { + @Override + public KeyValues getLowCardinalityKeyValues(PulsarMessageReceiverContext context) { + // Only add the extra tag for the 1st listener + if (context.getListenerId().equals(OBS2_ID)) { + return super.getLowCardinalityKeyValues(context); + } + return super.getLowCardinalityKeyValues(context).and(RECEIVER_EXTRA_TAG, context.getListenerId()); + } + }); + return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, + new PulsarContainerProperties(), observationRegistry); } @Bean