From 9f1fa188bc8532136d28e863f72459ff01056eee Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 18 Sep 2023 13:20:05 -0400 Subject: [PATCH 1/2] GH-2806: Pulsar binder defaults properties config - Ensure that the Pulsar binder default properties can be properly expressed via spring.cloud.stream.pulsar.default property prefix. - Add the binder child context bean with the name binderName_binderProducingContext into the parent application context so that individual beans from the binder context can be easily queried. Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2806 --- ...gHandlerMappingsProviderConfiguration.java | 48 ++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + ...ExtendedBindingDefaultPropertiesTests.java | 114 ++++++++++++++++++ .../stream/binder/DefaultBinderFactory.java | 4 + 4 files changed, 167 insertions(+) create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.java create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.java new file mode 100644 index 0000000000..b24426079a --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/config/ExtendedBindingHandlerMappingsProviderConfiguration.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar.config; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.properties.source.ConfigurationPropertyName; +import org.springframework.cloud.stream.config.BindingHandlerAdvise; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * {@link EnableAutoConfiguration Auto-configuration} for extended binding metadata for the Pulsar binder. + * + * @author Soby Chacko + * @since 4.1.0 + */ +@Configuration(proxyBeanMethods = false) +public class ExtendedBindingHandlerMappingsProviderConfiguration { + + @Bean + public BindingHandlerAdvise.MappingsProvider pulsarExtendedPropertiesDefaultMappingsProvider() { + return () -> { + Map mappings = new HashMap<>(); + mappings.put( + ConfigurationPropertyName.of("spring.cloud.stream.pulsar.bindings"), + ConfigurationPropertyName.of("spring.cloud.stream.pulsar.default")); + return mappings; + }; + } +} diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000000..65aab20933 --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.stream.binder.pulsar.config.ExtendedBindingHandlerMappingsProviderConfiguration diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java new file mode 100644 index 0000000000..7b1e779022 --- /dev/null +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java @@ -0,0 +1,114 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.pulsar; + +import java.util.function.Function; + +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.pulsar.config.ExtendedBindingHandlerMappingsProviderConfiguration; +import org.springframework.cloud.stream.binder.pulsar.properties.PulsarExtendedBindingProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; + + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link ExtendedBindingHandlerMappingsProviderConfiguration}. + * + * @author Soby Chacko + */ +class PulsarExtendedBindingDefaultPropertiesTests implements PulsarTestContainerSupport { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(DefaultPropertiesTestApp.class) + .withPropertyValues( + "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), + "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "--spring.cloud.stream.pulsar.binder.partitionCount: 1", + "spring.cloud.stream.pulsar.default.consumer.schema-type: JSON", + "spring.cloud.stream.pulsar.default.consumer.receiverQueueSize: 5000", + "spring.cloud.stream.pulsar.default.consumer.startPaused: true", + "spring.cloud.stream.pulsar.default.consumer.subscription.name: my-subscription", + "spring.cloud.stream.pulsar.default.producer.schema-type: JSON", + "spring.cloud.stream.pulsar.default.producer.blockIfQueueFull: true", + "spring.cloud.stream.pulsar.default.producer.maxPendingMessages: 200", + "spring.cloud.stream.pulsar.default.producer.name: my-producer"); + + @Test + void defaultsUsedWhenNoCustomBindingProperties() { + this.contextRunner.run((context) -> { + assertThat(context) + .hasNotFailed() + .hasBean("pulsar_binderProducingContext"); + ConfigurableApplicationContext pulsarBinderProducingContext = + context.getBean("pulsar_binderProducingContext", ConfigurableApplicationContext.class); + PulsarExtendedBindingProperties extendedBindingProperties = pulsarBinderProducingContext.getBean(PulsarExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON) + .hasFieldOrPropertyWithValue("receiverQueueSize", 5000) + .hasFieldOrPropertyWithValue("subscription.name", "my-subscription") + .hasFieldOrPropertyWithValue("startPaused", true); + assertThat(extendedBindingProperties.getExtendedProducerProperties("process-out-0")) + .hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON) + .hasFieldOrPropertyWithValue("blockIfQueueFull", true) + .hasFieldOrPropertyWithValue("maxPendingMessages", 200) + .hasFieldOrPropertyWithValue("name", "my-producer"); + }); + } + + @Test + void defaultsRespectedWhenCustomBindingProperties() { + this.contextRunner + .withPropertyValues( + "spring.cloud.stream.pulsar.bindings.process-in-0.consumer.receiverQueueSize: 8000", + "spring.cloud.stream.pulsar.bindings.process-out-0.producer.blockIfQueueFull: false", + "spring.cloud.stream.pulsar.bindings.process-out-0.producer.maxPendingMessages: 400") + .run((context) -> { + assertThat(context) + .hasNotFailed() + .hasBean("pulsar_binderProducingContext"); + ConfigurableApplicationContext pulsarBinderProducingContext = + context.getBean("pulsar_binderProducingContext", ConfigurableApplicationContext.class); + PulsarExtendedBindingProperties extendedBindingProperties = pulsarBinderProducingContext.getBean(PulsarExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON) + .hasFieldOrPropertyWithValue("receiverQueueSize", 8000) + .hasFieldOrPropertyWithValue("subscription.name", "my-subscription") + .hasFieldOrPropertyWithValue("startPaused", true); + assertThat(extendedBindingProperties.getExtendedProducerProperties("process-out-0")) + .hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON) + .hasFieldOrPropertyWithValue("blockIfQueueFull", false) + .hasFieldOrPropertyWithValue("maxPendingMessages", 400) + .hasFieldOrPropertyWithValue("name", "my-producer"); + }); + } + + + @EnableAutoConfiguration + static class DefaultPropertiesTestApp { + + @Bean + public Function process() { + return s -> s; + } + } +} diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java index db3a5e6fc5..4abb426143 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java @@ -482,6 +482,10 @@ public void onApplicationEvent(ApplicationEvent event) { if (refresh) { binderProducingContext.refresh(); + if (this.context != null) { + this.context.getBeanFactory().registerSingleton(configurationName + "_binderProducingContext", + binderProducingContext); + } if (!useApplicationContextAsParent || "integration".equals(binderType.getDefaultName())) { this.propagateSharedBeans((GenericApplicationContext) this.context, binderProducingContext); } From 55b34805afbe8e36965163534d984bd273295cd2 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 21 Sep 2023 10:03:30 -0400 Subject: [PATCH 2/2] Addressing PR review comments --- .../pulsar/PulsarExtendedBindingDefaultPropertiesTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java index 7b1e779022..67cc5540b8 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarExtendedBindingDefaultPropertiesTests.java @@ -41,9 +41,9 @@ class PulsarExtendedBindingDefaultPropertiesTests implements PulsarTestContainer private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withUserConfiguration(DefaultPropertiesTestApp.class) .withPropertyValues( - "--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), - "--spring.cloud.stream.pulsar.binder.partitionCount: 1", + "spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), + "spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), + "spring.cloud.stream.pulsar.binder.partitionCount: 1", "spring.cloud.stream.pulsar.default.consumer.schema-type: JSON", "spring.cloud.stream.pulsar.default.consumer.receiverQueueSize: 5000", "spring.cloud.stream.pulsar.default.consumer.startPaused: true",