diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index c5bba0eb5..1ccb2bcf1 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,7 @@ * @author Lars Bilger * @author Tomek Szmytka * @author Nico Heller + * @author Kurt Hong */ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener, AutoCloseable { @@ -263,6 +264,9 @@ public void onApplicationEvent(BindingCreatedEvent event) { @Override public void close() throws Exception { + if (this.meterRegistry != null) { + this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove); + } Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown); } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java index 376fb51a2..4559a25f4 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ * @author Lars Bilger * @author Tomek Szmytka * @author Nico Heller + * @author Kurt Hong */ class KafkaBinderMetricsTest { @@ -91,7 +92,7 @@ public void setup() { org.mockito.BDDMockito.given(kafkaBinderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval()) .willReturn(Duration.ofSeconds(60)); metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, - consumerFactory, null + consumerFactory, meterRegistry ); org.mockito.BDDMockito .given(consumer.endOffsets(ArgumentMatchers.anyCollection())) @@ -351,6 +352,19 @@ public void shouldShutdownSchedulerOnClose() throws Exception { assertThat(metrics.scheduler.isShutdown()).isTrue(); } + @Test + public void shouldUnregisterMetersOnClose() throws Exception { + final List partitions = partitions(new Node(0, null, 0)); + topicsInUse.put( + TEST_TOPIC, + new TopicInformation("group4-metrics", partitions, false) + ); + metrics.bindTo(meterRegistry); + assertThat(meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).hasSize(1); + metrics.close(); + assertThat(meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).isEmpty(); + } + private List partitions(Node... nodes) { List partitions = new ArrayList<>(); for (int i = 0; i < nodes.length; i++) {