From 6bbfd6864c92c0be404813c3814e9438ff2bb759 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 5 Nov 2020 14:28:11 +0000 Subject: [PATCH] Adds jaeger as a tracer --- build.sbt | 1 + .../Collector.scala | 17 +++++++++++++++-- .../CollectorService.scala | 16 ++++++++++------ .../model.scala | 10 +++++++++- project/Dependencies.scala | 2 ++ 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 5d53bbd41..322d17e2e 100644 --- a/build.sbt +++ b/build.sbt @@ -28,6 +28,7 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.prometheusCommon, Dependencies.Libraries.opentracingApi, Dependencies.Libraries.opentracingNoop, + Dependencies.Libraries.jaeger, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index 8af44f796..f40132338 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,6 +24,8 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig +import io.jaegertracing.{Configuration => JaegerConfiguration} +import io.opentracing.Tracer import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ @@ -40,7 +42,8 @@ trait Collector { lazy val log = LoggerFactory.getLogger(getClass()) implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) - implicit val _ = new FieldCoproductHint[SinkConfig]("enabled") + implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled") + implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled") def parseConfig(args: Array[String]): (CollectorConfig, Config) = { case class FileConfig(config: File = new File(".")) @@ -69,13 +72,23 @@ trait Collector { (loadConfigOrThrow[CollectorConfig](conf.getConfig("collector")), conf) } + def tracer(config: TracerConfig): Tracer = + config match { + case TracerConfig.Noop => + log.debug("Using noop tracer") + NoopTracerFactory.create + case TracerConfig.Jaeger => + log.debug("Using jaeger tracer") + JaegerConfiguration.fromEnv.getTracer + } + def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = { implicit val system = ActorSystem.create("scala-stream-collector", akkaConf) implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher - val sharedTracer = NoopTracerFactory.create + val sharedTracer = tracer(collectorConf.tracer) val collectorRoute = new CollectorRoute { override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index af4ecc75c..0f8d5ccd0 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -351,12 +351,16 @@ class CollectorService( case other => Some(other.toString) } - def tracerHeaders: Iterable[String] = { - val m = scala.collection.mutable.Map.empty[String, String] - val adapter = new TextMapAdapter(m.asJava) - tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter) - m.map { case (k, v) => s"$k: $v" } - } + def tracerHeaders: Iterable[String] = + Option(tracer.activeSpan) match { + case Some(span) => + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + case None => + Iterable() + } /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index a95332ac1..f17561452 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -142,6 +142,13 @@ package model { redirect: Boolean = false, port: Int = 443 ) + + sealed trait TracerConfig + object TracerConfig { + case object Noop extends TracerConfig + case object Jaeger extends TracerConfig + } + final case class CollectorConfig( interface: String, port: Int, @@ -157,7 +164,8 @@ package model { streams: StreamsConfig, prometheusMetrics: PrometheusMetricsConfig, enableDefaultRedirect: Boolean = false, - ssl: SSLConfig = SSLConfig() + ssl: SSLConfig = SSLConfig(), + tracer: TracerConfig = TracerConfig.Noop ) { val cookieConfig = if (cookie.enabled) Some(cookie) else None val doNotTrackHttpCookie = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3ef6cf1aa..19f3923fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -41,6 +41,7 @@ object Dependencies { val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val opentracing = "0.33.0" + val jaeger = "1.4.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -74,6 +75,7 @@ object Dependencies { val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing + val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala