From 64dd18cc8ebf88852ca1f683820ffcf41f380814 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Tue, 3 Sep 2024 18:20:18 -0500 Subject: [PATCH] Use PulsarTopicBuilder in Pulsar binder (#2999) Spring for Apache Pulsar introduced support for default tenant and namespace for Pulsar topics in https://github.com/spring-projects/spring-pulsar/commit/6d23378fbb57a50c3a302be136f9173d2854085d. This ensures that all topic names are fully-qualified (using the default tenant and namespace when not fully-qualified). --- .../config/PulsarBinderConfiguration.java | 9 +++-- .../provisioning/PulsarTopicProvisioner.java | 18 +++++++--- .../binder/pulsar/PulsarBinderTests.java | 6 ++-- .../pulsar/PulsarTopicProvisionerTests.java | 33 +++++++++---------- bom/spring-cloud-starter-parent/pom.xml | 2 +- 5 files changed, 41 insertions(+), 27 deletions(-) diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java index be135e29ae..397f58ae43 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/PulsarBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.support.header.JacksonUtils; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; @@ -39,6 +40,7 @@ * Pulsar binder {@link Configuration}. * * @author Soby Chacko + * @author Chris Bono */ @Configuration(proxyBeanMethods = false) @ConditionalOnMissingBean(Binder.class) @@ -48,8 +50,9 @@ public class PulsarBinderConfiguration { @Bean public PulsarTopicProvisioner pulsarTopicProvisioner(PulsarAdministration pulsarAdministration, - PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) { - return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties); + PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties, + PulsarTopicBuilder topicBuilder) { + return new PulsarTopicProvisioner(pulsarAdministration, pulsarBinderConfigurationProperties, topicBuilder); } @Bean diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/provisioning/PulsarTopicProvisioner.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/provisioning/PulsarTopicProvisioner.java index 46a2f674ed..f94d3dd8de 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/provisioning/PulsarTopicProvisioner.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/provisioning/PulsarTopicProvisioner.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,12 +27,13 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider; import org.springframework.lang.Nullable; import org.springframework.pulsar.core.PulsarAdministration; -import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.core.PulsarTopicBuilder; /** * Pulsar topic provisioner. * * @author Soby Chacko + * @author Chris Bono */ public class PulsarTopicProvisioner implements ProvisioningProvider, ExtendedProducerProperties> { @@ -41,10 +42,19 @@ public class PulsarTopicProvisioner implements private final PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties; + private final PulsarTopicBuilder topicBuilder; + public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration, PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) { + this(pulsarAdministration, pulsarBinderConfigurationProperties, new PulsarTopicBuilder()); + } + + public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration, + PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties, + PulsarTopicBuilder topicBuilder) { this.pulsarAdministration = pulsarAdministration; this.pulsarBinderConfigurationProperties = pulsarBinderConfigurationProperties; + this.topicBuilder = topicBuilder; } @Override @@ -53,7 +63,7 @@ public ProducerDestination provisionProducerDestination(String name, throws ProvisioningException { Integer partitionCountFromBinding = pulsarProducerProperties.getExtension().getPartitionCount(); var partitionCount = getPartitionCount(partitionCountFromBinding); - var pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(partitionCount).build(); + var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build(); this.pulsarAdministration.createOrModifyTopics(pulsarTopic); return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions()); } @@ -72,7 +82,7 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou throws ProvisioningException { var partitionCountFromBinding = pulsarConsumerProperties.getExtension().getPartitionCount(); var partitionCount = getPartitionCount(partitionCountFromBinding); - var pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(partitionCount).build(); + var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build(); this.pulsarAdministration.createOrModifyTopics(pulsarTopic); return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions()); } diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java index 34c3774f85..bdbbdabe76 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; import org.springframework.util.Assert; import org.springframework.util.MimeTypeUtils; @@ -101,7 +102,8 @@ protected String getClassUnderTestName() { protected PulsarTestBinder getBinder() { var pulsarAdministration = new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); var configProps = new PulsarBinderConfigurationProperties(); - var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps); + var topicBuilder = new PulsarTopicBuilder(); + var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps, topicBuilder); var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient); var pulsarTemplate = new PulsarTemplate<>(producerFactory); var consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTopicProvisionerTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTopicProvisionerTests.java index 0f99083d38..d574f4faad 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTopicProvisionerTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarTopicProvisionerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,17 +50,7 @@ void provisionThroughProducerBindingWithDefaultPartitioning() { new PulsarProducerProperties()); ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo", properties); - verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 0); - } - - private static void verifyAndAssert(PulsarAdministration pulsarAdministration, String actualProducerDestination, - String expectedProducerDestination, int expectedPartitionCount) { - ArgumentCaptor pulsarTopicArgumentCaptor = ArgumentCaptor.forClass(PulsarTopic.class); - verify(pulsarAdministration, times(1)).createOrModifyTopics(pulsarTopicArgumentCaptor.capture()); - assertThat(actualProducerDestination).isEqualTo(expectedProducerDestination); - PulsarTopic pulsarTopic = pulsarTopicArgumentCaptor.getValue(); - assertThat(pulsarTopic.topicName()).isEqualTo(expectedProducerDestination); - assertThat(pulsarTopic.numberOfPartitions()).isEqualTo(expectedPartitionCount); + verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 0); } @Test @@ -73,7 +63,7 @@ void provisionThroughConsumerBindingWithDefaultPartitioning() { new PulsarConsumerProperties()); ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "", properties); - verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 0); + verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 0); } @Test @@ -87,7 +77,7 @@ void provisioningOnProducerBindingWithPartitionsSetAtTheBinderProperties() { new PulsarProducerProperties()); ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo", properties); - verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 4); + verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4); } @Test @@ -101,7 +91,7 @@ void provisioningOnProducerBindingWithPartitionsSetAtTheBindingProperties() { properties.getExtension().setPartitionCount(4); ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo", properties); - verifyAndAssert(pulsarAdministration, producerDestination.getName(), "foo", 4); + verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4); } @Test @@ -115,7 +105,7 @@ void provisionThroughConsumerBindingWithPartitionsSetAtTheBinderProperties() { new PulsarConsumerProperties()); ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "", properties); - verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 4); + verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4); } @Test @@ -130,7 +120,16 @@ void provisionThroughConsumerBindingWithPartitionsSetAtTheBindingProperties() { pulsarConsumerProperties); ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "", properties); - verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "bar", 4); + verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4); } + private static void verifyAndAssert(PulsarAdministration pulsarAdministration, String actualProducerDestination, + String expectedProducerDestination, int expectedPartitionCount) { + ArgumentCaptor pulsarTopicArgumentCaptor = ArgumentCaptor.forClass(PulsarTopic.class); + verify(pulsarAdministration, times(1)).createOrModifyTopics(pulsarTopicArgumentCaptor.capture()); + assertThat(actualProducerDestination).isEqualTo(expectedProducerDestination); + PulsarTopic pulsarTopic = pulsarTopicArgumentCaptor.getValue(); + assertThat(pulsarTopic.topicName()).isEqualTo(expectedProducerDestination); + assertThat(pulsarTopic.numberOfPartitions()).isEqualTo(expectedPartitionCount); + } } diff --git a/bom/spring-cloud-starter-parent/pom.xml b/bom/spring-cloud-starter-parent/pom.xml index 1cc8f00296..7eee589476 100644 --- a/bom/spring-cloud-starter-parent/pom.xml +++ b/bom/spring-cloud-starter-parent/pom.xml @@ -6,7 +6,7 @@ org.springframework.boot spring-boot-starter-parent - 3.4.0-M1 + 3.4.0-M2 org.springframework.cloud