Skip to content

Commit

Permalink
* Improve KafkaMetricsSupport API to make it more friendly for targ…
Browse files Browse the repository at this point in the history
…et projects

* Add more JavaDocs to the `KafkaMetricsSupport`
* Mention `KafkaMetricsSupport` in the docs
  • Loading branch information
artembilan committed Oct 17, 2024
1 parent 0fd679c commit cf4a1fb
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")

A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support].

Starting with version 3.3, a `KafkaMetricsSupport` abstract class is introduced to manage `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` binding into a `MeterRegistry` for provided Kafka client.
This class is a super for the mentioned above `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener`.
However, it can be used for any Kafka client use-cases.
The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector.

[[observation]]
== Micrometer Observation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,4 @@ For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempt
=== Kafka Metrics Listeners and `TaskScheduler`

The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`.
See `KafkaMetrics` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.
See `KafkaMetricsSupport` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,22 @@
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;


import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;

/**
* An abstract class to manage {@link io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics}.
* An abstract class to manage {@link KafkaClientMetrics}.
*
* @param <C> the Kafka Client type.
*
* @author Artem Bilan
*
* @since 3.3
*
* @see KafkaClientMetrics
*/
public abstract class KafkaMetricsSupport<C> {

Expand All @@ -67,7 +68,6 @@ public abstract class KafkaMetricsSupport<C> {

/**
* Construct an instance with the provided registry.
*
* @param meterRegistry the registry.
*/
protected KafkaMetricsSupport(MeterRegistry meterRegistry) {
Expand All @@ -76,7 +76,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry) {

/**
* Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}.
*
* @param meterRegistry the registry.
* @param taskScheduler the task scheduler.
*/
Expand All @@ -86,7 +85,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskSch

/**
* Construct an instance with the provided {@link MeterRegistry} and tags.
*
* @param meterRegistry the registry.
* @param tags the tags.
*/
Expand All @@ -99,7 +97,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags) {

/**
* Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}.
*
* @param meterRegistry the registry.
* @param tags the tags.
* @param taskScheduler the task scheduler.
Expand All @@ -112,7 +109,12 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags, TaskS
this.scheduler = obtainScheduledExecutorService(taskScheduler);
}

protected void clientAdded(String id, C client) {
/**
* Bind metrics for the Apache Kafka client with provided id.
* @param id the unique identifier for the client to manage in store.
* @param client the Kafka client instance to bind.
*/
protected final void bindClient(String id, C client) {
if (!this.metrics.containsKey(id)) {
List<Tag> clientTags = new ArrayList<>(this.tags);
clientTags.add(new ImmutableTag("spring.id", id));
Expand All @@ -121,6 +123,15 @@ protected void clientAdded(String id, C client) {
}
}

/**
* Create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance
* for the provided Kafka client and metric tags.
* By default, this factory is aware of {@link Consumer}, {@link Producer} and {@link AdminClient} types.
* For other use-case this method can be overridden.
* @param client the client to create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance for.
* @param tags the tags for the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}.
* @return the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}.
*/
protected MeterBinder createClientMetrics(C client, List<Tag> tags) {
if (client instanceof Consumer<?, ?> consumer) {
return createConsumerMetrics(consumer, tags);
Expand Down Expand Up @@ -153,7 +164,12 @@ private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List<Tag>
: new KafkaClientMetrics(adminClient, tags);
}

protected void clientRemoved(String id, C client) {
/**
* Unbind a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} for the provided Kafka client.
* @param id the unique identifier for the client to manage in store.
* @param client the Kafka client instance to unbind.
*/
protected final void unbindClient(String id, C client) {
AutoCloseable removed = (AutoCloseable) this.metrics.remove(id);
if (removed != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags, T

@Override
public synchronized void consumerAdded(String id, Consumer<K, V> consumer) {
clientAdded(id, consumer);
bindClient(id, consumer);
}

@Override
public synchronized void consumerRemoved(String id, Consumer<K, V> consumer) {
clientRemoved(id, consumer);
unbindClient(id, consumer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public MicrometerProducerListener(MeterRegistry meterRegistry, List<Tag> tags, T

@Override
public synchronized void producerAdded(String id, Producer<K, V> producer) {
clientAdded(id, producer);
bindClient(id, producer);
}

@Override
public synchronized void producerRemoved(String id, Producer<K, V> producer) {
clientRemoved(id, producer);
unbindClient(id, producer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List<Tag> tag

@Override
public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) {
clientAdded(id, kafkaStreams);
bindClient(id, kafkaStreams);
}

@Override
Expand All @@ -94,7 +94,7 @@ protected MeterBinder createClientMetrics(KafkaStreams client, List<Tag> tags) {

@Override
public synchronized void streamsRemoved(String id, KafkaStreams streams) {
clientRemoved(id, streams);
unbindClient(id, streams);
}

}

0 comments on commit cf4a1fb

Please sign in to comment.