diff --git a/build.sbt b/build.sbt index 0df6ebf69..5d53bbd41 100644 --- a/build.sbt +++ b/build.sbt @@ -26,6 +26,8 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.config, Dependencies.Libraries.prometheus, Dependencies.Libraries.prometheusCommon, + Dependencies.Libraries.opentracingApi, + Dependencies.Libraries.opentracingNoop, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index a7e9d1608..8af44f796 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,11 +24,13 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig +import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ import pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.auto._ + import metrics._ import model._ @@ -73,8 +75,11 @@ trait Collector { implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher + val sharedTracer = NoopTracerFactory.create + val collectorRoute = new CollectorRoute { - override def collectorService = new CollectorService(collectorConf, sinks) + override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) + override def tracer = sharedTracer } val prometheusMetricsService = new PrometheusMetricsService(collectorConf.prometheusMetrics) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 944c33cd9..7310b5e71 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,14 +16,19 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route} +import akka.http.scaladsl.server.{Directive1, Route, StandardRoute} import akka.http.scaladsl.server.Directives._ +import io.opentracing.{Span, Tracer} + import model.DntCookieMatcher import monitoring.BeanRegistry +import scala.concurrent.ExecutionContext + trait CollectorRoute { def collectorService: Service + def tracer: Tracer private val headers = optionalHeaderValueByName("User-Agent") & optionalHeaderValueByName("Referer") & @@ -42,8 +47,18 @@ trait CollectorRoute { complete(StatusCodes.NotFound -> "redirects disabled") } + def completeWithSpan(r: HttpResponse, span: Span): StandardRoute = + requestContext => { + val fut = complete(r)(requestContext) + fut.onComplete { _ => + span.finish + }(ExecutionContext.global) + fut + } + def routes: Route = doNotTrack(collectorService.doNotTrackCookie) { dnt => + val span = tracer.buildSpan("CollectorRequest").start cookieIfWanted(collectorService.cookieName) { reqCookie => val cookie = reqCookie.map(_.toCookie) headers { (userAgent, refererURI, rawRequestURI) => @@ -69,7 +84,7 @@ trait CollectorRoute { doNotTrack = dnt, Some(ct)) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } } ~ @@ -87,7 +102,7 @@ trait CollectorRoute { pixelExpected = true, doNotTrack = dnt) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } ~ path("""ice\.png""".r | "i".r) { path => @@ -105,7 +120,7 @@ trait CollectorRoute { pixelExpected = true, doNotTrack = dnt) incrementRequests(r.status) - complete(r) + completeWithSpan(r, span) } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 643169e94..af4ecc75c 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -23,6 +23,8 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload +import io.opentracing.Tracer +import io.opentracing.propagation.{Format, TextMapAdapter} import org.apache.commons.codec.binary.Base64 import org.slf4j.LoggerFactory @@ -66,7 +68,8 @@ object CollectorService { class CollectorService( config: CollectorConfig, - sinks: CollectorSinks + sinks: CollectorSinks, + tracer: Tracer ) extends Service { private val logger = LoggerFactory.getLogger(getClass) @@ -199,6 +202,7 @@ class CollectorService( networkUserId: String, contentType: Option[String] ): CollectorPayload = { + val e = new CollectorPayload( "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", ipAddress, @@ -213,7 +217,7 @@ class CollectorService( refererUri.foreach(e.refererUri = _) e.hostname = hostname e.networkUserId = networkUserId - e.headers = (headers(request) ++ contentType).asJava + e.headers = (headers(request) ++ contentType ++ tracerHeaders).asJava contentType.foreach(e.contentType = _) e } @@ -226,8 +230,10 @@ class CollectorService( // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) // Send events to respective sinks + val span = tracer.buildSpan("SinkRawEvents").start val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + span.finish // Sink Responses for Test Sink sinkResponseGood ++ sinkResponseBad } @@ -345,6 +351,13 @@ class CollectorService( case other => Some(other.toString) } + def tracerHeaders: Iterable[String] = { + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + } + /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`)) diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala index 871d8b14d..f4ca8bf66 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala @@ -19,10 +19,12 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.testkit.Specs2RouteTest import akka.http.scaladsl.server.Directives._ import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher +import io.opentracing.noop.NoopTracerFactory import org.specs2.mutable.Specification class CollectorRouteSpec extends Specification with Specs2RouteTest { val mkRoute = (withRedirects: Boolean) => new CollectorRoute { + override val tracer = NoopTracerFactory.create override val collectorService = new Service { def preflightResponse(req: HttpRequest): HttpResponse = HttpResponse(200, entity = "preflight response") diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index ea3e7be02..2781b722c 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -24,6 +24,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers.CacheDirectives._ +import io.opentracing.noop.NoopTracerFactory import org.apache.thrift.{TDeserializer, TSerializer} import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload @@ -35,11 +36,13 @@ import model._ class CollectorServiceSpec extends Specification { val service = new CollectorService( TestUtils.testConf, - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val bouncingService = new CollectorService( TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r val event = new CollectorPayload( @@ -503,7 +506,8 @@ class CollectorServiceSpec extends Specification { "should pass on the original path if no mapping for it can be found" in { val service = new CollectorService( TestUtils.testConf.copy(paths = Map.empty[String, String]), - CollectorSinks(new TestSink, new TestSink) + CollectorSinks(new TestSink, new TestSink), + NoopTracerFactory.create ) val expected1 = "/com.acme/track" val expected2 = "/com.acme/redirect" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 17b653546..3ef6cf1aa 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,6 +40,7 @@ object Dependencies { val thrift = "0.13.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries + val opentracing = "0.33.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -71,6 +72,8 @@ object Dependencies { val prometheus = "io.prometheus" % "simpleclient" % V.prometheus val prometheusCommon = "io.prometheus" % "simpleclient_common" % V.prometheus val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor + val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing + val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala