From a89331c095016d07c76b515f04a4c5849b963443 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 13 May 2024 13:47:11 -0400 Subject: [PATCH] GH-2948: Adding configuration for `ReactorKafkaBinderHealthIndicator` Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2948 --- .../ReactorKafkaBinderConfiguration.java | 5 +- ...fkaBinderHealthIndicatorConfiguration.java | 69 +++++++++++++++++++ .../ReactorKafkaBinderIntegrationTests.java | 18 ++++- 3 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderHealthIndicatorConfiguration.java diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java index 5a740f8973..3f40f3091c 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.springframework.cloud.stream.binder.kafka.support.ProducerConfigCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; /** * Binder configuration for ReactorKafka. @@ -40,6 +41,7 @@ @Configuration(proxyBeanMethods = false) @ConditionalOnMissingBean(Binder.class) @EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class }) +@Import({ ReactorKafkaBinderHealthIndicatorConfiguration.class }) public class ReactorKafkaBinderConfiguration { /** @@ -77,7 +79,6 @@ ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configu ObjectProvider producerConfigCustomizer, ObjectProvider receiverOptionsCustomizers, ObjectProvider senderOptionsptionsCustomizers) { - ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider); reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties); reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique()); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderHealthIndicatorConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderHealthIndicatorConfiguration.java new file mode 100644 index 0000000000..0301a7e212 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderHealthIndicatorConfiguration.java @@ -0,0 +1,69 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.reactorkafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.util.ObjectUtils; + +/** + * @author Soby Chacko + * @since 4.1.2 + */ +@Configuration(proxyBeanMethods = false) +@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator") +@ConditionalOnEnabledHealthIndicator("binders") +public class ReactorKafkaBinderHealthIndicatorConfiguration { + + @Bean + public ReactorKafkaBinderHealthIndicator reactorKafkaBinderHealthIndicator( + ReactorKafkaBinder reactorKafkaBinder, + KafkaBinderConfigurationProperties configurationProperties) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class); + Map mergedConfig = configurationProperties + .mergedConsumerConfiguration(); + if (!ObjectUtils.isEmpty(mergedConfig)) { + props.putAll(mergedConfig); + } + if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + configurationProperties.getKafkaConnectionString()); + } + ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(props); + ReactorKafkaBinderHealthIndicator indicator = new ReactorKafkaBinderHealthIndicator( + reactorKafkaBinder, consumerFactory); + indicator.setTimeout(configurationProperties.getHealthTimeout()); + indicator.setConsiderDownWhenAnyPartitionHasNoLeader(configurationProperties.isConsiderDownWhenAnyPartitionHasNoLeader()); + return indicator; + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java index 7b1ed3ea4c..7495dacdba 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; +import org.springframework.boot.actuate.health.CompositeHealthContributor; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.test.context.SpringBootTest; @@ -81,7 +84,7 @@ class ReactorKafkaBinderIntegrationTests { @ParameterizedTest @ValueSource(booleans = { false, true }) - void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) { + void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) throws InterruptedException { recOptsCustOrder.clear(); patternedDeliveries.clear(); @@ -137,9 +140,20 @@ void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) { assertThat(recOptsCustOrder).containsExactly("two", "one", "two", "one", "two", "one"); await().untilAsserted(() -> assertThat(patternedDeliveries).contains("bazqux", "FOOBAR")); assertThat(context.getBean(ReactiveKafkaApplication.class).correlation).contains(42, 43); + + checkHealth(context, Status.UP); } } + private static void checkHealth(ConfigurableApplicationContext context, + Status expected) throws InterruptedException { + CompositeHealthContributor healthIndicator = context + .getBean("bindersHealthContributor", CompositeHealthContributor.class); + ReactorKafkaBinderHealthIndicator rkbhi = (ReactorKafkaBinderHealthIndicator) healthIndicator.getContributor("reactorKafka"); + Health health = rkbhi.health(); + assertThat(health.getStatus()).isEqualTo(expected); + } + private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) { return excludeKafkaAutoConfig ? "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration" : "foo=bar";