diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index 7b2bb36502..07f0d200bd 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -88,6 +88,11 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support]. +Starting with version 3.3, a `KafkaMetricsSupport` abstract class is introduced to manage `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` binding into a `MeterRegistry` for provided Kafka client. +This class is a super for the mentioned above `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener`. +However, it can be used for any Kafka client use-cases. +The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector. + [[observation]] == Micrometer Observation diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 1ad2f58850..c80ac18c77 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -67,4 +67,4 @@ For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempt === Kafka Metrics Listeners and `TaskScheduler` The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`. -See `KafkaMetrics` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information. +See `KafkaMetricsSupport` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java index 3ee7644a07..dc3de51fd1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java @@ -38,7 +38,6 @@ import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; - import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -46,13 +45,15 @@ import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; /** - * An abstract class to manage {@link io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics}. + * An abstract class to manage {@link KafkaClientMetrics}. * * @param the Kafka Client type. * * @author Artem Bilan * * @since 3.3 + * + * @see KafkaClientMetrics */ public abstract class KafkaMetricsSupport { @@ -67,7 +68,6 @@ public abstract class KafkaMetricsSupport { /** * Construct an instance with the provided registry. - * * @param meterRegistry the registry. */ protected KafkaMetricsSupport(MeterRegistry meterRegistry) { @@ -76,7 +76,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry) { /** * Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}. - * * @param meterRegistry the registry. * @param taskScheduler the task scheduler. */ @@ -86,7 +85,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskSch /** * Construct an instance with the provided {@link MeterRegistry} and tags. - * * @param meterRegistry the registry. * @param tags the tags. */ @@ -99,7 +97,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List tags) { /** * Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}. - * * @param meterRegistry the registry. * @param tags the tags. * @param taskScheduler the task scheduler. @@ -112,7 +109,12 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List tags, TaskS this.scheduler = obtainScheduledExecutorService(taskScheduler); } - protected void clientAdded(String id, C client) { + /** + * Bind metrics for the Apache Kafka client with provided id. + * @param id the unique identifier for the client to manage in store. + * @param client the Kafka client instance to bind. + */ + protected final void bindClient(String id, C client) { if (!this.metrics.containsKey(id)) { List clientTags = new ArrayList<>(this.tags); clientTags.add(new ImmutableTag("spring.id", id)); @@ -121,6 +123,15 @@ protected void clientAdded(String id, C client) { } } + /** + * Create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance + * for the provided Kafka client and metric tags. + * By default, this factory is aware of {@link Consumer}, {@link Producer} and {@link AdminClient} types. + * For other use-case this method can be overridden. + * @param client the client to create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance for. + * @param tags the tags for the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}. + * @return the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}. + */ protected MeterBinder createClientMetrics(C client, List tags) { if (client instanceof Consumer consumer) { return createConsumerMetrics(consumer, tags); @@ -153,7 +164,12 @@ private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List : new KafkaClientMetrics(adminClient, tags); } - protected void clientRemoved(String id, C client) { + /** + * Unbind a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} for the provided Kafka client. + * @param id the unique identifier for the client to manage in store. + * @param client the Kafka client instance to unbind. + */ + protected final void unbindClient(String id, C client) { AutoCloseable removed = (AutoCloseable) this.metrics.remove(id); if (removed != null) { try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java index 87a0ae87cd..bdf3e29cd2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java @@ -82,12 +82,12 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry, List tags, T @Override public synchronized void consumerAdded(String id, Consumer consumer) { - clientAdded(id, consumer); + bindClient(id, consumer); } @Override public synchronized void consumerRemoved(String id, Consumer consumer) { - clientRemoved(id, consumer); + unbindClient(id, consumer); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java index c7f8910eb4..343378a110 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java @@ -81,12 +81,12 @@ public MicrometerProducerListener(MeterRegistry meterRegistry, List tags, T @Override public synchronized void producerAdded(String id, Producer producer) { - clientAdded(id, producer); + bindClient(id, producer); } @Override public synchronized void producerRemoved(String id, Producer producer) { - clientRemoved(id, producer); + unbindClient(id, producer); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java index 7cab4b17b6..d25dd60dc8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java @@ -82,7 +82,7 @@ public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List tag @Override public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) { - clientAdded(id, kafkaStreams); + bindClient(id, kafkaStreams); } @Override @@ -94,7 +94,7 @@ protected MeterBinder createClientMetrics(KafkaStreams client, List tags) { @Override public synchronized void streamsRemoved(String id, KafkaStreams streams) { - clientRemoved(id, streams); + unbindClient(id, streams); } }