From 1d3a5609c26c5472a714d9f0c83632d5198a6759 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 5 Nov 2020 13:16:33 +0000 Subject: [PATCH 1/4] Adds opentracing to the collector routes --- build.sbt | 2 ++ .../Collector.scala | 7 +++++- .../CollectorRoute.scala | 23 +++++++++++++++---- .../CollectorService.scala | 17 ++++++++++++-- .../CollectorRouteSpec.scala | 2 ++ .../CollectorServiceSpec.scala | 10 +++++--- project/Dependencies.scala | 3 +++ 7 files changed, 54 insertions(+), 10 deletions(-) diff --git a/build.sbt b/build.sbt index 0df6ebf69..5d53bbd41 100644 --- a/build.sbt +++ b/build.sbt @@ -26,6 +26,8 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.config, Dependencies.Libraries.prometheus, Dependencies.Libraries.prometheusCommon, + Dependencies.Libraries.opentracingApi, + Dependencies.Libraries.opentracingNoop, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index a7e9d1608..8af44f796 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,11 +24,13 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig +import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.auto._ + import metrics._ import model._ @@ -73,8 +75,11 @@ trait Collector { implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher + val sharedTracer = NoopTracerFactory.create + val collectorRoute = new CollectorRoute { - override def collectorService = new CollectorService(collectorConf, sinks) + override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) + override def tracer = sharedTracer } val prometheusMetricsService = new PrometheusMetricsService(collectorConf.prometheusMetrics) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 944c33cd9..7310b5e71 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,14 +16,19 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route} +import akka.http.scaladsl.server.{Directive1, Route, StandardRoute} import akka.http.scaladsl.server.Directives._ +import io.opentracing.{Span, Tracer} + import model.DntCookieMatcher import monitoring.BeanRegistry +import scala.concurrent.ExecutionContext + trait CollectorRoute { def collectorService: Service + def tracer: Tracer private val headers = optionalHeaderValueByName("User-Agent") & optionalHeaderValueByName("Referer") & @@ -42,8 +47,18 @@ trait CollectorRoute { complete(StatusCodes.NotFound -> "redirects disabled") } + def completeWithSpan(r: HttpResponse, span: Span): StandardRoute = + requestContext => { + val fut = complete(r)(requestContext) + fut.onComplete { _ => + span.finish + }(ExecutionContext.global) + fut + } + def routes: Route = doNotTrack(collectorService.doNotTrackCookie) { dnt => + val span = tracer.buildSpan("CollectorRequest").start cookieIfWanted(collectorService.cookieName) { reqCookie => val cookie = reqCookie.map(_.toCookie) headers { (userAgent, refererURI, rawRequestURI) => @@ -69,7 +84,7 @@ trait CollectorRoute { doNotTrack = dnt, Some(ct)) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } } ~ @@ -87,7 +102,7 @@ trait CollectorRoute { pixelExpected = true, doNotTrack = dnt) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } ~ path("""ice\.png""".r | "i".r) { path => @@ -105,7 +120,7 @@ trait CollectorRoute { pixelExpected = true, doNotTrack = dnt) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 643169e94..af4ecc75c 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -23,6 +23,8 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import io.opentracing.Tracer +import io.opentracing.propagation.{Format, TextMapAdapter} import org.apache.commons.codec.binary.Base64 import org.slf4j.LoggerFactory @@ -66,7 +68,8 @@ object CollectorService { class CollectorService( config: CollectorConfig, - sinks: CollectorSinks + sinks: CollectorSinks, + tracer: Tracer ) extends Service { private val logger = LoggerFactory.getLogger(getClass) @@ -199,6 +202,7 @@ class CollectorService( networkUserId: String, contentType: Option[String] ): CollectorPayload = { + val e = new CollectorPayload( "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", ipAddress, @@ -213,7 +217,7 @@ class CollectorService( refererUri.foreach(e.refererUri = _) e.hostname = hostname e.networkUserId = networkUserId - e.headers = (headers(request) ++ contentType).asJava + e.headers = (headers(request) ++ contentType ++ tracerHeaders).asJava contentType.foreach(e.contentType = _) e } @@ -226,8 +230,10 @@ class CollectorService( // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) // Send events to respective sinks + val span = tracer.buildSpan("SinkRawEvents").start val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + span.finish // Sink Responses for Test Sink sinkResponseGood ++ sinkResponseBad } @@ -345,6 +351,13 @@ class CollectorService( case other => Some(other.toString) } + def tracerHeaders: Iterable[String] = { + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + } + /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`)) diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala index 871d8b14d..f4ca8bf66 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala @@ -19,10 +19,12 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.testkit.Specs2RouteTest import akka.http.scaladsl.server.Directives._ import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher +import io.opentracing.noop.NoopTracerFactory import org.specs2.mutable.Specification class CollectorRouteSpec extends Specification with Specs2RouteTest { val mkRoute = (withRedirects: Boolean) => new CollectorRoute { + override val tracer = NoopTracerFactory.create override val collectorService = new Service { def preflightResponse(req: HttpRequest): HttpResponse = HttpResponse(200, entity = "preflight response") diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index ea3e7be02..2781b722c 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -24,6 +24,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ +import io.opentracing.noop.NoopTracerFactory import org.apache.thrift.{TDeserializer, TSerializer} import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload @@ -35,11 +36,13 @@ import model._ class CollectorServiceSpec extends Specification { val service = new CollectorService( TestUtils.testConf, - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val bouncingService = new CollectorService( TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r val event = new CollectorPayload( @@ -503,7 +506,8 @@ class CollectorServiceSpec extends Specification { "should pass on the original path if no mapping for it can be found" in { val service = new CollectorService( TestUtils.testConf.copy(paths = Map.empty[String, String]), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val expected1 = "/com.acme/track" val expected2 = "/com.acme/redirect" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 17b653546..3ef6cf1aa 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,6 +40,7 @@ object Dependencies { val thrift = "0.13.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries + val opentracing = "0.33.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -71,6 +72,8 @@ object Dependencies { val prometheus = "io.prometheus" % "simpleclient" % V.prometheus val prometheusCommon = "io.prometheus" % "simpleclient_common" % V.prometheus val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor + val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing + val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala From fcdb2c86429a0dbfd6bc09193fbaeb521ca84456 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 5 Nov 2020 14:28:11 +0000 Subject: [PATCH 2/4] Adds jaeger as a tracer --- build.sbt | 1 + .../Collector.scala | 17 ++- .../CollectorRoute.scala | 128 ++++++++++-------- .../CollectorService.scala | 33 +++-- .../model.scala | 10 +- project/Dependencies.scala | 2 + 6 files changed, 119 insertions(+), 72 deletions(-) diff --git a/build.sbt b/build.sbt index 5d53bbd41..322d17e2e 100644 --- a/build.sbt +++ b/build.sbt @@ -28,6 +28,7 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.prometheusCommon, Dependencies.Libraries.opentracingApi, Dependencies.Libraries.opentracingNoop, + Dependencies.Libraries.jaeger, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index 8af44f796..f40132338 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,6 +24,8 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig +import io.jaegertracing.{Configuration => JaegerConfiguration} +import io.opentracing.Tracer import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ @@ -40,7 +42,8 @@ trait Collector { lazy val log = LoggerFactory.getLogger(getClass()) implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) - implicit val _ = new FieldCoproductHint[SinkConfig]("enabled") + implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled") + implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled") def parseConfig(args: Array[String]): (CollectorConfig, Config) = { case class FileConfig(config: File = new File(".")) @@ -69,13 +72,23 @@ trait Collector { (loadConfigOrThrow[CollectorConfig](conf.getConfig("collector")), conf) } + def tracer(config: TracerConfig): Tracer = + config match { + case TracerConfig.Noop => + log.debug("Using noop tracer") + NoopTracerFactory.create + case TracerConfig.Jaeger => + log.debug("Using jaeger tracer") + JaegerConfiguration.fromEnv.getTracer + } + def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = { implicit val system = ActorSystem.create("scala-stream-collector", akkaConf) implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher - val sharedTracer = NoopTracerFactory.create + val sharedTracer = tracer(collectorConf.tracer) val collectorRoute = new CollectorRoute { override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 7310b5e71..8f2804f91 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,7 +16,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route, StandardRoute} +import akka.http.scaladsl.server.{Directive1, Route} import akka.http.scaladsl.server.Directives._ import io.opentracing.{Span, Tracer} @@ -24,8 +24,6 @@ import io.opentracing.{Span, Tracer} import model.DntCookieMatcher import monitoring.BeanRegistry -import scala.concurrent.ExecutionContext - trait CollectorRoute { def collectorService: Service def tracer: Tracer @@ -47,32 +45,65 @@ trait CollectorRoute { complete(StatusCodes.NotFound -> "redirects disabled") } - def completeWithSpan(r: HttpResponse, span: Span): StandardRoute = + def traceRoute(inner: Span => Route): Route = requestContext => { - val fut = complete(r)(requestContext) + val span = tracer.buildSpan("HandleRequest").start + val fut = inner(span)(requestContext) fut.onComplete { _ => span.finish - }(ExecutionContext.global) + }(requestContext.executionContext) fut } + // Activates the span only for the local thread + def withActiveSpan[T](span: Span)(f: => T): T = { + val scope = tracer.activateSpan(span) + try { + f + } finally { + scope.close + } + } + def routes: Route = doNotTrack(collectorService.doNotTrackCookie) { dnt => - val span = tracer.buildSpan("CollectorRequest").start - cookieIfWanted(collectorService.cookieName) { reqCookie => - val cookie = reqCookie.map(_.toCookie) - headers { (userAgent, refererURI, rawRequestURI) => - val qs = queryString(rawRequestURI) - extractors { (host, ip, request) => - // get the adapter vendor and version from the path - path(Segment / Segment) { (vendor, version) => - val path = collectorService.determinePath(vendor, version) - post { - extractContentType { ct => - entity(as[String]) { body => + traceRoute { span => + cookieIfWanted(collectorService.cookieName) { reqCookie => + val cookie = reqCookie.map(_.toCookie) + headers { (userAgent, refererURI, rawRequestURI) => + val qs = queryString(rawRequestURI) + extractors { (host, ip, request) => + // get the adapter vendor and version from the path + path(Segment / Segment) { (vendor, version) => + val path = collectorService.determinePath(vendor, version) + post { + extractContentType { ct => + entity(as[String]) { body => + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + Some(body), + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = false, + doNotTrack = dnt, + Some(ct)) + incrementRequests(r.status) + complete(r) + } + } + } + } ~ + (get | head) { + withActiveSpan(span) { val (r, _) = collectorService.cookie( qs, - Some(body), + None, path, cookie, userAgent, @@ -80,47 +111,32 @@ trait CollectorRoute { host, ip, request, - pixelExpected = false, - doNotTrack = dnt, - Some(ct)) + pixelExpected = true, + doNotTrack = dnt) incrementRequests(r.status) - completeWithSpan(r, span) + complete(r) } } } ~ - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - completeWithSpan(r, span) - } - } ~ - path("""ice\.png""".r | "i".r) { path => - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - "/" + path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - completeWithSpan(r, span) + path("""ice\.png""".r | "i".r) { path => + (get | head) { + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + None, + "/" + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = true, + doNotTrack = dnt) + incrementRequests(r.status) + complete(r) + } + } } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index af4ecc75c..215fecd2f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -229,13 +229,16 @@ class CollectorService( ): List[Array[Byte]] = { // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) - // Send events to respective sinks - val span = tracer.buildSpan("SinkRawEvents").start - val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) - val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) - span.finish - // Sink Responses for Test Sink - sinkResponseGood ++ sinkResponseBad + val span = tracer.buildSpan("SinkRawEvents").start() + try { + // Send events to respective sinks + val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) + val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + // Sink Responses for Test Sink + sinkResponseGood ++ sinkResponseBad + } finally { + span.finish + } } /** Builds the final http response from */ @@ -351,12 +354,16 @@ class CollectorService( case other => Some(other.toString) } - def tracerHeaders: Iterable[String] = { - val m = scala.collection.mutable.Map.empty[String, String] - val adapter = new TextMapAdapter(m.asJava) - tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter) - m.map { case (k, v) => s"$k: $v" } - } + def tracerHeaders: Iterable[String] = + Option(tracer.activeSpan) match { + case Some(span) => + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + case None => + Iterable() + } /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index a95332ac1..f17561452 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -142,6 +142,13 @@ package model { redirect: Boolean = false, port: Int = 443 ) + + sealed trait TracerConfig + object TracerConfig { + case object Noop extends TracerConfig + case object Jaeger extends TracerConfig + } + final case class CollectorConfig( interface: String, port: Int, @@ -157,7 +164,8 @@ package model { streams: StreamsConfig, prometheusMetrics: PrometheusMetricsConfig, enableDefaultRedirect: Boolean = false, - ssl: SSLConfig = SSLConfig() + ssl: SSLConfig = SSLConfig(), + tracer: TracerConfig = TracerConfig.Noop ) { val cookieConfig = if (cookie.enabled) Some(cookie) else None val doNotTrackHttpCookie = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3ef6cf1aa..19f3923fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -41,6 +41,7 @@ object Dependencies { val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val opentracing = "0.33.0" + val jaeger = "1.4.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -74,6 +75,7 @@ object Dependencies { val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing + val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala From 03967dfec9bcde9d3bc9f0eb8683770b46c42c83 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 5 Nov 2020 21:59:02 +0000 Subject: [PATCH 3/4] Configuration options for jaeger --- .../Collector.scala | 24 +++++++++++++++++-- .../CollectorRoute.scala | 19 +++++++++++++-- .../CollectorService.scala | 2 ++ .../model.scala | 10 +++++++- 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index f40132338..1d1fda46f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory import pureconfig._ import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.auto._ +import scala.collection.JavaConverters._ import metrics._ @@ -77,9 +78,28 @@ trait Collector { case TracerConfig.Noop => log.debug("Using noop tracer") NoopTracerFactory.create - case TracerConfig.Jaeger => + case j: TracerConfig.Jaeger => log.debug("Using jaeger tracer") - JaegerConfiguration.fromEnv.getTracer + new JaegerConfiguration(j.serviceName) + .withReporter { + val rc = new JaegerConfiguration.ReporterConfiguration + rc.withSender { + val sender = new JaegerConfiguration.SenderConfiguration + j.agentHost.foreach(sender.withAgentHost(_)) + j.agentPort.foreach(sender.withAgentPort(_)) + sender + } + rc + } + .withSampler { + val sampler = new JaegerConfiguration.SamplerConfiguration + j.samplerType.foreach(sampler.withType(_)) + j.samplerParam.foreach(sampler.withParam(_)) + j.managerHostPort.foreach(sampler.withManagerHostPort(_)) + sampler + } + .withTracerTags(j.tracerTags.asJava) + .getTracer } def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = { diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 8f2804f91..c3cfbc974 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,13 +16,15 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route} +import akka.http.scaladsl.server.{Directive1, Route, RouteResult} import akka.http.scaladsl.server.Directives._ import io.opentracing.{Span, Tracer} import model.DntCookieMatcher import monitoring.BeanRegistry +import scala.collection.JavaConverters._ +import scala.util.{Success, Failure} trait CollectorRoute { def collectorService: Service @@ -48,8 +50,21 @@ trait CollectorRoute { def traceRoute(inner: Span => Route): Route = requestContext => { val span = tracer.buildSpan("HandleRequest").start + span.setTag("http.method", requestContext.request.method.name) + span.setTag("http.url", requestContext.request.uri.toString) + val fut = inner(span)(requestContext) - fut.onComplete { _ => + fut.onComplete { result => + result match { + case Success(RouteResult.Complete(response)) => + span.setTag("http.status", response.status.intValue) + case Success(RouteResult.Rejected(rejections)) => + span.setTag("error", result.isFailure) + span.log(Map("event" -> "error", "error.object" -> rejections).asJava) + case Failure(e) => + span.setTag("error", result.isFailure) + span.log(Map("event" -> "error", "error.object" -> e).asJava) + } span.finish }(requestContext.executionContext) fut diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 215fecd2f..cc7e143a2 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -104,6 +104,7 @@ class CollectorService( doNotTrack: Boolean, contentType: Option[ContentType] = None ): (HttpResponse, List[Array[Byte]]) = { + Option(tracer.activeSpan).map(_.log(Map("message" -> "cookie handler").asJava)) val queryParams = Uri.Query(queryString).toMap val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey) @@ -230,6 +231,7 @@ class CollectorService( // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) val span = tracer.buildSpan("SinkRawEvents").start() + span.setTag("component", sinks.good.getClass.getSimpleName) try { // Send events to respective sinks val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index f17561452..7daec620f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -146,7 +146,15 @@ package model { sealed trait TracerConfig object TracerConfig { case object Noop extends TracerConfig - case object Jaeger extends TracerConfig + case class Jaeger( + serviceName: String, + agentHost: Option[String], + agentPort: Option[Int], + samplerType: Option[String], + samplerParam: Option[Float], + managerHostPort: Option[String], + tracerTags: Map[String, String] = Map() + ) extends TracerConfig } final case class CollectorConfig( From 931bb986cfe6b24a18bd0986a6d9ea621d550170 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 6 Nov 2020 15:56:56 +0000 Subject: [PATCH 4/4] Add zipkin as a tracer --- build.sbt | 3 + .../Collector.scala | 36 +------- .../CollectorRoute.scala | 2 +- .../CollectorService.scala | 2 +- .../Tracing.scala | 82 +++++++++++++++++++ .../model.scala | 7 ++ examples/config.hocon.sample | 39 +++++++++ project/Dependencies.scala | 4 + 8 files changed, 138 insertions(+), 37 deletions(-) create mode 100644 core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala diff --git a/build.sbt b/build.sbt index 322d17e2e..743535db0 100644 --- a/build.sbt +++ b/build.sbt @@ -29,6 +29,9 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.opentracingApi, Dependencies.Libraries.opentracingNoop, Dependencies.Libraries.jaeger, + Dependencies.Libraries.jaegerZipkin, + Dependencies.Libraries.zipkin, + Dependencies.Libraries.zipkinSender, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index 1d1fda46f..e43907b3e 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,15 +24,10 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig -import io.jaegertracing.{Configuration => JaegerConfiguration} -import io.opentracing.Tracer -import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.auto._ -import scala.collection.JavaConverters._ - import metrics._ import model._ @@ -73,42 +68,13 @@ trait Collector { (loadConfigOrThrow[CollectorConfig](conf.getConfig("collector")), conf) } - def tracer(config: TracerConfig): Tracer = - config match { - case TracerConfig.Noop => - log.debug("Using noop tracer") - NoopTracerFactory.create - case j: TracerConfig.Jaeger => - log.debug("Using jaeger tracer") - new JaegerConfiguration(j.serviceName) - .withReporter { - val rc = new JaegerConfiguration.ReporterConfiguration - rc.withSender { - val sender = new JaegerConfiguration.SenderConfiguration - j.agentHost.foreach(sender.withAgentHost(_)) - j.agentPort.foreach(sender.withAgentPort(_)) - sender - } - rc - } - .withSampler { - val sampler = new JaegerConfiguration.SamplerConfiguration - j.samplerType.foreach(sampler.withType(_)) - j.samplerParam.foreach(sampler.withParam(_)) - j.managerHostPort.foreach(sampler.withManagerHostPort(_)) - sampler - } - .withTracerTags(j.tracerTags.asJava) - .getTracer - } - def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = { implicit val system = ActorSystem.create("scala-stream-collector", akkaConf) implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher - val sharedTracer = tracer(collectorConf.tracer) + val sharedTracer = Tracing.tracer(collectorConf.tracer) val collectorRoute = new CollectorRoute { override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index c3cfbc974..e7f4b5872 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -49,7 +49,7 @@ trait CollectorRoute { def traceRoute(inner: Span => Route): Route = requestContext => { - val span = tracer.buildSpan("HandleRequest").start + val span = tracer.buildSpan("handle-request").start span.setTag("http.method", requestContext.request.method.name) span.setTag("http.url", requestContext.request.uri.toString) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index cc7e143a2..61255939f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -230,7 +230,7 @@ class CollectorService( ): List[Array[Byte]] = { // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) - val span = tracer.buildSpan("SinkRawEvents").start() + val span = tracer.buildSpan("sink-raw-events").start() span.setTag("component", sinks.good.getClass.getSimpleName) try { // Send events to respective sinks diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala new file mode 100644 index 000000000..6af79f307 --- /dev/null +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Tracing.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import io.jaegertracing.{Configuration => JaegerConfiguration} +import io.jaegertracing.internal.propagation.B3TextMapCodec +import io.jaegertracing.zipkin.ZipkinV2Reporter +import io.opentracing.Tracer +import io.opentracing.noop.NoopTracerFactory +import io.opentracing.propagation.Format +import org.slf4j.LoggerFactory +import zipkin2.reporter.okhttp3.OkHttpSender +import zipkin2.reporter.AsyncReporter + +import scala.collection.JavaConverters._ + +import com.snowplowanalytics.snowplow.collectors.scalastream.model.TracerConfig + + +object Tracing { + + lazy val log = LoggerFactory.getLogger(getClass()) + + def tracer(config: TracerConfig): Tracer = + config match { + case TracerConfig.Noop => + log.debug("Using noop tracer") + NoopTracerFactory.create + case j: TracerConfig.Jaeger => + log.debug("Using jaeger tracer") + new JaegerConfiguration(j.serviceName) + .withReporter { + val rc = new JaegerConfiguration.ReporterConfiguration + rc.withSender { + val sender = new JaegerConfiguration.SenderConfiguration + j.agentHost.foreach(sender.withAgentHost(_)) + j.agentPort.foreach(sender.withAgentPort(_)) + sender + } + rc + } + .withSampler { + val sampler = new JaegerConfiguration.SamplerConfiguration + j.samplerType.foreach(sampler.withType(_)) + j.samplerParam.foreach(sampler.withParam(_)) + j.managerHostPort.foreach(sampler.withManagerHostPort(_)) + sampler + } + .withTracerTags(j.tracerTags.asJava) + .getTracer + case z: TracerConfig.Zipkin => + log.debug("Using zipkin tracer") + val b3Codec = new B3TextMapCodec.Builder().build; + + new JaegerConfiguration(z.serviceName) + .withSampler { + (new JaegerConfiguration.SamplerConfiguration) + .withType(z.samplerType) + .withParam(z.samplerParam) + } + .withTracerTags(z.tracerTags.asJava) + .getTracerBuilder + .withReporter { + new ZipkinV2Reporter(AsyncReporter.create(OkHttpSender.create(z.endpoint))) + } + .registerInjector(Format.Builtin.HTTP_HEADERS, b3Codec) + .registerExtractor(Format.Builtin.HTTP_HEADERS, b3Codec) + .build + } +} diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index 7daec620f..c2147926f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -155,6 +155,13 @@ package model { managerHostPort: Option[String], tracerTags: Map[String, String] = Map() ) extends TracerConfig + case class Zipkin( + serviceName: String, + endpoint: String, + samplerType: String = "const", + samplerParam: Float = 1.0f, + tracerTags: Map[String, String] = Map() + ) extends TracerConfig } final case class CollectorConfig( diff --git a/examples/config.hocon.sample b/examples/config.hocon.sample index 1bcef1e3f..73a0d020b 100644 --- a/examples/config.hocon.sample +++ b/examples/config.hocon.sample @@ -318,6 +318,45 @@ collector { } } + # Enable Jaeger distributed tracing + #tracer { + # enabled = jaeger + # serviceName = collector + # + # # The following are all optional, and use the Jaeger client defaults if not set. + # agentHost = localhost + # agentPort = 6831 + # managerHostPort = localhost:5778 + # # For sampler types see https://www.jaegertracing.io/docs/1.17/sampling/#client-sampling-configuration + # samplerType = remote + # samplerParam = # Not needed for remote sampler type + # + # # Optional tags to append to the trace + # tracerTags = { + # tag1 = value1 + # tag2 = value2 + # } + #} + + # Or enable Zipkin distributed tracing + tracer { + enabled = zipkin + serviceName = collector + endpoint = "http://localhost:9411/api/v2/spans" + + # samplerType must be one of constant, probabilistic, or ratelimiting. Default is constant. + # See https://www.jaegertracing.io/docs/1.17/sampling/#client-sampling-configuration + # Note we use jaeger client sampling, even when we send traces to zipkin backend. + samplerType = ratelimiting + samplerParam = 2.0 + + # Optional tags to append to the trace + tracerTags = { + tag1 = value1 + tag2 = value2 + } + } + } # Akka has a variety of possible configuration options defined at diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 19f3923fe..e17927121 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -42,6 +42,7 @@ object Dependencies { val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val opentracing = "0.33.0" val jaeger = "1.4.0" + val zipkin = "2.16.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -76,6 +77,9 @@ object Dependencies { val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger + val jaegerZipkin = "io.jaegertracing" % "jaeger-zipkin" % V.jaeger + val zipkin = "io.zipkin.reporter2" % "zipkin-reporter" % V.zipkin + val zipkinSender = "io.zipkin.reporter2" % "zipkin-sender-okhttp3" % V.zipkin val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala