diff --git a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java index 1f28ce767..d89077330 100644 --- a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java +++ b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.config.AggregatorFactoryBean; import org.springframework.integration.store.MessageGroupStore; +import org.springframework.integration.support.MessageBuilder; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -67,7 +68,12 @@ public Function>, Flux>> aggregatorFunction( FluxMessageChannel outputChannel ) { return input -> Flux.from(outputChannel) - .doOnRequest((request) -> inputChannel.subscribeTo(input)); + .doOnRequest((request) -> + inputChannel.subscribeTo( + input.map((inputMessage) -> + MessageBuilder.fromMessage(inputMessage) + .removeHeader("kafka_consumer") + .build()))); } @Bean diff --git a/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java b/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java index 2a2af98c0..feb39c26a 100644 --- a/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java +++ b/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.cloud.fn.aggregator; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.time.Duration; import java.util.List; @@ -23,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.cloud.fn.consumer.redis.RedisTestContainerSupport; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.redis.store.RedisMessageStore; @@ -46,11 +49,14 @@ static void redisProperties(DynamicPropertyRegistry registry) { @Test public void test() { + InputStream fakeNonSerializableKafkaConsumer = new ByteArrayInputStream(new byte[0]); + Flux> input = Flux.just(MessageBuilder.withPayload("2") .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2) .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) + .setHeader("kafka_consumer", new ProxyFactory(fakeNonSerializableKafkaConsumer).getProxy()) .build(), MessageBuilder.withPayload("1") .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")