Skip to content

Commit

Permalink
Use PulsarTopicBuilder in Pulsar binder (#2999)
Browse files Browse the repository at this point in the history
Spring for Apache Pulsar introduced support for default tenant and namespace
for Pulsar topics in spring-projects/spring-pulsar@6d23378.
This ensures that all topic names are fully-qualified (using the default
tenant and namespace when not fully-qualified).
  • Loading branch information
onobc authored Sep 3, 2024
1 parent fe1392e commit 64dd18c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.support.header.JacksonUtils;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
Expand All @@ -39,6 +40,7 @@
* Pulsar binder {@link Configuration}.
*
* @author Soby Chacko
* @author Chris Bono
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
Expand All @@ -48,8 +50,9 @@ public class PulsarBinderConfiguration {

@Bean
public PulsarTopicProvisioner pulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) {
return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties);
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties,
PulsarTopicBuilder topicBuilder) {
return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties, topicBuilder);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,13 @@
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.core.PulsarTopicBuilder;

/**
* Pulsar topic provisioner.
*
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>> {
Expand All @@ -41,10 +42,19 @@ public class PulsarTopicProvisioner implements

private final PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties;

private final PulsarTopicBuilder topicBuilder;

public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) {
this(pulsarAdministration, pulsarBinderConfigurationProperties, new PulsarTopicBuilder());
}

public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties,
PulsarTopicBuilder topicBuilder) {
this.pulsarAdministration = pulsarAdministration;
this.pulsarBinderConfigurationProperties = pulsarBinderConfigurationProperties;
this.topicBuilder = topicBuilder;
}

@Override
Expand All @@ -53,7 +63,7 @@ public ProducerDestination provisionProducerDestination(String name,
throws ProvisioningException {
Integer partitionCountFromBinding = pulsarProducerProperties.getExtension().getPartitionCount();
var partitionCount = getPartitionCount(partitionCountFromBinding);
var pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(partitionCount).build();
var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build();
this.pulsarAdministration.createOrModifyTopics(pulsarTopic);
return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
Expand All @@ -72,7 +82,7 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou
throws ProvisioningException {
var partitionCountFromBinding = pulsarConsumerProperties.getExtension().getPartitionCount();
var partitionCount = getPartitionCount(partitionCountFromBinding);
var pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(partitionCount).build();
var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build();
this.pulsarAdministration.createOrModifyTopics(pulsarTopic);
return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
Expand Down Expand Up @@ -101,7 +102,8 @@ protected String getClassUnderTestName() {
protected PulsarTestBinder getBinder() {
var pulsarAdministration = new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
var configProps = new PulsarBinderConfigurationProperties();
var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps);
var topicBuilder = new PulsarTopicBuilder();
var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps, topicBuilder);
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
var pulsarTemplate = new PulsarTemplate<>(producerFactory);
var consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,17 +50,7 @@ void provisionThroughProducerBindingWithDefaultPartitioning() {
new PulsarProducerProperties());
ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
properties);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 0);
}

private static void verifyAndAssert(PulsarAdministration pulsarAdministration, String actualProducerDestination,
String expectedProducerDestination, int expectedPartitionCount) {
ArgumentCaptor<PulsarTopic> pulsarTopicArgumentCaptor = ArgumentCaptor.forClass(PulsarTopic.class);
verify(pulsarAdministration, times(1)).createOrModifyTopics(pulsarTopicArgumentCaptor.capture());
assertThat(actualProducerDestination).isEqualTo(expectedProducerDestination);
PulsarTopic pulsarTopic = pulsarTopicArgumentCaptor.getValue();
assertThat(pulsarTopic.topicName()).isEqualTo(expectedProducerDestination);
assertThat(pulsarTopic.numberOfPartitions()).isEqualTo(expectedPartitionCount);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 0);
}

@Test
Expand All @@ -73,7 +63,7 @@ void provisionThroughConsumerBindingWithDefaultPartitioning() {
new PulsarConsumerProperties());
ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
properties);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 0);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 0);
}

@Test
Expand All @@ -87,7 +77,7 @@ void provisioningOnProducerBindingWithPartitionsSetAtTheBinderProperties() {
new PulsarProducerProperties());
ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
properties);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 4);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4);
}

@Test
Expand All @@ -101,7 +91,7 @@ void provisioningOnProducerBindingWithPartitionsSetAtTheBindingProperties() {
properties.getExtension().setPartitionCount(4);
ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
properties);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 4);
verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4);
}

@Test
Expand All @@ -115,7 +105,7 @@ void provisionThroughConsumerBindingWithPartitionsSetAtTheBinderProperties() {
new PulsarConsumerProperties());
ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
properties);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 4);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4);
}

@Test
Expand All @@ -130,7 +120,16 @@ void provisionThroughConsumerBindingWithPartitionsSetAtTheBindingProperties() {
pulsarConsumerProperties);
ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
properties);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 4);
verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4);
}

private static void verifyAndAssert(PulsarAdministration pulsarAdministration, String actualProducerDestination,
String expectedProducerDestination, int expectedPartitionCount) {
ArgumentCaptor<PulsarTopic> pulsarTopicArgumentCaptor = ArgumentCaptor.forClass(PulsarTopic.class);
verify(pulsarAdministration, times(1)).createOrModifyTopics(pulsarTopicArgumentCaptor.capture());
assertThat(actualProducerDestination).isEqualTo(expectedProducerDestination);
PulsarTopic pulsarTopic = pulsarTopicArgumentCaptor.getValue();
assertThat(pulsarTopic.topicName()).isEqualTo(expectedProducerDestination);
assertThat(pulsarTopic.numberOfPartitions()).isEqualTo(expectedPartitionCount);
}
}
2 changes: 1 addition & 1 deletion bom/spring-cloud-starter-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.0-M1</version>
<version>3.4.0-M2</version>
<relativePath/>
</parent>
<groupId>org.springframework.cloud</groupId>
Expand Down

0 comments on commit 64dd18c

Please sign in to comment.