Skip to content

Commit

Permalink
GH-2948: Adding configuration for ReactorKafkaBinderHealthIndicator
Browse files Browse the repository at this point in the history
Resolves #2948
  • Loading branch information
sobychacko committed May 13, 2024
1 parent 1f41055 commit a89331c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.springframework.cloud.stream.binder.kafka.support.ProducerConfigCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
* Binder configuration for ReactorKafka.
Expand All @@ -40,6 +41,7 @@
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class })
@Import({ ReactorKafkaBinderHealthIndicatorConfiguration.class })
public class ReactorKafkaBinderConfiguration {

/**
Expand Down Expand Up @@ -77,7 +79,6 @@ ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configu
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers) {

ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.reactorkafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;

/**
* @author Soby Chacko
* @since 4.1.2
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
@ConditionalOnEnabledHealthIndicator("binders")
public class ReactorKafkaBinderHealthIndicatorConfiguration {

@Bean
public ReactorKafkaBinderHealthIndicator reactorKafkaBinderHealthIndicator(
ReactorKafkaBinder reactorKafkaBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = configurationProperties
.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
ReactorKafkaBinderHealthIndicator indicator = new ReactorKafkaBinderHealthIndicator(
reactorKafkaBinder, consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
indicator.setConsiderDownWhenAnyPartitionHasNoLeader(configurationProperties.isConsiderDownWhenAnyPartitionHasNoLeader());
return indicator;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,9 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.CompositeHealthContributor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.context.SpringBootTest;
Expand Down Expand Up @@ -81,7 +84,7 @@ class ReactorKafkaBinderIntegrationTests {

@ParameterizedTest
@ValueSource(booleans = { false, true })
void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) {
void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) throws InterruptedException {

recOptsCustOrder.clear();
patternedDeliveries.clear();
Expand Down Expand Up @@ -137,9 +140,20 @@ void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) {
assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one");
await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR"));
assertThat(context.getBean(ReactiveKafkaApplication.class).correlation).contains(42, 43);

checkHealth(context, Status.UP);
}
}

private static void checkHealth(ConfigurableApplicationContext context,
Status expected) throws InterruptedException {
CompositeHealthContributor healthIndicator = context
.getBean("bindersHealthContributor", CompositeHealthContributor.class);
ReactorKafkaBinderHealthIndicator rkbhi = (ReactorKafkaBinderHealthIndicator) healthIndicator.getContributor("reactorKafka");
Health health = rkbhi.health();
assertThat(health.getStatus()).isEqualTo(expected);
}

private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) {
return excludeKafkaAutoConfig ?
"--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration" : "foo=bar";
Expand Down

0 comments on commit a89331c

Please sign in to comment.