diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index 888a7de28..c051e777b 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -40,7 +40,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; import reactor.util.function.Tuples; import org.springframework.beans.BeansException; @@ -65,7 +64,6 @@ import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.BinderHeaders; -import org.springframework.cloud.stream.binder.BindingCreatedEvent; import org.springframework.cloud.stream.binder.ConsumerProperties; import org.springframework.cloud.stream.binder.PartitionHandler; import org.springframework.cloud.stream.binder.ProducerProperties; @@ -129,6 +127,7 @@ * @author Byungjun You * @author Ivan Shapoval * @author Patrik Péter Süli + * @author Artem Bilan * @since 2.1 */ @Lazy(false) @@ -224,8 +223,6 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0])); } - Publisher beginPublishingTrigger = setupBindingTrigger(context); - if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) { String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow"; @@ -239,7 +236,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc if (functionWrapper != null) { IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties), - beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName) + pollable, context, taskScheduler, producerProperties, outputName) .route(Message.class, message -> { if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) { String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination"); @@ -253,7 +250,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc } else { IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties), - beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName) + pollable, context, taskScheduler, producerProperties, outputName) .channel(c -> c.direct()) .fluxTransform((Function>, ? extends Publisher>) function) .route(Message.class, message -> { @@ -274,26 +271,9 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc }; } - - /* - * Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created - */ - private Publisher setupBindingTrigger(GenericApplicationContext context) { - AtomicReference> triggerRef = new AtomicReference<>(); - Publisher beginPublishingTrigger = Mono.create(triggerRef::set); - context.addApplicationListener(event -> { - if (event instanceof BindingCreatedEvent) { - if (triggerRef.get() != null) { - triggerRef.get().success(); - } - } - }); - return beginPublishingTrigger; - } - @SuppressWarnings({ "rawtypes", "unchecked" }) private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier supplier, - Publisher beginPublishingTrigger, PollableBean pollable, GenericApplicationContext context, + PollableBean pollable, GenericApplicationContext context, TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) { IntegrationFlowBuilder integrationFlowBuilder; @@ -309,8 +289,8 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier s if (pollable == null && reactive) { Publisher publisher = (Publisher) supplier.get(); publisher = publisher instanceof Mono - ? ((Mono) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary) - : ((Flux) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary); + ? ((Mono) publisher).map(this::wrapToMessageIfNecessary) + : ((Flux) publisher).map(this::wrapToMessageIfNecessary); integrationFlowBuilder = IntegrationFlow.from(publisher);