From 8dcff29f7cb89bc2b9b9fe923476fbfd3ff845f0 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 9 Sep 2024 16:54:56 -0500 Subject: [PATCH] Add common container factory interface (#829) This introduces a common interface for all message listener containers. Prior to this commit, the reader and listener containers had no common abstraction. This is needed to introduce a generic container factory customizer in Spring Boot. --- .../antora/modules/ROOT/pages/whats-new.adoc | 11 ++++ ...eactivePulsarListenerContainerFactory.java | 22 +++----- ...vePulsarListenerContainerFactoryTests.java | 28 +++++++--- .../ReactivePulsarListenerSpelTests.java | 6 +- ...bstractPulsarListenerContainerFactory.java | 4 +- .../AbstractPulsarReaderContainerFactory.java | 4 +- .../DefaultPulsarReaderContainerFactory.java | 4 +- .../GenericListenerEndpointRegistry.java | 4 +- .../config/GenericReaderEndpointRegistry.java | 4 +- .../config/ListenerContainerFactory.java | 23 ++++---- .../pulsar/config/ListenerEndpoint.java | 4 +- .../pulsar/config/PulsarContainerFactory.java | 45 +++++++++++++++ .../pulsar/config/PulsarReaderEndpoint.java | 4 +- .../pulsar/config/ReaderContainerFactory.java | 31 ++++++++-- ...ntPulsarListenerContainerFactoryTests.java | 28 +++++++--- ...aultPulsarReaderContainerFactoryTests.java | 56 +++++++++++++++++++ ...ntPulsarMessageListenerContainerTests.java | 2 +- .../listener/PulsarListenerSpelTests.java | 8 +-- 18 files changed, 222 insertions(+), 66 deletions(-) create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarContainerFactory.java create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactoryTests.java diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index aa1786868..0f70947c2 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -24,6 +24,17 @@ When using Spring Boot the `PulsarTopicBuilder` is now a registered bean that is Therefore, if you are using Spring Boot, you can simply inject the builder where needed. Otherwise, use one of the `PulsarTopicBuilder` constructors directly. +==== Listener/ReaderContainerFactory +The `PulsarContainerFactory` common interface was introduced to bridge the gap between listener and reader container factories. +As part of this, the following APIs were deprecated, copied, and renamed: + +- `ListenerContainerFactory#createListenerContainer` replaced with `ListenerContainerFactory#createRegisteredContainer` + +- `ReaderContainerFactory#createReaderContainer(E endpoint)` replaced with `ReaderContainerFactory#createRegisteredContainer` + +- `ReaderContainerFactory#createReaderContainer(String... topics)` replaced with `ReaderContainerFactory#createContainer` + + === Breaking Changes ==== PulsarTopic# diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java index f9a105c23..a0c1dfacb 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,25 +139,21 @@ public DefaultReactivePulsarMessageListenerContainer createContainerInstance( return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps); } + @SuppressWarnings("rawtypes") @Override - public DefaultReactivePulsarMessageListenerContainer createListenerContainer( + public DefaultReactivePulsarMessageListenerContainer createRegisteredContainer( ReactivePulsarListenerEndpoint endpoint) { - DefaultReactivePulsarMessageListenerContainer instance = createContainerInstance(endpoint); - if (endpoint instanceof AbstractReactivePulsarListenerEndpoint) { - configureEndpoint((AbstractReactivePulsarListenerEndpoint) endpoint); + var instance = createContainerInstance(endpoint); + if (endpoint instanceof AbstractReactivePulsarListenerEndpoint abstractReactiveEndpoint) { + if (abstractReactiveEndpoint.getFluxListener() == null) { + JavaUtils.INSTANCE.acceptIfNotNull(this.fluxListener, abstractReactiveEndpoint::setFluxListener); + } } - endpoint.setupListenerContainer(instance, this.messageConverter); initializeContainer(instance, endpoint); return instance; } - private void configureEndpoint(AbstractReactivePulsarListenerEndpoint aplEndpoint) { - if (aplEndpoint.getFluxListener() == null) { - JavaUtils.INSTANCE.acceptIfNotNull(this.fluxListener, aplEndpoint::setFluxListener); - } - } - @Override public DefaultReactivePulsarMessageListenerContainer createContainer(String... topics) { ReactivePulsarListenerEndpoint endpoint = new ReactivePulsarListenerEndpoint<>() { @@ -168,7 +164,7 @@ public List getTopics() { } }; - DefaultReactivePulsarMessageListenerContainer container = createContainerInstance(endpoint); + var container = createContainerInstance(endpoint); initializeContainer(container, endpoint); return container; } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java index e05d5bd02..ae7fd2e29 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.pulsar.client.api.SubscriptionType; @@ -32,6 +34,18 @@ */ class DefaultReactivePulsarListenerContainerFactoryTests { + @SuppressWarnings({ "removal", "unchecked" }) + @Test + void deprecatedCreateListenerContainerCallsReplacementApi() { + var containerFactory = spy(new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), new ReactivePulsarContainerProperties<>())); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer).isNotNull(); + verify(containerFactory).createRegisteredContainer(endpoint); + } + @SuppressWarnings("unchecked") @Nested class SubscriptionTypeFrom { @@ -44,7 +58,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() { mock(ReactivePulsarConsumerFactory.class), factoryProps); var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Shared); } @@ -58,7 +72,7 @@ void endpointTakesPrecedenceOverFactoryProps() { var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Failover); } @@ -70,7 +84,7 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { mock(ReactivePulsarConsumerFactory.class), factoryProps); var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Exclusive); @@ -90,7 +104,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() { mock(ReactivePulsarConsumerFactory.class), factoryProps); var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionName()) .isEqualTo("my-factory-subscription"); } @@ -104,7 +118,7 @@ void endpointTakesPrecedenceOverFactoryProps() { var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription"); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionName()) .isEqualTo("my-endpoint-subscription"); } @@ -117,10 +131,10 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { var endpoint = mock(ReactivePulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var container1 = containerFactory.createListenerContainer(endpoint); + var container1 = containerFactory.createRegisteredContainer(endpoint); assertThat(container1.getContainerProperties().getSubscriptionName()) .startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"); - var container2 = containerFactory.createListenerContainer(endpoint); + var container2 = containerFactory.createRegisteredContainer(endpoint); assertThat(container2.getContainerProperties().getSubscriptionName()) .startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"); assertThat(container1.getContainerProperties().getSubscriptionName()) diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java index cd19db62d..6ba0d0227 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerSpelTests.java @@ -171,9 +171,9 @@ class ContainerFactoryAttribute { @Test void containerFactoryDerivedFromAttribute( @Autowired ReactivePulsarListenerContainerFactory containerFactory) { - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); } @EnablePulsar diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java index fd3744cb6..95a23dd85 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -106,7 +106,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv @SuppressWarnings("unchecked") @Override - public C createListenerContainer(PulsarListenerEndpoint endpoint) { + public C createRegisteredContainer(PulsarListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint); JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName); if (endpoint instanceof AbstractPulsarListenerEndpoint) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarReaderContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarReaderContainerFactory.java index 9ead150bb..cdce79632 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarReaderContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarReaderContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -99,7 +99,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv @SuppressWarnings("unchecked") @Override - public C createReaderContainer(PulsarReaderEndpoint endpoint) { + public C createRegisteredContainer(PulsarReaderEndpoint endpoint) { C instance = createContainerInstance(endpoint); JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName); if (endpoint instanceof AbstractPulsarReaderEndpoint) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactory.java index 24ff0318c..ea4959168 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ protected void initializeContainer(DefaultPulsarMessageReaderContainer instan } @Override - public DefaultPulsarMessageReaderContainer createReaderContainer(String... topics) { + public DefaultPulsarMessageReaderContainer createContainer(String... topics) { // TODO return null; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericListenerEndpointRegistry.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericListenerEndpointRegistry.java index f8f2a332c..c3379df32 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericListenerEndpointRegistry.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericListenerEndpointRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -138,7 +138,7 @@ public void registerListenerContainer(E endpoint, ListenerContainerFactory factory) { - C listenerContainer = factory.createListenerContainer(endpoint); + C listenerContainer = factory.createRegisteredContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericReaderEndpointRegistry.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericReaderEndpointRegistry.java index fdb7209a5..6f5c3a9fa 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericReaderEndpointRegistry.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericReaderEndpointRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -136,7 +136,7 @@ public void registerReaderContainer(E endpoint, ReaderContainerFactory factory) { - C readerContainer = factory.createReaderContainer(endpoint); + C readerContainer = factory.createRegisteredContainer(endpoint); if (readerContainer instanceof InitializingBean) { try { ((InitializingBean) readerContainer).afterPropertiesSet(); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerContainerFactory.java index d39987942..3a79727d2 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.pulsar.config; -import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.listener.MessageListenerContainer; /** @@ -26,24 +25,22 @@ * @param listener endpoint type. * @author Soby Chacko * @author Christophe Bornet + * @author Chris Bono */ -public interface ListenerContainerFactory> { +public interface ListenerContainerFactory> + extends PulsarContainerFactory { /** * Create a {@link MessageListenerContainer} for the given {@link ListenerEndpoint}. * Containers created using this method are added to the listener endpoint registry. * @param endpoint the endpoint to configure * @return the created container + * @deprecated since 1.2.0 for removal in 1.4.0 in favor of + * {@link PulsarContainerFactory#createRegisteredContainer} */ - C createListenerContainer(E endpoint); - - /** - * Create and configure a container without a listener; used to create containers that - * are not used for {@link PulsarListener} annotations. Containers created using this - * method are not added to the listener endpoint registry. - * @param topics the topics. - * @return the container. - */ - C createContainer(String... topics); + @Deprecated(since = "1.2.0", forRemoval = true) + default C createListenerContainer(E endpoint) { + return createRegisteredContainer(endpoint); + } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerEndpoint.java index 72e7fa060..7ff130529 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ public interface ListenerEndpoint { * Return the id of this endpoint. * @return the id of this endpoint. The id can be further qualified when the endpoint * is resolved against its actual listener container. - * @see ListenerContainerFactory#createListenerContainer + * @see ListenerContainerFactory#createRegisteredContainer */ @Nullable default String getId() { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarContainerFactory.java new file mode 100644 index 000000000..f8ced4a20 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarContainerFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.config; + +/** + * Factory for Pulsar message listener containers. + * + * @param message container + * @param message listener endpoint + * @author Chris Bono + * @since 1.2.0 + */ +public interface PulsarContainerFactory { + + /** + * Create a message listener container for the given endpoint. Containers created + * using this method are added to the listener endpoint registry. + * @param endpoint the endpoint to configure + * @return the created container + */ + C createRegisteredContainer(E endpoint); + + /** + * Create and configure a container without a listener. Containers created using this + * method are not added to the listener endpoint registry. + * @param topics the topics. + * @return the container. + */ + C createContainer(String... topics); + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarReaderEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarReaderEndpoint.java index 60a9c7aea..4347ad739 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarReaderEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarReaderEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ public interface PulsarReaderEndpoint { * Return the id of this endpoint. * @return the id of this endpoint. The id can be further qualified when the endpoint * is resolved against its actual listener container. - * @see ListenerContainerFactory#createListenerContainer + * @see ListenerContainerFactory#createRegisteredContainer */ @Nullable String getId(); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ReaderContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ReaderContainerFactory.java index 8304dc496..dbedb0d63 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ReaderContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ReaderContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,34 @@ * @param Container type * @param Endpoint type * @author Soby Chacko + * @author Chris Bono */ -public interface ReaderContainerFactory> { +public interface ReaderContainerFactory> + extends PulsarContainerFactory { - C createReaderContainer(E endpoint); + /** + * Create a message reader container for the given endpoint and register the container + * with the listener endpoint registry. + * @param endpoint reader endpoint + * @return the created container + * @deprecated since 1.2.0 for removal in 1.4.0 in favor of + * {@link PulsarContainerFactory#createRegisteredContainer} + */ + @Deprecated(since = "1.2.0", forRemoval = true) + default C createReaderContainer(E endpoint) { + return createRegisteredContainer(endpoint); + } - C createReaderContainer(String... topics); + /** + * Create a message reader container for the given endpoint. + * @param topics the topics to read from + * @return the created container + * @deprecated since 1.2.0 for removal in 1.4.0 in favor of + * {@link PulsarContainerFactory#createContainer} + */ + @Deprecated(since = "1.2.0", forRemoval = true) + default C createReaderContainer(String... topics) { + return createContainer(topics); + } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java index a808cb9bd..b5dfcd450 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.pulsar.client.api.SubscriptionType; @@ -32,6 +34,18 @@ */ class ConcurrentPulsarListenerContainerFactoryTests { + @SuppressWarnings({ "removal", "unchecked" }) + @Test + void deprecatedCreateListenerContainerCallsReplacementApi() { + var containerFactory = spy(new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), new PulsarContainerProperties())); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer).isNotNull(); + verify(containerFactory).createRegisteredContainer(endpoint); + } + @SuppressWarnings("unchecked") @Nested class SubscriptionTypeFrom { @@ -44,7 +58,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() { mock(PulsarConsumerFactory.class), factoryProps); var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Shared); } @@ -58,7 +72,7 @@ void endpointTakesPrecedenceOverFactoryProps() { var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Failover); } @@ -70,7 +84,7 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { mock(PulsarConsumerFactory.class), factoryProps); var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionType()) .isEqualTo(SubscriptionType.Exclusive); } @@ -89,7 +103,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() { mock(PulsarConsumerFactory.class), factoryProps); var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionName()) .isEqualTo("my-factory-subscription"); } @@ -103,7 +117,7 @@ void endpointTakesPrecedenceOverFactoryProps() { var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription"); - var createdContainer = containerFactory.createListenerContainer(endpoint); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); assertThat(createdContainer.getContainerProperties().getSubscriptionName()) .isEqualTo("my-endpoint-subscription"); } @@ -116,10 +130,10 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { var endpoint = mock(PulsarListenerEndpoint.class); when(endpoint.getConcurrency()).thenReturn(1); - var container1 = containerFactory.createListenerContainer(endpoint); + var container1 = containerFactory.createRegisteredContainer(endpoint); assertThat(container1.getContainerProperties().getSubscriptionName()) .startsWith("org.springframework.Pulsar.PulsarListenerEndpointContainer#"); - var container2 = containerFactory.createListenerContainer(endpoint); + var container2 = containerFactory.createRegisteredContainer(endpoint); assertThat(container2.getContainerProperties().getSubscriptionName()) .startsWith("org.springframework.Pulsar.PulsarListenerEndpointContainer#"); assertThat(container1.getContainerProperties().getSubscriptionName()) diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactoryTests.java new file mode 100644 index 000000000..2f7eb2c27 --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/DefaultPulsarReaderContainerFactoryTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; + +import org.springframework.pulsar.core.PulsarReaderFactory; +import org.springframework.pulsar.reader.PulsarReaderContainerProperties; + +/** + * Unit tests for {@link DefaultPulsarReaderContainerFactory}. + */ +class DefaultPulsarReaderContainerFactoryTests { + + @SuppressWarnings({ "removal", "unchecked" }) + @Test + void deprecatedCreateReaderContainerWithEndpointCallsReplacementApi() { + var containerFactory = spy(new DefaultPulsarReaderContainerFactory<>(mock(PulsarReaderFactory.class), + new PulsarReaderContainerProperties())); + var endpoint = mock(PulsarReaderEndpoint.class); + var createdContainer = containerFactory.createReaderContainer(endpoint); + assertThat(createdContainer).isNotNull(); + verify(containerFactory).createRegisteredContainer(endpoint); + } + + @SuppressWarnings({ "removal", "unchecked" }) + @Test + void deprecatedCreateReaderContainerWithTopicsCallsReplacementApi() { + var containerFactory = spy(new DefaultPulsarReaderContainerFactory<>(mock(PulsarReaderFactory.class), + new PulsarReaderContainerProperties())); + var createdContainer = containerFactory.createReaderContainer("my-topic"); + // reader does not implement this API - still ensure the replacement API is called + assertThat(createdContainer).isNull(); + verify(containerFactory).createContainer("my-topic"); + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java index fcae1beb9..1cdf54b5d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java @@ -70,7 +70,7 @@ void createConcurrentContainerFromFactoryAndVerifyBatchReceivePolicy() { when(pulsarListenerEndpoint.getConcurrency()).thenReturn(1); AbstractPulsarMessageListenerContainer concurrentContainer = containerFactory - .createListenerContainer(pulsarListenerEndpoint); + .createRegisteredContainer(pulsarListenerEndpoint); PulsarContainerProperties pulsarContainerProperties = concurrentContainer.getContainerProperties(); assertThat(pulsarContainerProperties.getBatchTimeoutMillis()).isEqualTo(60_000); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java index 6ce27a159..05247d23d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerSpelTests.java @@ -172,9 +172,9 @@ class ContainerFactoryAttribute { @Test void containerFactoryDerivedFromAttribute(@Autowired PulsarListenerContainerFactory containerFactory) { - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); - verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("foo"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("bar"))); + verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("zaa"))); } @EnablePulsar @@ -187,7 +187,7 @@ PulsarListenerContainerFactory customContainerFactory() { var mockContainerFactory = mock(PulsarListenerContainerFactory.class); AbstractPulsarMessageListenerContainer mockContainer = mock( AbstractPulsarMessageListenerContainer.class); - when(mockContainerFactory.createListenerContainer(any(PulsarListenerEndpoint.class))) + when(mockContainerFactory.createRegisteredContainer(any(PulsarListenerEndpoint.class))) .thenReturn(mockContainer); return mockContainerFactory; }