Skip to content

Commit

Permalink
Add statsd metrics reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Dec 28, 2023
1 parent 44816c1 commit 504e271
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 23 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ lazy val core = project
Dependencies.Libraries.circeConfig,
Dependencies.Libraries.trackerCore,
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.datadogHttp4s,
Dependencies.Libraries.datadogStatsd,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.ceTestkit,
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
port = 8125
period = 10 seconds
prefix = snowplow.collector
tags = {
"app": "collector"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ object Config {
hostname: String,
port: Int,
period: FiniteDuration,
prefix: String
prefix: String,
tags: Map[String, String]
)

case class SSL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ package com.snowplowanalytics.snowplow.collector.core

import cats.effect.{Async, Resource}
import cats.implicits._
import org.http4s.HttpApp
import com.avast.datadog4s.api.Tag
import com.avast.datadog4s.extension.http4s.DatadogMetricsOps
import com.avast.datadog4s.{StatsDMetricFactory, StatsDMetricFactoryConfig}
import org.http4s.HttpRoutes
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.Server
import org.http4s.server.middleware.Metrics
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

Expand All @@ -24,23 +28,46 @@ object HttpServer {
implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F]

def build[F[_]: Async](
app: HttpApp[F],
routes: HttpRoutes[F],
port: Int,
secure: Boolean,
networking: Config.Networking
networking: Config.Networking,
metricsConfig: Config.Metrics
): Resource[F, Server] =
buildBlazeServer[F](app, port, secure, networking)
for {
withMetricsMiddleware <- createMetricsMiddleware(routes, metricsConfig)
server <- buildBlazeServer[F](withMetricsMiddleware, port, secure, networking)
} yield server

private def createMetricsMiddleware[F[_]: Async](
routes: HttpRoutes[F],
metricsConfig: Config.Metrics
): Resource[F, HttpRoutes[F]] =
if (metricsConfig.statsd.enabled) {
val metricsFactory = StatsDMetricFactory.make(createStatsdConfig(metricsConfig))
metricsFactory.evalMap(DatadogMetricsOps.builder[F](_).build()).map { metricsOps =>
Metrics[F](metricsOps)(routes)
}
} else {
Resource.pure(routes)
}

private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = {
val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port)
val tags = metricsConfig.statsd.tags.toSeq.map { case (name, value) => Tag.of(name, value) }
StatsDMetricFactoryConfig(Some(metricsConfig.statsd.prefix), server, defaultTags = tags)
}

private def buildBlazeServer[F[_]: Async](
app: HttpApp[F],
routes: HttpRoutes[F],
port: Int,
secure: Boolean,
networking: Config.Networking
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(port))
.withHttpApp(app)
.withHttpApp(routes.orNotFound)
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.cond(secure, _.withSslContext(SSLContext.getDefault))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ class Routes[F[_]: Sync](
service.crossdomainResponse
}

val value: HttpApp[F] = {
val value: HttpRoutes[F] = {
val routes = healthRoutes <+> corsRoute <+> cookieRoutes <+> rootRoute <+> crossdomainRoute
val res = if (enableDefaultRedirect) routes else rejectRedirect <+> routes
res.orNotFound
if (enableDefaultRedirect) routes else rejectRedirect <+> routes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ object Run {
).value,
if (config.ssl.enable) config.ssl.port else config.port,
config.ssl.enable,
config.networking
config.networking,
config.monitoring.metrics
)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class RoutesSpec extends Specification {
enableCrossdomainTracking: Boolean = false
) = {
val service = new TestService()
val routes = new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value
val routes =
new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value.orNotFound
(service, routes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ object TestUtils {
"localhost",
8125,
10.seconds,
"snowplow.collector"
"snowplow.collector",
Map("app" -> "collector")
)
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ object KafkaConfigSpec {
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
monitoring = Config.Monitoring(
Config.Metrics(
Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector"))
)
),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ object KinesisConfigSpec {
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
monitoring = Config.Monitoring(
Config.Metrics(
Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector"))
)
),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ object NsqConfigSpec {
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
monitoring = Config.Monitoring(
Config.Metrics(
Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector"))
)
),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
Expand Down
3 changes: 3 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object Dependencies {
val thrift = "0.15.0" // force this version to mitigate security vulnerabilities
val tracker = "2.0.0"
val azureAuth = "1.7.14"
val dataDog4s = "0.32.0"
}

object Libraries {
Expand All @@ -57,6 +58,8 @@ object Dependencies {
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
val thrift = "org.apache.thrift" % "libthrift" % V.thrift
val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker
val datadogHttp4s = "com.avast.cloud" %% "datadog4s-http4s" % V.dataDog4s
val datadogStatsd = "com.avast.cloud" %% "datadog4s-statsd" % V.dataDog4s

//sinks
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ object ConfigSpec {
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
monitoring = Config.Monitoring(
Config.Metrics(
Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector"))
)
),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ object SqsConfigSpec {
body = ""
),
cors = Config.CORS(1.hour),
monitoring =
Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))),
monitoring = Config.Monitoring(
Config.Metrics(
Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector"))
)
),
ssl = Config.SSL(enable = false, redirect = false, port = 443),
enableDefaultRedirect = false,
redirectDomains = Set.empty,
Expand Down

0 comments on commit 504e271

Please sign in to comment.