Merge pull request #345 from Banno/unregister
KAYAK-795 feat(metrics): unregister collectors on remove and close
Kazark authored Apr 14, 2021
2 parents 98ad084 + 0d4ecea commit 034a5ab
Showing 2 changed files with 96 additions and 36 deletions.
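For context on the change itself: the Prometheus Java client keeps every registered Collector in a CollectorRegistry until it is explicitly unregistered, so a reporter that merely stops updating its collectors still leaves stale metric families in the scrape output. A minimal sketch of that register/unregister lifecycle, with illustrative names only (this is not code from the commit):

import io.prometheus.client.{CollectorRegistry, Gauge}

object RegistryLifecycleSketch extends App {
  val registry = new CollectorRegistry()

  // register() adds the metric family to everything the registry exposes on scrape
  val gauge: Gauge =
    Gauge.build().name("example_gauge").help("illustrative only").register(registry)
  gauge.set(42.0)

  // unregister() drops the whole family again; this is what remove/close now do
  // for the collectors backing Kafka client metrics
  registry.unregister(gauge)
}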
@@ -16,21 +16,22 @@

package com.banno.kafka.metrics.prometheus

import scala.collection.compat._
import cats._
import cats.data._
import cats.effect.concurrent.Ref
import cats.effect._
import cats.syntax.all._
import com.banno.kafka.metrics.MetricsReporterApi
import cats.implicits._
import cats.effect.{Concurrent, Sync, Timer}
import fs2.Stream
import fs2.concurrent.SignallingRef
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger

import scala.jdk.CollectionConverters._
import io.prometheus.client._
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric
import io.prometheus.client.{CollectorRegistry, Counter, Gauge}

import scala.math.max
import scala.collection.compat._
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.math.max

object PrometheusMetricsReporterApi {

@@ -65,49 +66,104 @@ object PrometheusMetricsReporterApi {
    def value: F[Double] =
      F.delay(metric.metricValue.toString.toDouble).recover { case _ => 0 } //TODO can probably do better than this...

    def matches(other: KafkaMetric): Boolean =
      metric.metricName() === other.metricName()

    def createGauge(registry: CollectorRegistry): F[Gauge] =
      F.delay(Gauge.build().name(name).help(help).labelNames(labelNames: _*).register(registry))
    def createCounter(registry: CollectorRegistry): F[Counter] =
      F.delay(Counter.build().name(name).help(help).labelNames(labelNames: _*).register(registry))
  }

  implicit val metricNameEq: Eq[MetricName] = Eq.fromUniversalEquals

  sealed trait `Removed?`[+F[_]]
  object `Removed?` {
    object NotThere extends `Removed?`[Nothing]
    def notThere[F[_]]: `Removed?`[F] = NotThere
    object LastOne extends `Removed?`[Nothing]
    def lastOne[F[_]]: `Removed?`[F] = LastOne
    case class Removed[F[_]](
        updated: MetricAdapter[F]
    ) extends `Removed?`[F]
    def removed[F[_]](updated: MetricAdapter[F]): `Removed?`[F] =
      Removed(updated)
  }
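The backtick-named `Removed?` result encodes the three possible outcomes of taking one metric source out of an adapter. The helper below is illustrative only and not part of the diff (the real handling lives in PrometheusMetricsReporterApi#remove further down); it shows roughly how a caller is meant to interpret each case:

// Illustrative only: how the three `Removed?` cases are meant to be consumed.
def interpret[F[_]](
    key: String,
    adapter: MetricAdapter[F],
    metric: KafkaMetric,
    adapterMap: Map[String, MetricAdapter[F]]
): (Map[String, MetricAdapter[F]], Option[Collector]) =
  adapter.remove(metric) match {
    case `Removed?`.NotThere         => (adapterMap, None)                          // leave the map alone
    case `Removed?`.LastOne          => (adapterMap - key, Some(adapter.collector)) // drop the adapter, unregister its collector
    case `Removed?`.Removed(updated) => (adapterMap.updated(key, updated), None)    // keep the shrunk adapter
  }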

  sealed trait MetricAdapter[F[_]] {
    def update: F[Unit]
    def add(m: MetricSource[F]): MetricAdapter[F]
    def collector: Collector
    def remove(metric: KafkaMetric): `Removed?`[F]
  }
  case class GaugeMetricAdapter[F[_]](metrics: List[MetricSource[F]], gauge: Gauge)(
      implicit F: Sync[F]
  ) extends MetricAdapter[F] {
    def add(m: MetricSource[F]): MetricAdapter[F] = copy(metrics = metrics :+ m)
    def update: F[Unit] =
      metrics.traverse_(m => m.value.flatMap(v => F.delay(gauge.labels(m.labels: _*).set(v))))
  }
  case class CounterMetricAdapter[F[_]](metrics: List[MetricSource[F]], counter: Counter)(
      implicit F: Sync[F]
  ) extends MetricAdapter[F] {
    def add(m: MetricSource[F]): MetricAdapter[F] = copy(metrics = metrics :+ m)
    def update: F[Unit] =
      metrics.traverse_(

  object MetricAdapter {
    private case class Impl[F[_]: Applicative](
        metrics: NonEmptyList[MetricSource[F]],
        collector: Collector,
        update1: MetricSource[F] => F[Unit]
    ) extends MetricAdapter[F] {
      override def add(m: MetricSource[F]): MetricAdapter[F] =
        copy(metrics = metrics :+ m)

      override def update: F[Unit] =
        metrics.traverse_(update1)

      override def remove(metric: KafkaMetric): `Removed?`[F] =
        NonEmptyList.fromList(
          metrics.filterNot(_.matches(metric))
        ).fold(`Removed?`.lastOne[F])(ms =>
          if (metrics.length === ms.length)
            /*then*/ `Removed?`.notThere[F]
          else `Removed?`.removed(Impl(ms, collector, update1))
        )
    }

    def gauge[F[_]: Sync](
        metric: MetricSource[F],
        gauge: Gauge
    ): MetricAdapter[F] =
      Impl(
        NonEmptyList.one(metric),
        gauge,
        m => m.value.flatMap(v => Sync[F].delay(gauge.labels(m.labels: _*).set(v)))
      )

    def counter[F[_]: Sync](
        metric: MetricSource[F],
        counter: Counter
    ): MetricAdapter[F] =
      Impl(
        NonEmptyList.one(metric),
        counter,
        m =>
          m.value.flatMap(
            v =>
              F.delay(
              Sync[F].delay(
                // NOTE Should always be positive, but protect against negative
                // TODO might want to log on negative?
                counter.labels(m.labels: _*).inc(max(0, v - counter.labels(m.labels: _*).get))
              )
            )
          ) //should always be positive, but protect against negative, TODO might want to log on negative?
      )
  }
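Impl.remove above distinguishes the three outcomes purely by filtering and comparing lengths: an empty result means the last source went away, an unchanged length means the metric was never held. A stripped-down, stand-alone sketch of the same trick (hypothetical names, only cats' NonEmptyList assumed):

import cats.data.NonEmptyList

// Illustrative reduction of the length-comparison logic in Impl.remove
def describeRemoval(sources: NonEmptyList[String], target: String): String =
  NonEmptyList.fromList(sources.filterNot(_ == target)) match {
    case None                                        => "LastOne"  // nothing left after the filter
    case Some(kept) if kept.length == sources.length => "NotThere" // the filter removed nothing
    case Some(_)                                     => "Removed"  // some sources remain
  }

// describeRemoval(NonEmptyList.one("a"), "a")     == "LastOne"
// describeRemoval(NonEmptyList.of("a", "b"), "z") == "NotThere"
// describeRemoval(NonEmptyList.of("a", "b"), "a") == "Removed"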

  abstract class PrometheusMetricsReporterApi[F[_]](
      protected val prefix: String,
      protected val adapters: Ref[F, Map[String, MetricAdapter[F]]],
      protected val updating: SignallingRef[F, Boolean],
      protected val updatePeriod: FiniteDuration
      protected val updatePeriod: FiniteDuration,
      private val collectorRegistry: CollectorRegistry,
  )(implicit F: Concurrent[F], T: Timer[F])
      extends MetricsReporterApi[F] {

    override def remove(metric: KafkaMetric): F[Unit] =
      F.unit //F.delay(log.warn(s"remove is not implemented, but called for ${metric.prometheusName(prefix)}"))
      adapters.modify { adapterMap =>
        adapterMap.collectFirst(kv => kv._2.remove(metric) match {
          case `Removed?`.LastOne => (adapterMap - kv._1, kv._2.collector.some)
          case `Removed?`.Removed(updated) => (adapterMap.updated(kv._1, updated), none)
        }).getOrElse((adapterMap, none[Collector]))
      }.flatMap(_.traverse_(c => F.delay(collectorRegistry.unregister(c))))
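Note the shape of the new remove: the next adapter map and the Collector that has to go are computed inside Ref#modify, and the side-effecting unregister only runs on the value modify returns. The same pattern in isolation (cats-effect 2 Ref, hypothetical names):

import cats.effect.IO
import cats.effect.concurrent.Ref
import cats.syntax.all._

// Illustrative only: atomically update state, returning what must be cleaned up afterwards.
def evictAndClean(state: Ref[IO, Map[String, String]], key: String)(clean: String => IO[Unit]): IO[Unit] =
  state.modify { m =>
    m.get(key) match {
      case Some(v) => (m - key, Some(v)) // new state, plus the value to release
      case None    => (m, None)          // nothing to do
    }
  }.flatMap(_.traverse_(clean))

Keeping the unregister call outside of modify matters because modify may re-run its function under contention, so the function itself has to stay free of side effects.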

    def updateMetricsPeriodically(implicit T: Timer[F]): Stream[F, Unit] =
      for {
@@ -125,7 +181,11 @@

    override def configure(configs: Map[String, Any]): F[Unit] = F.unit

    override def close: F[Unit] = updating.set(false)
    override def close: F[Unit] =
      adapters.modify { adapterMap =>
        (Map.empty, adapterMap.values.map(_.collector).toList)
      }.flatMap(_.traverse_(c => F.delay(collectorRegistry.unregister(c)))) *>
        updating.set(false)
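close now drains the whole adapter map, unregisters every collector it was holding, and only then flips the updating signal that the periodic update stream is assumed to watch. A minimal fs2 sketch of that kind of signal-driven shutdown (hypothetical stream, not the one defined in this file):

import scala.concurrent.duration.FiniteDuration
import cats.effect.{Concurrent, Timer}
import fs2.Stream
import fs2.concurrent.SignallingRef

// Illustrative only: run a task on a schedule until a "running" signal is set to false.
def periodically[F[_]: Concurrent: Timer](
    running: SignallingRef[F, Boolean],
    task: F[Unit],
    period: FiniteDuration
): Stream[F, Unit] =
  Stream
    .awakeEvery[F](period)
    .evalMap(_ => task)
    .interruptWhen(running.discrete.map(!_)) // halt once the signal flips to false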

    val ignore = F.unit

@@ -167,7 +227,7 @@ object PrometheusMetricsReporterApi {
      registry: CollectorRegistry,
      updatePeriod: FiniteDuration
  )(implicit F: Concurrent[F], T: Timer[F]): MetricsReporterApi[F] =
    new PrometheusMetricsReporterApi[F]("kafka_producer", adapters, updating, updatePeriod) {
    new PrometheusMetricsReporterApi[F]("kafka_producer", adapters, updating, updatePeriod, registry) {

      override def add(metric: KafkaMetric): F[Unit] = {

@@ -176,15 +236,15 @@
            metric,
            name,
            Map.empty,
            source => source.createGauge(registry).map(GaugeMetricAdapter(List(source), _))
            source => source.createGauge(registry).map(MetricAdapter.gauge(source, _))
          )

        def counter(name: String): F[Unit] =
          adapter(
            metric,
            name,
            Map.empty,
            source => source.createCounter(registry).map(CounterMetricAdapter(List(source), _))
            source => source.createCounter(registry).map(MetricAdapter.counter(source, _))
          )

        MetricId(metric) match {
@@ -406,7 +466,7 @@ object PrometheusMetricsReporterApi {
      registry: CollectorRegistry,
      updatePeriod: FiniteDuration
  )(implicit F: Concurrent[F], T: Timer[F]): MetricsReporterApi[F] =
    new PrometheusMetricsReporterApi[F]("kafka_consumer", adapters, updating, updatePeriod) {
    new PrometheusMetricsReporterApi[F]("kafka_consumer", adapters, updating, updatePeriod, registry) {

      override def add(metric: KafkaMetric): F[Unit] = {

@@ -415,15 +475,15 @@
            metric,
            name,
            additionalTags,
            source => source.createGauge(registry).map(GaugeMetricAdapter(List(source), _))
            source => source.createGauge(registry).map(MetricAdapter.gauge(source, _))
          )

        def counter(name: String): F[Unit] =
          adapter(
            metric,
            name,
            Map.empty,
            source => source.createCounter(registry).map(CounterMetricAdapter(List(source), _))
            source => source.createCounter(registry).map(MetricAdapter.counter(source, _))
          )

        MetricId(metric) match {
@@ -22,7 +22,7 @@ class PrometheusMetricsReporterApiSpec extends AnyFlatSpec with Matchers with In
  implicit val defaultTimer = IO.timer(ExecutionContext.global)

  //when kafka clients change their metrics, this test will help identify the changes we need to make
  "Prometheus reporter" should "register Prometheus collectors for all known Kafka metrics" in {
  "Prometheus reporter" should "register Prometheus collectors for all known Kafka metrics and unregister on close" in {
    val topic = createTopic(2)
    val records =
      List(new ProducerRecord(topic, 0, "a", "a"), new ProducerRecord(topic, 1, "b", "b"))
@@ -61,9 +61,6 @@
          _ <- c2.poll(1 second)

          _ <- IO.sleep(PrometheusMetricsReporterApi.defaultUpdatePeriod + (1 second))
          _ <- p.close
          _ <- c1.close
          _ <- c2.close
        } yield {
          val registry = CollectorRegistry.defaultRegistry
          registry.metricFamilySamples.asScala
@@ -85,6 +82,9 @@
        )
      )
      .unsafeRunSync()
    CollectorRegistry.defaultRegistry
      .metricFamilySamples.asScala
      .count(_.name.startsWith("kafka_producer")) should ===(0)
  }

}
