Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2650: Observability enhancements in reactive Kafka binder #3003

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,39 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<!-- Micrometer context-propagation: carries the observation context across
     Reactor thread boundaries (required for contextCapture()/contextWrite()
     to restore the current observation on the consuming side). -->
<dependency>
	<groupId>io.micrometer</groupId>
	<artifactId>context-propagation</artifactId>
	<!-- NOTE(review): "optional" is not a valid Maven <scope>
	     (valid scopes: compile/provided/runtime/test/system/import).
	     Optionality is expressed with the <optional> flag instead. -->
	<optional>true</optional>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
artembilan marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.opentelemetry</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.wavefront</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-reporter-wavefront</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -84,6 +85,7 @@
* @author Gary Russell
* @author Byungjun You
* @author Omer Celik
* @author Soby Chacko
* @since 4.0
*
*/
Expand Down Expand Up @@ -111,11 +113,14 @@ public class ReactorKafkaBinder

private final Map<String, MessageProducerSupport> messageProducers = new ConcurrentHashMap<>();

private final ObservationRegistry observationRegistry;

public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioner) {
KafkaTopicProvisioner provisioner, @Nullable ObservationRegistry observationRegistry) {

super(new String[0], provisioner, null, null);
this.configurationProperties = configurationProperties;
this.observationRegistry = observationRegistry;
}

public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
Expand Down Expand Up @@ -194,6 +199,9 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin

