diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/JaasLoginModuleConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/JaasLoginModuleConfiguration.java
index 9934f30dd..871a880c8 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/JaasLoginModuleConfiguration.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/JaasLoginModuleConfiguration.java
@@ -17,6 +17,7 @@ package org.springframework.cloud.stream.binder.kafka.properties;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 import javax.security.auth.login.AppConfigurationEntry;
@@ -55,7 +56,7 @@ public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() {
 	public void setControlFlag(String controlFlag) {
 		Assert.notNull(controlFlag, "cannot be null");
 		this.controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag
-				.valueOf(controlFlag.toUpperCase());
+				.valueOf(controlFlag.toUpperCase(Locale.ROOT));
 	}
 
 	public Map getOptions() {
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java
index 7495dacdb..e1a04dfe6 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -193,7 +194,7 @@ Function, Flux> uppercase() {
 
 	@Bean
 	Function>, Flux> lowercase() {
-		return s -> s.map(rec -> new String(rec.value()).toLowerCase());
+		return s -> s.map(rec -> new String(rec.value()).toLowerCase(Locale.ROOT));
 	}
 
 	@Bean
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderObservationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderObservationTests.java
new file mode 100644
index 000000000..13a35377c
--- /dev/null
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderObservationTests.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2022-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.reactorkafka;
+
+import java.lang.reflect.Type;
+import java.time.Duration;
+import java.util.Locale;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import brave.handler.SpanHandler;
+import brave.test.TestSpanHandler;
+import io.micrometer.observation.Observation;
+import io.micrometer.observation.ObservationRegistry;
+import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
+import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
+import io.micrometer.tracing.test.simple.SpansAssert;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.kafka.receiver.ReceiverRecord;
+import reactor.kafka.receiver.observation.KafkaReceiverObservation;
+import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.function.StreamBridge;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.IntegrationMessageHeaderAccessor;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.converter.MessagingMessageConverter;
+import org.springframework.kafka.support.converter.RecordMessageConverter;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.Message;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * @author Artem Bilan
+ * @author Soby Chacko
+ * @since 4.2.0
+ */
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
+		"spring.kafka.consumer.metadata.max.age.ms=1000",
+		"spring.cloud.function.definition=receive",
+		"spring.cloud.stream.function.reactive.uppercase=true",
+		"spring.cloud.stream.bindings.receive-in-0.group=rkbot-in-group",
+		"spring.cloud.stream.bindings.receive-in-0.destination=rkbot-in-topic",
+		"spring.cloud.stream.bindings.receive-out-0.destination=rkbot-out-topic",
+		"spring.cloud.stream.kafka.binder.enable-observation=true",
+		"spring.cloud.stream.kafka.binder.brokers=${spring.kafka.bootstrap-servers}",
+		"management.tracing.sampling.probability=1",
+		"spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR"
+	})
+@DirtiesContext
+@AutoConfigureObservability
+@EmbeddedKafka(topics = { "rkbot-out-topic" })
+public class ReactorKafkaBinderObservationTests {
+
+	private static final TestSpanHandler SPANS = new TestSpanHandler();
+
+	@Autowired
+	StreamBridge streamBridge;
+
+	@Autowired
+	ObservationRegistry observationRegistry;
+
+	@Autowired
+	TestConfiguration testConfiguration;
+
+	@Autowired
+	private EmbeddedKafkaBroker embeddedKafka;
+
+	@Test
+	void endToEndReactorKafkaBinder1() {
+
+		streamBridge.send("rkbot-in-topic", MessageBuilder.withPayload("data")
+				.build());
+
+		await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
+		SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
+				.haveSameTraceId();
+	}
+
+	@SpringBootConfiguration
+	@EnableAutoConfiguration(exclude = org.springframework.cloud.function.observability.ObservationAutoConfiguration.class)
+	public static class TestConfiguration {
+
+		@Bean
+		SpanHandler testSpanHandler() {
+			return SPANS;
+		}
+
+		@Bean
+		RecordMessageConverter fullRR() {
+			return new RecordMessageConverter() {
+
+				private final RecordMessageConverter converter = new MessagingMessageConverter();
+
+				@Override
+				public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment,
+						org.apache.kafka.clients.consumer.Consumer consumer, Type payloadType) {
+
+					return MessageBuilder.withPayload(record).build();
+				}
+
+				@Override
+				public ProducerRecord fromMessage(Message message, String defaultTopic) {
+					return this.converter.fromMessage(message, defaultTopic);
+				}
+
+			};
+		}
+
+		@Bean
+		Function>, Flux>> receive(ObservationRegistry observationRegistry) {
+			return s -> s
+					.flatMap(record -> {
+						Observation receiverObservation =
+								KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
+										KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
+										() ->
+												new KafkaRecordReceiverContext(
+														record, "user.receiver", "localhost:9092"),
+										observationRegistry);
+
+						return Mono.deferContextual(contextView -> Mono.just(record)
+										.map(rec -> new String(rec.value()).toLowerCase(Locale.ROOT))
+										.map(rec -> MessageBuilder.withPayload(rec).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView).build()))
+								.doOnTerminate(receiverObservation::stop)
+								.doOnError(receiverObservation::error)
+								.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
+					});
+		}
+	}
+
+}
+
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java
index 2f289d316..30d11b0f2 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java
@@ -18,6 +18,7 @@ import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -370,7 +371,7 @@ public Object onSuccess(ConfigurationPropertyName name, Bindable target,
 				if (!concurrencyExplicitlyProvided[0]) {
 					concurrencyExplicitlyProvided[0] = name.getLastElement(ConfigurationPropertyName.Form.UNIFORM).equals("concurrency") &&
 						// name is normalized to contain only uniform elements and thus safe to call toLowerCase here.
-						ConfigurationPropertyName.of("spring.cloud.stream.bindings." + inboundName.toLowerCase() + ".consumer")
+						ConfigurationPropertyName.of("spring.cloud.stream.bindings." + inboundName.toLowerCase(Locale.ROOT) + ".consumer")
 								.isAncestorOf(name);
 				}
 				return result;
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java
index e18186018..80f2e944b 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java
@@ -19,6 +19,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -190,7 +191,7 @@ public Function, KStream[]> process() {
 			return input -> {
 				final Map> stringKStreamMap = input
-						.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+						.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 						.groupBy((key, value) -> value)
 						.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
 						.count(Materialized.as("WordCounts-branch"))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java
index 19dfe654a..57f0646b7 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
@@ -397,7 +398,7 @@ public static class WordCountProcessorApplication {
 		Function, KStream> process() {
 			return input -> input
-					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+					.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
@@ -439,7 +440,7 @@ static class OutboundNullApplication {
 		Function, KStream> process() {
 			return input -> input
 					.flatMapValues(
-							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+							value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5))).count(Materialized.as("foobar-WordCounts"))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DlqDestinationResolverTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DlqDestinationResolverTests.java
index a83a71459..89d086c8b 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DlqDestinationResolverTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DlqDestinationResolverTests.java
@@ -18,6 +18,7 @@ import java.time.Duration;
 import java.util.Arrays;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -124,7 +125,7 @@ public Function, KStream> process() {
 			return input -> input
 					.flatMapValues(
-							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+							value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderMultipleInputTopicsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderMultipleInputTopicsTest.java
index 8e14655ee..41b131de2 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderMultipleInputTopicsTest.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderMultipleInputTopicsTest.java
@@ -19,6 +19,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -149,7 +150,7 @@ public Function, KStream> process() {
 			return input -> input
 					.flatMapValues(
-							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+							value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.count(Materialized.as("WordCounts-tKWCWSIAP0")).toStream()
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java
index e4f436ccb..38db7eeb4 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java
@@ -19,6 +19,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
@@ -173,7 +174,7 @@ static class WordCountProcessorApplication {
 		public Function, KStream> process() {
 			return input -> input
-					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+					.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingDisabledTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingDisabledTests.java
index 4e4ab9b9c..b67dab139 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingDisabledTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingDisabledTests.java
@@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -153,7 +154,7 @@ public Function, KStream> process() {
 			return input -> input
 					.flatMapValues(
-							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+							value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingEnabledTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingEnabledTests.java
index 3211d1106..5954bf270 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingEnabledTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingEnabledTests.java
@@ -18,6 +18,7 @@ import java.time.Duration;
 import java.util.Arrays;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -137,7 +138,7 @@ public Function, KStream> process() {
 			return input -> input
 					.flatMapValues(
-							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+							value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
 					.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
index 2e1249ae8..0cc821b8b 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -1517,7 +1518,7 @@ partition calculation (see other usages of PartitionHandler) by using current pa
 			if (producerProperties.isDynamicPartitionUpdatesEnabled() && producerProperties.getPartitionKeyExpression() != null &&
 					!(producerProperties.getPartitionKeyExpression().getExpressionString()
-							.toLowerCase().contains("payload"))) {
+							.toLowerCase(Locale.ROOT).contains("payload"))) {
 				kafkaPartitionHandler = new PartitionHandler(ExpressionUtils.createStandardEvaluationContext(beanFactory),
 						producerProperties, beanFactory);
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java
index 2fbb2766d..c958948e9 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java
@@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.kafka.integration;
 
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -86,7 +87,7 @@ void producerTx() {
 		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase());
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
 		Consumer consumer = new KafkaConsumer<>(props);
 		embeddedKafkaBrokera.consumeFromAllEmbeddedTopics(consumer);
 		ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, "output");
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java
index ddbb0273a..376b7a40a 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -128,7 +129,7 @@ public Function listenIn() {
 			if (in.equals("two")) {
 				throw new RuntimeException("fail");
 			}
-			return in.toUpperCase();
+			return in.toUpperCase(Locale.ROOT);
 		};
 	}
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/PollableConsumerTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/PollableConsumerTests.java
index 46655c833..0391ae731 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/PollableConsumerTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/PollableConsumerTests.java
@@ -18,6 +18,7 @@ import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -116,7 +117,7 @@ void simpleCase() {
 			@Override
 			public Message preSend(Message message, MessageChannel channel) {
 				return MessageBuilder
-						.withPayload(((String) message.getPayload()).toUpperCase())
+						.withPayload(((String) message.getPayload()).toUpperCase(Locale.ROOT))
 						.copyHeaders(message.getHeaders()).build();
 			}
@@ -301,7 +302,7 @@ void embedded() {
 			public Message preSend(Message message, MessageChannel channel) {
 				return MessageBuilder
 						.withPayload(
-								new String((byte[]) message.getPayload()).toUpperCase())
+								new String((byte[]) message.getPayload()).toUpperCase(Locale.ROOT))
 						.copyHeaders(message.getHeaders()).build();
 			}
@@ -328,7 +329,7 @@ void errors() {
 			@Override
 			public Message preSend(Message message, MessageChannel channel) {
 				return MessageBuilder
-						.withPayload(((String) message.getPayload()).toUpperCase())
+						.withPayload(((String) message.getPayload()).toUpperCase(Locale.ROOT))
 						.copyHeaders(message.getHeaders()).build();
 			}
@@ -379,7 +380,7 @@ void errorsNoRetry() {
 			@Override
 			public Message preSend(Message message, MessageChannel channel) {
 				return MessageBuilder
-						.withPayload(((String) message.getPayload()).toUpperCase())
+						.withPayload(((String) message.getPayload()).toUpperCase(Locale.ROOT))
 						.copyHeaders(message.getHeaders()).build();
 			}
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionPostProcessingTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionPostProcessingTests.java
index 621251304..e92aa919b 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionPostProcessingTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionPostProcessingTests.java
@@ -16,6 +16,7 @@ package org.springframework.cloud.stream.function;
 
+import java.util.Locale;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -241,7 +242,7 @@ public Supplier> hello() {
 	public Function uppercase() {
 		return new PostProcessingFunction() {
 			public String apply(String input) {
-				return input.toUpperCase();
+				return input.toUpperCase(Locale.ROOT);
 			}
 
 			public void postProcess(Message result) {
@@ -260,7 +261,7 @@ public String apply(String input) {
 				if (input.equals("error")) {
 					throw new RuntimeException("intentional");
 				}
-				return input.toUpperCase();
+				return input.toUpperCase(Locale.ROOT);
 			}
 
 			@Override
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/GreenfieldFunctionEnableBindingTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/GreenfieldFunctionEnableBindingTests.java
index 470291011..bca3d8502 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/GreenfieldFunctionEnableBindingTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/GreenfieldFunctionEnableBindingTests.java
@@ -18,6 +18,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Date;
+import java.util.Locale;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -94,7 +95,7 @@ public static class ProcessorFromFunction {
 
 		@Bean
 		public Function toUpperCase() {
-			return String::toUpperCase;
+			return v -> v.toUpperCase(Locale.ROOT);
 		}
 	}
 
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java
new file mode 100644
index 000000000..15bf8ac39
--- /dev/null
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2024-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.function;
+
+import java.util.Locale;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.function.context.message.MessageUtils;
+import org.springframework.cloud.stream.binder.test.InputDestination;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Omer Celik
+ */
+
+public class HeaderTests {
+
+	@BeforeAll
+	public static void before() {
+		System.clearProperty("spring.cloud.function.definition");
+	}
+
+	@Test
+	void checkWithEmptyPojo() {
+		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+				TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
+			.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {
+
+			StreamBridge streamBridge = context.getBean(StreamBridge.class);
+			Message message = MessageBuilder.withPayload(new EmptyPojo()).build();
+			streamBridge.send("emptyConfigurationDestination", message);
+
+			OutputDestination outputDestination = context.getBean(OutputDestination.class);
+			Message messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination");
+			MessageHeaders headers = messageReceived.getHeaders();
+			assertThat(headers).isNotNull();
+			assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
+			assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
+		}
+	}
+
+	@Test
+	void checkIfHeaderProvidedInData() {
+		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+				TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
+			.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {
+			StreamBridge streamBridge = context.getBean(StreamBridge.class);
+			String jsonPayload = "{\"name\":\"Omer\"}";
+			streamBridge.send("myBinding-out-0",
+					MessageBuilder.withPayload(jsonPayload.getBytes())
+							.setHeader("anyHeader", "anyValue")
+							.build(),
+					MimeTypeUtils.APPLICATION_JSON);
+			OutputDestination output = context.getBean(OutputDestination.class);
+			Message result = output.receive(1000, "myBinding-out-0");
+			MessageHeaders headers = result.getHeaders();
+			assertThat(headers).isNotNull();
+			assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
+			assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
+			assertThat(headers.get("anyHeader")).isEqualTo("anyValue");
+		}
+	}
+
+	@Test
+	void checkGenericMessageSent() {
+		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+				TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class))
+			.web(WebApplicationType.NONE)
+			.run("--spring.jmx.enabled=false",
+					"--spring.cloud.function.definition=uppercase")) {
+			String jsonPayload = "{\"surname\":\"Celik\"}";
+			InputDestination input = context.getBean(InputDestination.class);
+			input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0");
+			OutputDestination output = context.getBean(OutputDestination.class);
+
+			Message result = output.receive(1000, "uppercase-out-0");
+			MessageHeaders headers = result.getHeaders();
+			assertThat(headers).isNotNull();
+			assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
+			assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
+		}
+	}
+
+	@Test
+	void checkMessageWrappedFunctionalConsumer() {
+		System.clearProperty("spring.cloud.function.definition");
+		ConfigurableApplicationContext context = new SpringApplicationBuilder(
+				TestChannelBinderConfiguration.getCompleteConfiguration(FunctionMessageConfiguration.class))
+			.web(WebApplicationType.NONE)
+			.run("--spring.jmx.enabled=false",
+					"--spring.cloud.function.definition=uppercase"
+			);
+
+		InputDestination source = context.getBean(InputDestination.class);
+		source.send(new GenericMessage<>("Omer Celik".getBytes()), "uppercase-in-0");
+
+		OutputDestination target = context.getBean(OutputDestination.class);
+		Message message = target.receive(5, "uppercase-out-0");
+		MessageHeaders headers = message.getHeaders();
+		assertThat(headers).isNotNull();
+		assertThat(headers).isNotNull();
+		assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
+		assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
+	}
+
+	@EnableAutoConfiguration
+	public static class EmptyConfiguration {
+
+	}
+
+	@EnableAutoConfiguration
+	public static class FunctionMessageConfiguration {
+		@Bean
+		public Function, Message> uppercase() {
+			return msg -> MessageBuilder.withPayload(msg.getPayload().toUpperCase(Locale.ROOT)).build();
+		}
+	}
+
+	@EnableAutoConfiguration
+	@Configuration
+	public static class FunctionUpperCaseConfiguration {
+		@Bean
+		public Function uppercase() {
+			return String::toUpperCase;
+		}
+	}
+
+	public static class EmptyPojo {
+
+	}
+}
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java
index 256f8ce29..ffb86a391 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/ImplicitFunctionBindingTests.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -245,7 +246,7 @@ void dynamicBindingTestWithFunction() {
 			// good, we expected it
 		}
 
-		Function function = v -> new String(v).toUpperCase();
+		Function function = v -> new String(v).toUpperCase(Locale.ROOT);
 		FunctionBindingTestUtils.bind(context, function);
 
 		input.send(new GenericMessage("hello".getBytes()));
@@ -1434,7 +1435,7 @@ public Function, Flux> echo() {
 		public Function>, Flux>> reactivePojoMessage() {
 			return flux -> flux.map(message -> {
 				Person p = message.getPayload();
-				p.setName(p.getName().toUpperCase());
+				p.setName(p.getName().toUpperCase(Locale.ROOT));
 				return MessageBuilder.withPayload(p).copyHeaders(message.getHeaders()).build();
 			});
 		}
@@ -1636,7 +1637,7 @@ public Function func() {
 
 		@Bean
 		public Function uppercase() {
-			return v -> v.toUpperCase();
+			return v -> v.toUpperCase(Locale.ROOT);
 		}
 
 		@Bean
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/MultipleInputOutputFunctionTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/MultipleInputOutputFunctionTests.java
index c71772ed0..6a99e39d5 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/MultipleInputOutputFunctionTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/MultipleInputOutputFunctionTests.java
@@ -18,6 +18,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Locale;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -296,7 +297,7 @@ public static class ReactiveFunctionConfiguration {
 
 		@Bean
 		public Function uppercase() {
-			return value -> value.toUpperCase();
+			return value -> value.toUpperCase(Locale.ROOT);
 		}
 
 		@Bean
@@ -378,8 +379,8 @@ public static class ContentTypeConfiguration {
 		@Bean
 		public Function, Flux>, Flux> multiInputSingleOutput() {
 			return tuple -> {
-				Flux stringStream = tuple.getT1().map(p -> p.getName().toUpperCase());
-				Flux intStream = tuple.getT2().map(p -> p.getName().toUpperCase());
+				Flux stringStream = tuple.getT1().map(p -> p.getName().toUpperCase(Locale.ROOT));
+				Flux intStream = tuple.getT2().map(p -> p.getName().toUpperCase(Locale.ROOT));
 				return Flux.merge(stringStream, intStream);
 			};
 		}
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java
index 7407a248e..a2d16cfca 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java
@@ -18,6 +18,7 @@ import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Locale;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -376,7 +377,6 @@ public Function, Flux> echoFlux() {
 		@Bean
 		public Function, Message> enrich() {
 			return x -> {
-				System.out.println("===> enrich");
 				return MessageBuilder.withPayload(x.getPayload()).setHeader("spring.cloud.function.definition", "uppercase").build();
 			};
 		}
@@ -384,8 +384,7 @@ public Function, Message> enrich() {
 
 		@Bean
 		public Function uppercase() {
 			return x -> {
-				System.out.println("===> uppercase");
-				return x.toUpperCase();
+				return x.toUpperCase(Locale.ROOT);
 			};
 		}
diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
index 5d7ec5982..e55c8b055 100644
--- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
+++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
@@ -18,6 +18,7 @@ import java.lang.reflect.Field;
 import java.net.URI;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -819,7 +820,7 @@ void dynamicProducerDestination() {
 	public static class DynamicProducerDestinationConfig {
 		@Bean
 		public Function, Message> uppercase() {
-			return msg -> MessageBuilder.withPayload(msg.getPayload().toUpperCase())
+			return msg -> MessageBuilder.withPayload(msg.getPayload().toUpperCase(Locale.ROOT))
 					.setHeader("spring.cloud.stream.sendto.destination", "dynamicTopic").build();
 		}
 	}
@@ -1003,7 +1004,7 @@ public static class IntegrationFlowConfiguration {
 		public IntegrationFlow transform(StreamBridge bridge) {
 			return IntegrationFlow.from("foo").transform(v -> {
 				String s = new String((byte[]) v);
-				return s.toUpperCase();
+				return s.toUpperCase(Locale.ROOT);
 			})
 			.handle(v -> bridge.send("output", v))
 			.get();
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java
index 99d82a376..5e135ed85 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -212,7 +213,7 @@ private Binder doGetBinderAOT(Str
 	}
 
 	private String getKafkaStreamsBinderSimpleName(Class bindingTargetType) {
-		return bindingTargetType.getSimpleName().toLowerCase();
+		return bindingTargetType.getSimpleName().toLowerCase(Locale.ROOT);
 	}
 
 	private boolean isKafkaStreamsType(Class bindingTargetType) {
@@ -225,7 +226,7 @@ private Binder doGetBinderConvent
 		if (!MessageChannel.class.isAssignableFrom(bindingTargetType)
 				&& !PollableMessageSource.class.isAssignableFrom(bindingTargetType)) {
-			String bindingTargetTypeName = StringUtils.hasText(name) ? name : bindingTargetType.getSimpleName().toLowerCase();
+			String bindingTargetTypeName = StringUtils.hasText(name) ? name : bindingTargetType.getSimpleName().toLowerCase(Locale.ROOT);
 			Binder binderInstance = getBinderInstance(bindingTargetTypeName);
 			return binderInstance;
 		}
diff --git a/schema-registry/spring-cloud-stream-schema-registry-server/src/test/java/org/springframework/cloud/stream/schema/registry/server/SchemaRegistryServerAvroTests.java b/schema-registry/spring-cloud-stream-schema-registry-server/src/test/java/org/springframework/cloud/stream/schema/registry/server/SchemaRegistryServerAvroTests.java
index 5704c31ca..13be2f3a9 100644
--- a/schema-registry/spring-cloud-stream-schema-registry-server/src/test/java/org/springframework/cloud/stream/schema/registry/server/SchemaRegistryServerAvroTests.java
+++ b/schema-registry/spring-cloud-stream-schema-registry-server/src/test/java/org/springframework/cloud/stream/schema/registry/server/SchemaRegistryServerAvroTests.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -81,12 +82,12 @@ class SchemaRegistryServerAvroTests {
 			.parse(resourceToString("classpath:/avro_user_definition_schema_v2.json"));
 
 	private static final String AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1.getName()
-			.toLowerCase();
+			.toLowerCase(Locale.ROOT);
 
 	private static final String AVRO_USER_SCHEMA_QUALIFED_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1
 			.getFullName()
-			.toLowerCase();
+			.toLowerCase(Locale.ROOT);
 
 	private static final Schema AVRO_USER_REGISTRY_SCHEMA_V1 = toSchema(
 			AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT,