From 46dc4d0dd5b2443721588316d79e4bf809ec32cb Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 22 Sep 2023 16:02:02 -0400 Subject: [PATCH] GH-500: Workaround for non-serializable header (#503) Fixes https://github.com/spring-cloud/stream-applications/issues/500 When `listeners` are provided for `DefaultKafkaConsumerFactory`, the target `KafkaConsumer` instance is proxied. The `java.lang.reflect.Proxy` is `Serializable`, but the value it is wrapping is not. When the `MessageHeaders` is serialized (e.g. into persistent `MessageStore`), it checks for `Serializable` type only on top-level object of the header. Therefore, the `Proxy` is passing condition, but eventually we fail with `NotSerializableException`, since the proxied object is not like that * Remove `kafka_consumer` from a message before it reaches an aggregator with its logic to serialize message into the store This is a workaround until Spring for Apache Kafka is released with the fix: https://github.com/spring-projects/spring-kafka/pull/2822 --- .../fn/aggregator/AggregatorFunctionConfiguration.java | 10 ++++++++-- .../aggregator/RedisMessageStoreAggregatorTests.java | 8 +++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java index 1f28ce767..d89077330 100644 --- a/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java +++ b/functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.config.AggregatorFactoryBean; import org.springframework.integration.store.MessageGroupStore; +import org.springframework.integration.support.MessageBuilder; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -67,7 +68,12 @@ public Function>, Flux>> aggregatorFunction( FluxMessageChannel outputChannel ) { return input -> Flux.from(outputChannel) - .doOnRequest((request) -> inputChannel.subscribeTo(input)); + .doOnRequest((request) -> + inputChannel.subscribeTo( + input.map((inputMessage) -> + MessageBuilder.fromMessage(inputMessage) + .removeHeader("kafka_consumer") + .build()))); } @Bean diff --git a/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java b/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java index 2a2af98c0..feb39c26a 100644 --- a/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java +++ b/functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.cloud.fn.aggregator; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.time.Duration; import java.util.List; @@ -23,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.cloud.fn.consumer.redis.RedisTestContainerSupport; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.redis.store.RedisMessageStore; @@ -46,11 +49,14 @@ static void redisProperties(DynamicPropertyRegistry registry) { @Test public void test() { + InputStream fakeNonSerializableKafkaConsumer = new ByteArrayInputStream(new byte[0]); + Flux> input = Flux.just(MessageBuilder.withPayload("2") .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2) .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) + .setHeader("kafka_consumer", new ProxyFactory(fakeNonSerializableKafkaConsumer).getProxy()) .build(), MessageBuilder.withPayload("1") .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")