SenderOptions<Object, Object> opts = this.senderOptionsCustomizer.apply(producerProperties.getBindingName(),
SenderOptions.create(configs));
if (this.configurationProperties.isEnableObservation() && this.observationRegistry != null) {
opts = opts.withObservation(this.observationRegistry);
}
// TODO bean for converter; MCB doesn't use one on the producer side.
RecordMessageConverter converter = new MessagingMessageConverter();
AbstractApplicationContext applicationContext = getApplicationContext();
Expand Down Expand Up @@ -405,7 +413,7 @@ protected void handleMessageInternal(Message<?> message) {
@SuppressWarnings("unchecked")
SenderRecord<Object, Object, Object> sr = SenderRecord.create(
(ProducerRecord<Object, Object>) converter.fromMessage(message, topic), correlation);
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr));
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr)).contextCapture();
result.subscribe(res -> {
if (this.results != null) {
this.results.send(MessageBuilder.withPayload(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.cloud.stream.binder.reactorkafka;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
Expand All @@ -37,6 +39,7 @@
*
* @author Gary Russell
* @author Chris Bono
* @author Soby Chacko
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
Expand Down Expand Up @@ -73,13 +76,15 @@ KafkaTopicProvisioner provisioningProvider(

@Bean
ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider,
KafkaExtendedBindingProperties extendedBindingProperties,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers) {
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
KafkaTopicProvisioner provisioningProvider,
KafkaExtendedBindingProperties extendedBindingProperties,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers,
ObjectProvider<ObservationRegistry> observationRegistryObjectProvider) {
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider,
observationRegistryObjectProvider.getIfUnique());
reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
reactorKafkaBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.reactorkafka;

import java.lang.reflect.Type;
import java.time.Duration;
import java.util.function.Function;
import java.util.stream.Collectors;

import brave.handler.SpanHandler;
import brave.test.TestSpanHandler;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
import io.micrometer.tracing.test.simple.SpansAssert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* @author Artem Bilan
* @author Soby Chacko
* @since 4.2.0
*/
// End-to-end observability test for the reactive Kafka binder: a message is sent
// through StreamBridge, consumed by the reactive `receive` function (which opens a
// manual receiver observation), and re-published; the test then asserts that all
// resulting spans belong to one trace, i.e. the trace context propagated
// producer -> consumer -> producer.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
		"spring.kafka.consumer.metadata.max.age.ms=1000",
		"spring.cloud.function.definition=receive",
		"spring.cloud.stream.function.reactive.uppercase=true",
		"spring.cloud.stream.bindings.receive-in-0.group=rkbot-in-group",
		"spring.cloud.stream.bindings.receive-in-0.destination=rkbot-in-topic",
		"spring.cloud.stream.bindings.receive-out-0.destination=rkbot-out-topic",
		// Enables the binder-level observation support under test.
		"spring.cloud.stream.kafka.binder.enable-observation=true",
		"spring.cloud.stream.kafka.binder.brokers=${spring.kafka.bootstrap-servers}",
		// Sample every trace so every span reaches the TestSpanHandler.
		"management.tracing.sampling.probability=1",
		// Route the inbound binding through the custom converter bean below so the
		// function receives the raw ConsumerRecord (needed to build the receiver context).
		"spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR"
})
@DirtiesContext
@AutoConfigureObservability
@EmbeddedKafka(topics = { "rkbot-out-topic" })
public class ReactorKafkaBinderObservationTests {

	// Static so the bean below and the test assertions share the same collector;
	// Brave reports finished spans here instead of to a real backend.
	private static final TestSpanHandler SPANS = new TestSpanHandler();

	@Autowired
	StreamBridge streamBridge;

	@Autowired
	ObservationRegistry observationRegistry;

	@Autowired
	TestConfiguration testConfiguration;

	@Autowired
	private EmbeddedKafkaBroker embeddedKafka;

	@Test
	void endToEndReactorKafkaBinder1() {

		streamBridge.send("rkbot-in-topic", MessageBuilder.withPayload("data")
				.build());

		// Expect 3 spans: StreamBridge send, user receiver observation, and the
		// binder's outbound send — all sharing one trace id.
		// NOTE(review): exact span count presumably depends on the auto-configured
		// observation conventions — confirm if this assertion becomes flaky.
		await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
		SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
				.haveSameTraceId();
	}

	@SpringBootConfiguration
	// Function-level observation is excluded so only the Kafka binder/receiver
	// observations under test produce spans.
	@EnableAutoConfiguration(exclude = org.springframework.cloud.function.observability.ObservationAutoConfiguration.class)
	public static class TestConfiguration {

		@Bean
		SpanHandler testSpanHandler() {
			return SPANS;
		}

		// Converter that hands the whole ConsumerRecord to the function as the
		// payload (inbound), while delegating outbound conversion to the default
		// MessagingMessageConverter.
		@Bean
		RecordMessageConverter fullRR() {
			return new RecordMessageConverter() {

				private final RecordMessageConverter converter = new MessagingMessageConverter();

				@Override
				public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
						org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type payloadType) {

					return MessageBuilder.withPayload(record).build();
				}

				@Override
				public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
					return this.converter.fromMessage(message, defaultTopic);
				}

			};
		}

		// Reactive consumer: for each record, opens a receiver observation manually
		// and propagates it downstream via the Reactor context so the subsequent
		// outbound send is recorded as a child of the same trace.
		@Bean
		Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
			return s -> s
					.flatMap(record -> {
						Observation receiverObservation =
								KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
										KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
										() ->
												new KafkaRecordReceiverContext(
														record, "user.receiver", "localhost:9092"),
										observationRegistry);

						// deferContextual captures the Reactor context (holding the
						// observation) and attaches it to the outgoing message header
						// so the binder's producer path can restore it.
						return Mono.deferContextual(contextView -> Mono.just(record)
								.map(rec -> new String(rec.value()).toLowerCase())
								.map(rec -> MessageBuilder.withPayload(rec).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView).build()))
								.doOnTerminate(receiverObservation::stop)
								.doOnError(receiverObservation::error)
								// Put the observation into the Reactor context under the
								// thread-local accessor key so contextCapture() can see it.
								.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
					});
		}
	}

}

Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void consumerBinding() throws Exception {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch latch = new CountDownLatch(2);
Expand Down Expand Up @@ -148,7 +148,7 @@ void concurrency(String topic, String group, boolean atMostOnce) throws Exceptio
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch subscriptionLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -229,7 +229,7 @@ void autoCommit() throws Exception {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch subscriptionLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -298,7 +298,7 @@ void producerBinding() throws InterruptedException {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
CountDownLatch latch = new CountDownLatch(1);
GenericApplicationContext context = new GenericApplicationContext();
context.registerBean("sendResults", FluxMessageChannel.class);
Expand Down
Loading
Loading