diff --git a/.gitignore b/.gitignore index aa64c92da..76d8c0f8d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ dump.rdb .apt_generated artifacts **/dependency-reduced-pom.xml +core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java node node_modules diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java index c209863de..18a74de64 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderHeaderMapper.java @@ -21,6 +21,7 @@ import org.apache.pulsar.client.api.Message; import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageHeaderAccessor; @@ -61,6 +62,9 @@ public MessageHeaders toSpringHeaders(Message pulsarMessage) { MessageHeaderAccessor mutableHeaders = new MessageHeaderAccessor(); mutableHeaders.copyHeaders(springHeaders); mutableHeaders.setHeader(BinderHeaders.NATIVE_HEADERS_PRESENT, Boolean.TRUE); + if (BuildInformationProvider.isVersionValid()) { + mutableHeaders.setHeader(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } springHeaders = mutableHeaders.getMessageHeaders(); } return springHeaders; diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java index e571b8f37..b45583a5a 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/main/java/org/springframework/cloud/stream/binder/pulsar/PulsarMessageChannelBinder.java @@ -24,6 +24,7 @@ import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; @@ -36,6 +37,7 @@ import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.handler.AbstractMessageProducingHandler; @@ -46,6 +48,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.pulsar.core.ProducerBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarTemplate; @@ -139,7 +142,7 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination containerProperties.setMessageListener((PulsarRecordMessageListener) (consumer, pulsarMsg) -> { var springMessage = (inboundHeaderMapper != null) ? MessageBuilder.createMessage(pulsarMsg.getValue(), inboundHeaderMapper.toSpringHeaders(pulsarMsg)) - : MessageBuilder.withPayload(pulsarMsg.getValue()).build(); + : MessageBuilder.createMessage(pulsarMsg.getValue(), createMessageHeaders()); messageDrivenChannelAdapter.send(springMessage); }); @@ -168,6 +171,14 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination return messageDrivenChannelAdapter; } + private MessageHeaders createMessageHeaders() { + MessageHeaderAccessor mutableMessageHeaderAccessor = new MessageHeaderAccessor(); + if (BuildInformationProvider.isVersionValid()) { + mutableMessageHeaderAccessor.setHeader(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } + return mutableMessageHeaderAccessor.getMessageHeaders(); + } + @Nullable private PulsarBinderHeaderMapper determineInboundHeaderMapper( ExtendedConsumerProperties extConsumerProps) { diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java index 15bf8ac39..055d38722 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java @@ -16,7 +16,10 @@ package org.springframework.cloud.stream.function; +import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.function.Function; import org.junit.jupiter.api.BeforeAll; @@ -26,9 +29,14 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.binder.test.EnableTestBinder; import org.springframework.cloud.stream.binder.test.InputDestination; import org.springframework.cloud.stream.binder.test.OutputDestination; import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.cloud.stream.utils.BuildInformationProvider; +import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -43,7 +51,6 @@ /** * @author Omer Celik */ - public class HeaderTests { @BeforeAll @@ -63,10 +70,8 @@ void checkWithEmptyPojo() { OutputDestination outputDestination = context.getBean(OutputDestination.class); Message messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination"); - MessageHeaders headers = messageReceived.getHeaders(); - assertThat(headers).isNotNull(); - assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); - assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); + + checkCommonHeaders(messageReceived.getHeaders()); } } @@ -75,6 +80,7 @@ void checkIfHeaderProvidedInData() { try (ConfigurableApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class)) .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) { + StreamBridge streamBridge = context.getBean(StreamBridge.class); String jsonPayload = "{\"name\":\"Omer\"}"; streamBridge.send("myBinding-out-0", @@ -82,13 +88,12 @@ void checkIfHeaderProvidedInData() { .setHeader("anyHeader", "anyValue") .build(), MimeTypeUtils.APPLICATION_JSON); + OutputDestination output = context.getBean(OutputDestination.class); Message result = output.receive(1000, "myBinding-out-0"); - MessageHeaders headers = result.getHeaders(); - assertThat(headers).isNotNull(); - assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); - assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); - assertThat(headers.get("anyHeader")).isEqualTo("anyValue"); + + checkCommonHeaders(result.getHeaders()); + assertThat(result.getHeaders().get("anyHeader")).isEqualTo("anyValue"); } } @@ -99,16 +104,35 @@ void checkGenericMessageSent() { .web(WebApplicationType.NONE) .run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase")) { + String jsonPayload = "{\"surname\":\"Celik\"}"; InputDestination input = context.getBean(InputDestination.class); input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0"); + OutputDestination output = context.getBean(OutputDestination.class); + Message result = output.receive(1000, "uppercase-out-0"); + + checkCommonHeaders(result.getHeaders()); + } + } + @Test + void checkGenericMessageSentUsingStreamBridge() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class)) + .web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false", + "--spring.cloud.function.definition=uppercase")) { + + String jsonPayload = "{\"anyFieldName\":\"anyValue\"}"; + final StreamBridge streamBridge = context.getBean(StreamBridge.class); + GenericMessage message = new GenericMessage<>(jsonPayload); + streamBridge.send("uppercase-in-0", message); + + OutputDestination output = context.getBean(OutputDestination.class); Message result = output.receive(1000, "uppercase-out-0"); - MessageHeaders headers = result.getHeaders(); - assertThat(headers).isNotNull(); - assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); - assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); + + checkCommonHeaders(result.getHeaders()); } } @@ -127,11 +151,96 @@ void checkMessageWrappedFunctionalConsumer() { OutputDestination target = context.getBean(OutputDestination.class); Message message = target.receive(5, "uppercase-out-0"); - MessageHeaders headers = message.getHeaders(); - assertThat(headers).isNotNull(); + + checkCommonHeaders(message.getHeaders()); + } + + @Test + void checkStringToMapMessageStreamListener() { + ApplicationContext context = new SpringApplicationBuilder( + StringToMapMessageConfiguration.class).web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + String jsonPayload = "{\"name\":\"Omer\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes())); + OutputDestination target = context.getBean(OutputDestination.class); + Message outputMessage = target.receive(); + checkCommonHeaders(outputMessage.getHeaders()); + } + + @Test + void checkPojoToPojo() { + ApplicationContext context = new SpringApplicationBuilder( + PojoToPojoConfiguration.class).web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + String jsonPayload = "{\"name\":\"Omer\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes())); + OutputDestination target = context.getBean(OutputDestination.class); + Message outputMessage = target.receive(); + checkCommonHeaders(outputMessage.getHeaders()); + } + + @Test + void checkPojoToString() { + ApplicationContext context = new SpringApplicationBuilder( + PojoToStringConfiguration.class).web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + OutputDestination target = context.getBean(OutputDestination.class); + String jsonPayload = "{\"name\":\"Neso\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes())); + Message outputMessage = target.receive(); + checkCommonHeaders(outputMessage.getHeaders()); + } + + @Test + void checkPojoToByteArray() { + ApplicationContext context = new SpringApplicationBuilder( + PojoToByteArrayConfiguration.class).web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + OutputDestination target = context.getBean(OutputDestination.class); + String jsonPayload = "{\"name\":\"Neptune\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes())); + Message outputMessage = target.receive(); + checkCommonHeaders(outputMessage.getHeaders()); + } + + @Test + void checkStringToPojoInboundContentTypeHeader() { + ApplicationContext context = new SpringApplicationBuilder( + StringToPojoConfiguration.class).web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + OutputDestination target = context.getBean(OutputDestination.class); + String jsonPayload = "{\"name\":\"Mercury\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes(), + new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, + MimeTypeUtils.APPLICATION_JSON_VALUE)))); + Message outputMessage = target.receive(); + checkCommonHeaders(outputMessage.getHeaders()); + } + + @Test + void checkPojoMessageToStringMessage() { + ApplicationContext context = new SpringApplicationBuilder( + PojoMessageToStringMessageConfiguration.class) + .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false"); + InputDestination source = context.getBean(InputDestination.class); + OutputDestination target = context.getBean(OutputDestination.class); + String jsonPayload = "{\"name\":\"Earth\"}"; + source.send(new GenericMessage<>(jsonPayload.getBytes())); + Message outputMessage = target.receive(); + MessageHeaders headers = outputMessage.getHeaders(); + assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue(); + } + + private void checkCommonHeaders(MessageHeaders headers) { assertThat(headers).isNotNull(); - assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); + assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); + assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue(); } @EnableAutoConfiguration @@ -156,6 +265,97 @@ public Function uppercase() { } } + @EnableTestBinder + @EnableAutoConfiguration + public static class StringToMapMessageConfiguration { + @Bean + public Function>, String> echo() { + return value -> { + assertThat(value.getPayload() instanceof Map).isTrue(); + return (String) value.getPayload().get("name"); + }; + } + } + + @EnableTestBinder + @EnableAutoConfiguration + public static class PojoToPojoConfiguration { + + @Bean + public Function echo() { + return value -> value; + } + } + + @EnableTestBinder + @EnableAutoConfiguration + public static class PojoToStringConfiguration { + + @Bean + public Function echo() { + return Planet::toString; + } + } + + @EnableTestBinder + @EnableAutoConfiguration + public static class PojoToByteArrayConfiguration { + + @Bean + public Function echo() { + return value -> value.toString().getBytes(StandardCharsets.UTF_8); + } + } + + @EnableTestBinder + @EnableAutoConfiguration + public static class StringToPojoConfiguration { + + @Bean + public Function echo(JsonMapper mapper) { + return value -> mapper.fromJson(value, Planet.class); + } + } + + @EnableTestBinder + @EnableAutoConfiguration + public static class PojoMessageToStringMessageConfiguration { + + @Bean + public Function, Message> echo() { + return value -> MessageBuilder.withPayload(value.getPayload().toString()) + .setHeader("expected-content-type", MimeTypeUtils.TEXT_PLAIN_VALUE) + .build(); + } + } + + public static class Planet { + + private String name; + + Planet() { + this(null); + } + + Planet(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return this.name; + } + + } + public static class EmptyPojo { } diff --git a/core/spring-cloud-stream/pom.xml b/core/spring-cloud-stream/pom.xml index 6b0e54b73..3dffa5ff1 100644 --- a/core/spring-cloud-stream/pom.xml +++ b/core/spring-cloud-stream/pom.xml @@ -15,6 +15,10 @@ 4.2.0-SNAPSHOT + + ${maven.build.timestamp} + + org.springframework.boot @@ -129,6 +133,77 @@ 1.8 + + org.apache.maven.plugins + maven-clean-plugin + + + clean + + clean + + + + + src/main/java + + **/GeneratedBuildProperties.java + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + generate-build-info + generate-sources + + run + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + process-sources + + copy-resources + + + + + src/main/template + + **/*.java + + true + + + src/main/java + true + + + + diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index 7d23f7d75..fab2d7ca0 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -79,6 +80,7 @@ import org.springframework.cloud.stream.config.BindingServiceConfiguration; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; +import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; @@ -132,6 +134,7 @@ * @author Ivan Shapoval * @author Patrik Péter Süli * @author Artem Bilan + * @author Omer Celik * @since 2.1 */ @Lazy(false) @@ -474,7 +477,7 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor if (this.functionProperties.isComposeFrom()) { AbstractSubscribableChannel outputChannel = this.applicationContext.getBean(outputBindingNames.iterator().next(), AbstractSubscribableChannel.class); logger.info("Composing at the head of output destination: " + outputChannel.getBeanName()); - String outputChannelName = ((AbstractMessageChannel) outputChannel).getBeanName(); + String outputChannelName = outputChannel.getBeanName(); DirectWithAttributesChannel newOutputChannel = new DirectWithAttributesChannel(); newOutputChannel.setAttribute("type", "output"); newOutputChannel.setComponentName("output.extended"); @@ -501,11 +504,14 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor headersField.setAccessible(true); targetProtocolEnhancer.set(message -> { Map headersMap = (Map) ReflectionUtils - .getField(headersField, ((Message) message).getHeaders()); + .getField(headersField, message.getHeaders()); headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol); if (CloudEventMessageUtils.isCloudEvent((message))) { headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); } + if (BuildInformationProvider.isVersionValid()) { + headersMap.putIfAbsent(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } return message; }); } @@ -571,9 +577,8 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor // based on information from the correct output binding properties. if (outputCount > 1) { Integer partitionId = determinePartitionForOutputBinding(outputBinding, message); - message = MessageBuilder - .fromMessage((Message) message) - .setHeader(BinderHeaders.PARTITION_HEADER, partitionId).build(); + message = MessageBuilder.fromMessage((Message) message) + .copyHeaders(createMessageHeadersWithVersionInfoAndPartitionId(partitionId)).build(); } if (message instanceof Message m && m.getHeaders().get("spring.cloud.stream.sendto.destination") != null) { String destinationName = (String) m.getHeaders().get("spring.cloud.stream.sendto.destination"); @@ -582,12 +587,11 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor if (logger.isInfoEnabled()) { logger.info("Output message is sent to '" + destinationName + "' destination"); } + m = createMessageWithVersionInfoInHeader(m); dynamicChannel.send(m); } else { - if (!(message instanceof Message)) { - message = MessageBuilder.withPayload(message).build(); - } + message = createMessageWithVersionInfoInHeader(message); if (isContextPropagationPresent && outputChannel instanceof FluxMessageChannel) { ContextView reactorContext = StaticMessageHeaderAccessor.getReactorContext((Message) message); try (AutoCloseable autoCloseable = ContextSnapshotHelper.setContext(reactorContext)) { @@ -626,6 +630,31 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor } } + private Message createMessageWithVersionInfoInHeader(Object objectMessage) { + MessageBuilder messageBuilder; + if (objectMessage instanceof Message message) { + messageBuilder = MessageBuilder.fromMessage(message); + } + else { + messageBuilder = MessageBuilder.withPayload(objectMessage); + } + return messageBuilder.copyHeaders(createMessageHeadersWithVersionInfo()).build(); + } + + private Map createMessageHeadersWithVersionInfo() { + Map headers = new HashMap<>(); + if (BuildInformationProvider.isVersionValid()) { + headers.put(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } + return headers; + } + + private Map createMessageHeadersWithVersionInfoAndPartitionId(Integer partitionId) { + Map headers = createMessageHeadersWithVersionInfo(); + headers.put(BinderHeaders.PARTITION_HEADER, partitionId); + return headers; + } + private Integer determinePartitionForOutputBinding(String outputBinding, Object message) { BindingProperties bindingProperties = FunctionToDestinationBinder.this.serviceProperties.getBindings().get(outputBinding); ProducerProperties producerProperties = bindingProperties == null ? null : bindingProperties.getProducer(); @@ -840,6 +869,9 @@ private void setHeadersIfNeeded(Message message) { if (CloudEventMessageUtils.isCloudEvent(message)) { headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); } + if (BuildInformationProvider.isVersionValid()) { + headersMap.putIfAbsent(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } } } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 7e22cdb75..399101448 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -17,7 +17,6 @@ package org.springframework.cloud.stream.function; import java.lang.reflect.Type; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -43,6 +42,7 @@ import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderFactory; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binding.BindingService; import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor; @@ -50,6 +50,7 @@ import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; +import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.ResolvableType; import org.springframework.integration.channel.AbstractMessageChannel; @@ -85,6 +86,7 @@ * @author Soby Chacko * @author Byungjun You * @author Michał Rowicki + * @author Omer Celik * @since 3.0.3 * */ @@ -206,8 +208,7 @@ public boolean send(String bindingName, @Nullable String binderName, Object data this.applicationContext.getBean(BinderFactory.class)); Message messageToSend = data instanceof Message messageData - ? MessageBuilder.fromMessage(messageData).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, targetType).build() - : new GenericMessage<>(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType)); + ? createMessageWithHeader(messageData, targetType) : createGenericMessageWithHeader(data, targetType); Message resultMessage; lock.lock(); @@ -228,6 +229,24 @@ public boolean send(String bindingName, @Nullable String binderName, Object data return messageChannel.send(resultMessage); } + private Message createMessageWithHeader(Message messageData, String targetType) { + MessageBuilder messageBuilder = MessageBuilder.fromMessage(messageData) + .copyHeaders(createHeaders(targetType)); + return messageBuilder.build(); + } + private Message createGenericMessageWithHeader(Object data, String targetType) { + return new GenericMessage<>(data, createHeaders(targetType)); + } + + private Map createHeaders(String targetType) { + Map headers = new HashMap<>(); + headers.put(MessageUtils.TARGET_PROTOCOL, targetType); + if (BuildInformationProvider.isVersionValid()) { + headers.put(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); + } + return headers; + } + private int hashProducerProperties(ProducerProperties producerProperties, String outputContentType) { int hash = outputContentType.hashCode() + Boolean.hashCode(producerProperties.isUseNativeEncoding()) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java new file mode 100644 index 000000000..67c117731 --- /dev/null +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.utils; + + +/** + * Provides information about current Spring Cloud Stream build. + * + * @author Omer Celik + */ +public final class BuildInformationProvider { + + private static final String UNKNOWN_SCST_VERSION = "-1"; + + private static final BuildInformation BUILD_INFO_CACHE; + + static { + BUILD_INFO_CACHE = createBuildInformation(); + } + + private BuildInformationProvider() { + } + + public static boolean isVersionValid() { + return !getVersion().equals(UNKNOWN_SCST_VERSION); + } + public static boolean isVersionValid(String version) { + return !version.equals(UNKNOWN_SCST_VERSION); + } + public static String getVersion() { + return BUILD_INFO_CACHE.version(); + } + + // If you have a compilation error at GeneratedBuildProperties then run 'mvn clean install' + // the GeneratedBuildProperties class is generated at a compile-time + private static BuildInformation createBuildInformation() { + return new BuildInformation(calculateVersion(), GeneratedBuildProperties.TIMESTAMP); + } + + // If you have a compilation error at GeneratedBuildProperties then run 'mvn clean install' + // the GeneratedBuildProperties class is generated at a compile-time + private static String calculateVersion() { + String version = GeneratedBuildProperties.VERSION; + if (version.startsWith("@") && version.endsWith("@")) { + return UNKNOWN_SCST_VERSION; + } + return version; + } + + private record BuildInformation(String version, String timestamp) { + } +} diff --git a/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java b/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java new file mode 100644 index 000000000..d82b1ff9f --- /dev/null +++ b/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.utils; + +import javax.annotation.processing.Generated; + +/** + * Exposes the Spring Cloud Stream Build Properties. + * This class is generated in a build-time from a template stored at + * src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties. + * + * Do not edit by hand as the changes will be overwritten in the next build. + * We use this method to support all build types. (Fat jar, shaded jar, war, etc.) + * + * WARNING: DO NOT CHANGE FIELD VALUES IN THE TEMPLATE.(For example: @project.version@) + * The fields are injected using the @.....@ keywords with the Maven Antrun Plugin. + * + * @author Omer Celik + * @since 4.2.0 + */ +@Generated("") +public final class GeneratedBuildProperties { + + /** + * Indicates the Spring Cloud Stream version. + */ + public static final String VERSION = "@project.version@"; + + /** + * Indicates the build time of the project. + */ + public static final String TIMESTAMP = "@timestamp@"; + + private GeneratedBuildProperties() { + } +}