Skip to content

Commit

Permalink
GH-2949: KafkaBinderHealthIndicator consumer group.id
Browse files Browse the repository at this point in the history
* Add a new property in `KafkaBinderConfigurationProperties` to allow
  the users to specify a `group.id` for the metadata consumer used by the
  health indicator.

Resolves #2949
  • Loading branch information
sobychacko committed May 14, 2024
1 parent 7e20c9e commit a3c4364
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,17 @@ public class KafkaBinderConfigurationProperties {
/**
* Schema registry ssl configuration properties.
*/
private final String[] schemaRegistryProperties = new String[]{"schema.registry.url", "schema.registry.ssl.keystore.location", "schema.registry.ssl.keystore.password", "schema.registry.ssl.truststore.location", "schema.registry.ssl.truststore.password", "schema.registry.ssl.key.password"};
private final String[] schemaRegistryProperties = new String[]{"schema.registry.url",
"schema.registry.ssl.keystore.location", "schema.registry.ssl.keystore.password",
"schema.registry.ssl.truststore.location", "schema.registry.ssl.truststore.password",
"schema.registry.ssl.key.password"};

/**
* Consumer group.id of the Kafka consumer in
* {@link org.springframework.cloud.stream.binder.kafka.common.AbstractKafkaBinderHealthIndicator} that is used
* for querying metadata from the broker (such as metadata information about the topics).
*/
private String healthIndicatorConsumerGroup;

/**
* Earlier, @Autowired on this constructor was necessary for all the properties to be discovered
Expand Down Expand Up @@ -504,6 +514,14 @@ public void setEnableObservation(boolean enableObservation) {
this.enableObservation = enableObservation;
}

public String getHealthIndicatorConsumerGroup() {
return healthIndicatorConsumerGroup;
}

public void setHealthIndicatorConsumerGroup(String healthIndicatorConsumerGroup) {
this.healthIndicatorConsumerGroup = healthIndicatorConsumerGroup;
}

/**
* Domain class that models transaction capabilities in Kafka.
*/
Expand Down Expand Up @@ -697,6 +715,8 @@ public KafkaProducerProperties getExtension() {
return this.kafkaProducerProperties;
}



}

public static class Metrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
* Configuration class for Kafka binder health indicator beans.
Expand Down Expand Up @@ -67,6 +68,10 @@ public KafkaBinderHealthIndicator kafkaBinderHealthIndicator(
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
configurationProperties.getKafkaConnectionString());
}
String healthIndicatorConsumerGroup = configurationProperties.getHealthIndicatorConsumerGroup();
if (StringUtils.hasText(healthIndicatorConsumerGroup)) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, healthIndicatorConsumerGroup);
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(
kafkaMessageChannelBinder, consumerFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Test;
Expand All @@ -32,9 +34,12 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.common.AbstractKafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
Expand All @@ -47,17 +52,39 @@
* @author Soby Chacko
*/
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaAutoConfiguration.class,
KafkaBinderConfigurationPropertiesTest.class })
@TestPropertySource(locations = "classpath:binder-config.properties")
class KafkaBinderConfigurationPropertiesTest {
KafkaBinderPropertiesTest.class })
@TestPropertySource(locations = "classpath:binder-config.properties", properties =
"spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup=health-consumer-group")
class KafkaBinderPropertiesTest {

@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;

@Autowired
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;

@Autowired
private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;

@Test
@SuppressWarnings("unchecked")
void kafkaBinderConfigurationProperties() throws Exception {
assertThat(this.kafkaMessageChannelBinder).isNotNull();
assertThat(this.kafkaBinderConfigurationProperties).isNotNull();

// Testing a scenario in health indicator that is originally triggered by a property in KafkaBinderConfigurationProperties,
// which ultimately creates a Kafka Consumer in the health indicator implementation.
assertThat(this.kafkaBinderConfigurationProperties.getHealthIndicatorConsumerGroup())
.isEqualTo("health-consumer-group");
assertThat(this.kafkaBinderHealthIndicator).isNotNull();
Field consumerFactoryField = AbstractKafkaBinderHealthIndicator.class.getDeclaredField("consumerFactory");
consumerFactoryField.setAccessible(true);
ConsumerFactory<?, ?> healthIndicatorConsumerFactory =
(ConsumerFactory<?, ?>) consumerFactoryField.get(this.kafkaBinderHealthIndicator);
assertThat(healthIndicatorConsumerFactory).isNotNull();
Consumer<?, ?> consumer = healthIndicatorConsumerFactory.createConsumer();
ConsumerGroupMetadata consumerGroupMetadata = consumer.groupMetadata();
assertThat(consumerGroupMetadata.groupId()).isEqualTo("health-consumer-group");

KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ Enable Micrometer observation registry on all the bindings in this binder.
+
Default: false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup::
`KafkaHealthIndicator` metadata consumer `group.id`.
This consumer is used by the `HealthIndicator` to query the metadata about the topics in use.
+
Default: none.

[[kafka-consumer-properties]]
== Kafka Consumer Properties

Expand Down

0 comments on commit a3c4364

Please sign in to comment.