Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Spring Cloud Stream Version To Message Headers For Easier Debugging Of Issues #3027

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dump.rdb
.apt_generated
artifacts
**/dependency-reduced-pom.xml
core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java

node
node_modules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pulsar.client.api.Message;

import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.utils.BuildInformationProvider;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
Expand Down Expand Up @@ -61,6 +62,9 @@ public MessageHeaders toSpringHeaders(Message<?> pulsarMessage) {
MessageHeaderAccessor mutableHeaders = new MessageHeaderAccessor();
mutableHeaders.copyHeaders(springHeaders);
mutableHeaders.setHeader(BinderHeaders.NATIVE_HEADERS_PRESENT, Boolean.TRUE);
if (BuildInformationProvider.isVersionValid()) {
mutableHeaders.setHeader(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion());
}
springHeaders = mutableHeaders.getMessageHeaders();
}
return springHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
Expand All @@ -36,6 +37,7 @@
import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.utils.BuildInformationProvider;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
Expand All @@ -46,6 +48,7 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
Expand Down Expand Up @@ -139,7 +142,7 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
containerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, pulsarMsg) -> {
var springMessage = (inboundHeaderMapper != null)
? MessageBuilder.createMessage(pulsarMsg.getValue(), inboundHeaderMapper.toSpringHeaders(pulsarMsg))
: MessageBuilder.withPayload(pulsarMsg.getValue()).build();
: MessageBuilder.createMessage(pulsarMsg.getValue(), createMessageHeaders());
messageDrivenChannelAdapter.send(springMessage);
});

Expand Down Expand Up @@ -168,6 +171,14 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
return messageDrivenChannelAdapter;
}

private MessageHeaders createMessageHeaders() {
MessageHeaderAccessor mutableMessageHeaderAccessor = new MessageHeaderAccessor();
if (BuildInformationProvider.isVersionValid()) {
mutableMessageHeaderAccessor.setHeader(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion());
}
return mutableMessageHeaderAccessor.getMessageHeaders();
}

@Nullable
private PulsarBinderHeaderMapper determineInboundHeaderMapper(
ExtendedConsumerProperties<PulsarConsumerProperties> extConsumerProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.cloud.stream.function;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import org.junit.jupiter.api.BeforeAll;
Expand All @@ -26,9 +29,14 @@
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.test.EnableTestBinder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.cloud.stream.utils.BuildInformationProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -43,7 +51,6 @@
/**
* @author Omer Celik
*/

public class HeaderTests {

@BeforeAll
Expand All @@ -63,10 +70,8 @@ void checkWithEmptyPojo() {

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination");
MessageHeaders headers = messageReceived.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");

checkCommonHeaders(messageReceived.getHeaders());
}
}

Expand All @@ -75,20 +80,20 @@ void checkIfHeaderProvidedInData() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {

StreamBridge streamBridge = context.getBean(StreamBridge.class);
String jsonPayload = "{\"name\":\"Omer\"}";
streamBridge.send("myBinding-out-0",
MessageBuilder.withPayload(jsonPayload.getBytes())
.setHeader("anyHeader", "anyValue")
.build(),
MimeTypeUtils.APPLICATION_JSON);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "myBinding-out-0");
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(headers.get("anyHeader")).isEqualTo("anyValue");

checkCommonHeaders(result.getHeaders());
assertThat(result.getHeaders().get("anyHeader")).isEqualTo("anyValue");
}
}

Expand All @@ -99,16 +104,35 @@ void checkGenericMessageSent() {
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"surname\":\"Celik\"}";
InputDestination input = context.getBean(InputDestination.class);
input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0");

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");

checkCommonHeaders(result.getHeaders());
}
}

@Test
void checkGenericMessageSentUsingStreamBridge() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class))
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"anyFieldName\":\"anyValue\"}";
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
GenericMessage<String> message = new GenericMessage<>(jsonPayload);
streamBridge.send("uppercase-in-0", message);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");

checkCommonHeaders(result.getHeaders());
}
}

Expand All @@ -127,11 +151,96 @@ void checkMessageWrappedFunctionalConsumer() {

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> message = target.receive(5, "uppercase-out-0");
MessageHeaders headers = message.getHeaders();
assertThat(headers).isNotNull();

checkCommonHeaders(message.getHeaders());
}

@Test
void checkStringToMapMessageStreamListener() {
ApplicationContext context = new SpringApplicationBuilder(
StringToMapMessageConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToPojo() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToString() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToStringConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neso\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToByteArray() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToByteArrayConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neptune\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkStringToPojoInboundContentTypeHeader() {
ApplicationContext context = new SpringApplicationBuilder(
StringToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Mercury\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes(),
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_JSON_VALUE))));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoMessageToStringMessage() {
ApplicationContext context = new SpringApplicationBuilder(
PojoMessageToStringMessageConfiguration.class)
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Earth\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
MessageHeaders headers = outputMessage.getHeaders();
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
}

private void checkCommonHeaders(MessageHeaders headers) {
assertThat(headers).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
}

@EnableAutoConfiguration
Expand All @@ -156,6 +265,97 @@ public Function<String, String> uppercase() {
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToMapMessageConfiguration {
@Bean
public Function<Message<Map<?, ?>>, String> echo() {
return value -> {
assertThat(value.getPayload() instanceof Map).isTrue();
return (String) value.getPayload().get("name");
};
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToPojoConfiguration {

@Bean
public Function<Planet, Planet> echo() {
return value -> value;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToStringConfiguration {

@Bean
public Function<Planet, String> echo() {
return Planet::toString;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToByteArrayConfiguration {

@Bean
public Function<Planet, byte[]> echo() {
return value -> value.toString().getBytes(StandardCharsets.UTF_8);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToPojoConfiguration {

@Bean
public Function<String, Planet> echo(JsonMapper mapper) {
return value -> mapper.fromJson(value, Planet.class);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoMessageToStringMessageConfiguration {

@Bean
public Function<Message<Planet>, Message<String>> echo() {
return value -> MessageBuilder.withPayload(value.getPayload().toString())
.setHeader("expected-content-type", MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
}
}

public static class Planet {

private String name;

Planet() {
this(null);
}

Planet(String name) {
this.name = name;
}

public String getName() {
return this.name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return this.name;
}

}

public static class EmptyPojo {

}
Expand Down
Loading
Loading