Skip to content

Commit

Permalink
Add observations auto-configuration
Browse files Browse the repository at this point in the history
See #147
  • Loading branch information
onobc committed Oct 7, 2022
1 parent 47b9f1b commit 7e7ac48
Show file tree
Hide file tree
Showing 17 changed files with 386 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public void setOutputDir(File outputDir) {
void documentConfigurationProperties() throws IOException {
Snippets snippets = new Snippets(this.configurationPropertyMetadata);
snippets.add("application-properties.pulsar-client", "Pulsar Client Properties", (c) -> c.accept("spring.pulsar.client"));
snippets.add("application-properties.pulsar-producer", "Pulsar Producer Properties", (c) -> c.accept("spring.pulsar.producer"));
snippets.add("application-properties.pulsar-producer", "Pulsar Producer Properties", (c) -> {
c.accept("spring.pulsar.producer");
c.accept("spring.pulsar.template");
});
snippets.add("application-properties.pulsar-consumer", "Pulsar Consumer Properties", (c) -> {
c.accept("spring.pulsar.consumer");
c.accept("spring.pulsar.listener");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;

import io.micrometer.observation.ObservationRegistry;

/**
* Configuration for Pulsar annotation-driven support.
Expand All @@ -47,25 +50,25 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) {
@Bean
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
ObjectProvider<PulsarConsumerFactory<Object>> pulsarConsumerFactory) {
ConcurrentPulsarListenerContainerFactory<Object> factory = new ConcurrentPulsarListenerContainerFactory<>();

final PulsarConsumerFactory<Object> pulsarConsumerFactory1 = pulsarConsumerFactory.getIfAvailable();
factory.setPulsarConsumerFactory(pulsarConsumerFactory1);
ObjectProvider<PulsarConsumerFactory<Object>> consumerFactoryProvider,
ObjectProvider<ObservationRegistry> observationRegistryProvider,
ObjectProvider<PulsarListenerObservationConvention> observationConventionProvider) {

final PulsarContainerProperties containerProperties = factory.getContainerProperties();
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSubscriptionType(this.pulsarProperties.getConsumer().getSubscriptionType());
containerProperties.setObservationConvention(observationConventionProvider.getIfUnique());

PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
PulsarProperties.Listener properties = this.pulsarProperties.getListener();

map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
map.from(properties::getAckMode).to(containerProperties::setAckMode);
map.from(properties::getBatchTimeoutMillis).to(containerProperties::setBatchTimeoutMillis);
map.from(properties::getMaxNumBytes).to(containerProperties::setMaxNumBytes);
map.from(properties::getMaxNumMessages).to(containerProperties::setMaxNumMessages);
PulsarProperties.Listener listenerProperties = this.pulsarProperties.getListener();
map.from(listenerProperties::getSchemaType).to(containerProperties::setSchemaType);
map.from(listenerProperties::getAckMode).to(containerProperties::setAckMode);
map.from(listenerProperties::getBatchTimeoutMillis).to(containerProperties::setBatchTimeoutMillis);
map.from(listenerProperties::getMaxNumBytes).to(containerProperties::setMaxNumBytes);
map.from(listenerProperties::getMaxNumMessages).to(containerProperties::setMaxNumMessages);

return factory;
return new ConcurrentPulsarListenerContainerFactory<>(consumerFactoryProvider.getIfAvailable(),
containerProperties, this.pulsarProperties.getListener().isObservationsEnabled()
? observationRegistryProvider.getIfUnique() : null);
}

@Configuration(proxyBeanMethods = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;

import io.micrometer.observation.ObservationRegistry;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
Expand Down Expand Up @@ -89,8 +92,13 @@ public PulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsar
@Bean
@ConditionalOnMissingBean(PulsarTemplate.class)
public PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> interceptors) {
return new PulsarTemplate<>(pulsarProducerFactory, interceptors.orderedStream().toList());
ObjectProvider<ProducerInterceptor> interceptorsProvider,
ObjectProvider<ObservationRegistry> observationRegistryProvider,
ObjectProvider<PulsarTemplateObservationConvention> observationConventionProvider) {
return new PulsarTemplate<>(pulsarProducerFactory, interceptorsProvider.orderedStream().toList(),
this.properties.getTemplate().isObservationsEnabled() ? observationRegistryProvider.getIfUnique()
: null,
observationConventionProvider.getIfUnique());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class PulsarProperties {

private final Producer producer = new Producer();

private final Template template = new Template();

private final Admin admin = new Admin();

public Consumer getConsumer() {
Expand All @@ -80,6 +82,10 @@ public Producer getProducer() {
return this.producer;
}

public Template getTemplate() {
return this.template;
}

public Admin getAdministration() {
return this.admin;
}
Expand Down Expand Up @@ -691,6 +697,24 @@ public Map<String, Object> buildProperties() {

}

public static class Template {

/**
* Whether to record observations for send operations when the Observations API is
* available.
*/
private boolean observationsEnabled = true;

public boolean isObservationsEnabled() {
return this.observationsEnabled;
}

public void setObservationsEnabled(boolean observationsEnabled) {
this.observationsEnabled = observationsEnabled;
}

}

public static class Cache {

/** Time period to expire unused entries in the cache. */
Expand Down Expand Up @@ -1339,6 +1363,12 @@ public static class Listener {
*/
private int batchTimeoutMillis = 100;

/**
* Whether to record observations for receive operations when the Observations API
* is available.
*/
private boolean observationsEnabled = true;

public AckMode getAckMode() {
return this.ackMode;
}
Expand Down Expand Up @@ -1379,6 +1409,14 @@ public void setBatchTimeoutMillis(int batchTimeoutMillis) {
this.batchTimeoutMillis = batchTimeoutMillis;
}

public boolean isObservationsEnabled() {
return this.observationsEnabled;
}

public void setObservationsEnabled(boolean observationsEnabled) {
this.observationsEnabled = observationsEnabled;
}

}

public static class Admin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;

import io.micrometer.observation.ObservationRegistry;

/**
* Autoconfiguration tests for {@link PulsarAutoConfiguration}.
Expand Down Expand Up @@ -230,6 +235,70 @@ void authParamMapConvertedToEncodedParamString() {

}

@Nested
class ObservationAutoConfigurationTests {

@Test
void templateObservationsEnabledByDefault() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationRegistry").isSameAs(observationRegistry)));
}

@Test
void templateObservationsCanBeDisabled() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withPropertyValues("spring.pulsar.template.observations-enabled=false")
.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationRegistry").isNull()));
}

@Test
void templateObservationsWithCustomConvention() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
PulsarTemplateObservationConvention customConvention = mock(PulsarTemplateObservationConvention.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.withBean("customConvention", PulsarTemplateObservationConvention.class, () -> customConvention)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationConvention").isSameAs(customConvention)));
}

@Test
void listenerObservationsEnabledByDefault() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry")
.isSameAs(observationRegistry)));
}

@Test
void listenerObservationsCanBeDisabled() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withPropertyValues("spring.pulsar.listener.observations-enabled=false")
.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry")
.isNull()));
}

@Test
void listenerObservationsWithCustomConvention() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
PulsarListenerObservationConvention customConvention = mock(PulsarListenerObservationConvention.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.withBean("customConvention", PulsarListenerObservationConvention.class, () -> customConvention)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class)
.extracting(ConcurrentPulsarListenerContainerFactory<Object>::getContainerProperties)
.extracting(PulsarContainerProperties::getObservationConvention)
.isSameAs(customConvention)));
}

}

@Nested
class ProducerFactoryAutoConfigurationTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,42 @@

package org.springframework.pulsar.config;

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.Schema;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.support.JavaUtils;
import org.springframework.pulsar.support.MessageConverter;

import io.micrometer.observation.ObservationRegistry;

/**
* Base {@link PulsarListenerContainerFactory} implementation.
*
* @param <C> the {@link AbstractPulsarMessageListenerContainer} implementation type.
* @param <T> Message payload type.
* @author Soby Chacko
* @author Chris Bono
*/
public abstract class AbstractPulsarListenerContainerFactory<C extends AbstractPulsarMessageListenerContainer<T>, T>
implements PulsarListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
ApplicationContextAware {
implements PulsarListenerContainerFactory<C>, ApplicationEventPublisherAware, ApplicationContextAware {

protected final LogAccessor logger = new LogAccessor(this.getClass());

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
// protected
private final PulsarConsumerFactory<? super T> consumerFactory;

private final PulsarContainerProperties containerProperties = new PulsarContainerProperties();
private final PulsarContainerProperties containerProperties;

private PulsarConsumerFactory<? super T> consumerFactory;
private final ObservationRegistry observationRegistry;

private Boolean autoStartup;

Expand All @@ -63,19 +65,30 @@ public abstract class AbstractPulsarListenerContainerFactory<C extends AbstractP

private ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void setPulsarConsumerFactory(PulsarConsumerFactory<? super T> consumerFactory) {
protected AbstractPulsarListenerContainerFactory(PulsarConsumerFactory<? super T> consumerFactory,
PulsarContainerProperties containerProperties, @Nullable ObservationRegistry observationRegistry) {
this.consumerFactory = consumerFactory;
this.containerProperties = containerProperties;
this.observationRegistry = observationRegistry;
}

public PulsarConsumerFactory<? super T> getPulsarConsumerFactory() {
protected PulsarConsumerFactory<? super T> getConsumerFactory() {
return this.consumerFactory;
}

protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

public PulsarContainerProperties getContainerProperties() {
return this.containerProperties;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void setAutoStartup(Boolean autoStartup) {
this.autoStartup = autoStartup;
}
Expand All @@ -92,10 +105,6 @@ public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}

public Boolean isBatchListener() {
return this.batchListener;
}

public void setBatchListener(Boolean batchListener) {
this.batchListener = batchListener;
}
Expand All @@ -105,15 +114,6 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

public PulsarContainerProperties getContainerProperties() {
return this.containerProperties;
}

@Override
public void afterPropertiesSet() {

}

@SuppressWarnings("unchecked")
@Override
public C createListenerContainer(PulsarListenerEndpoint endpoint) {
Expand All @@ -138,7 +138,7 @@ private void configureEndpoint(AbstractPulsarListenerEndpoint<C> aplEndpoint) {
}

protected void initializeContainer(C instance, PulsarListenerEndpoint endpoint) {
PulsarContainerProperties instanceProperties = instance.getPulsarContainerProperties();
PulsarContainerProperties instanceProperties = instance.getContainerProperties();

if (instanceProperties.getSchemaType() == null) {
JavaUtils.INSTANCE.acceptIfNotNull(this.containerProperties.getSchemaType(),
Expand Down Expand Up @@ -171,7 +171,6 @@ else if (this.autoStartup != null) {
instanceProperties.setMaxNumMessages(this.containerProperties.getMaxNumMessages());
instanceProperties.setMaxNumBytes(this.containerProperties.getMaxNumBytes());
instanceProperties.setBatchTimeoutMillis(this.containerProperties.getBatchTimeoutMillis());
instanceProperties.setObservationEnabled(this.containerProperties.isObservationEnabled());
instanceProperties.setObservationConvention(this.containerProperties.getObservationConvention());

JavaUtils.INSTANCE.acceptIfNotNull(this.phase, instance::setPhase)
Expand Down
Loading

0 comments on commit 7e7ac48

Please sign in to comment.