Skip to content

Commit

Permalink
Pipeline id telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Oct 5, 2023
1 parent eee1035 commit 9fbdb77
Show file tree
Hide file tree
Showing 21 changed files with 355 additions and 130 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ lazy val http4s = project
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.ceTestkit,

//Integration tests
Dependencies.Libraries.IT.testcontainers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)

def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]]

def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo
def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo]

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ object Run {

type MkSinks[F[_], SinkConfig] = Config.Streams[SinkConfig] => Resource[F, Sinks[F]]

type TelemetryInfo[SinkConfig] = Config[SinkConfig] => Telemetry.TelemetryInfo
type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo]

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig]
telemetryInfo: TelemetryInfo[F, SinkConfig]
): Opts[F[ExitCode]] = {
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
Expand All @@ -43,7 +43,7 @@ object Run {
private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig],
telemetryInfo: TelemetryInfo[F, SinkConfig],
path: Option[Path]
): F[ExitCode] = {
val eitherT = for {
Expand All @@ -60,7 +60,7 @@ object Run {
private def fromConfig[F[_]: Async: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[SinkConfig],
telemetryInfo: TelemetryInfo[F, SinkConfig],
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
Expand All @@ -81,14 +81,9 @@ object Run {
} yield httpClient

resources.use { httpClient =>
val appId = java.util.UUID.randomUUID.toString
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
telemetryInfo(config).region,
telemetryInfo(config).cloud
)
.run(config.telemetry, httpClient, appInfo, appId, telemetryInfo(config.streams))
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package com.snowplowanalytics.snowplow.collector.core
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import org.apache.commons.codec.digest.DigestUtils

import cats.data.NonEmptyList
import cats.implicits._

Expand Down Expand Up @@ -32,25 +34,20 @@ object Telemetry {
telemetryConfig: Config.Telemetry,
httpClient: HttpClient[F],
appInfo: AppInfo,
region: Option[String],
cloud: Option[String]
appId: String,
telemetryInfoF: F[TelemetryInfo]
): Stream[F, Unit] =
if (telemetryConfig.disable)
Stream.empty.covary[F]
else {
val sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version
)
Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
else
for {
telemetryInfo <- Stream.eval(telemetryInfoF)
sdj = makeHeartbeatEvent(telemetryConfig, appInfo, appId, telemetryInfo)
tracker <- Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient))
_ <- Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters()
}
}
}
} yield ()

private def initTracker[F[_]: Async: Tracking](
config: Config.Telemetry,
Expand Down Expand Up @@ -90,29 +87,39 @@ object Telemetry {

private def makeHeartbeatEvent(
teleCfg: Config.Telemetry,
region: Option[String],
cloud: Option[String],
appName: String,
appVersion: String
appInfo: AppInfo,
appId: String,
telemetryInfo: TelemetryInfo
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
Json.obj(
"userProvidedId" -> teleCfg.userProvidedId.asJson,
"autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
"moduleName" -> teleCfg.moduleName.asJson,
"moduleVersion" -> teleCfg.moduleVersion.asJson,
"instanceId" -> teleCfg.instanceId.asJson,
"appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"applicationName" -> appName.asJson,
"applicationVersion" -> appVersion.asJson
"appGeneratedId" -> appId.asJson,
"cloud" -> telemetryInfo.cloud.asJson,
"region" -> telemetryInfo.region.asJson,
"pipelineId" -> telemetryInfo.hashedPipelineId.asJson,
"applicationName" -> appInfo.moduleName.asJson,
"applicationVersion" -> appInfo.version.asJson
)
)

/**
* Stores destination specific telemetry data
* @param region Cloud region application is deployed
* @param cloud Cloud application is deployed
* @param unhashedPipelineId Unhashed version of id that is used identify pipeline.
* It should be something unique to that pipeline such as account id, project id etc.
*/
case class TelemetryInfo(
region: Option[String],
cloud: Option[String]
)
cloud: Option[String],
unhashedPipelineId: Option[String]
) {
def hashedPipelineId: Option[String] = unhashedPipelineId.map(DigestUtils.sha256Hex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration._
import scala.collection.mutable.ListBuffer

import org.apache.commons.codec.binary.Base64
import org.apache.commons.codec.digest.DigestUtils

import java.nio.charset.StandardCharsets

import cats.effect._
import cats.effect.unsafe.implicits.global
import cats.effect.testkit.TestControl

import org.http4s._
import org.http4s.client.{Client => HttpClient}

import io.circe._
import io.circe.parser._
import io.circe.syntax._

import fs2.Stream

import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking

import org.specs2.mutable.Specification

class TelemetrySpec extends Specification {

case class ProbeTelemetry(
telemetryStream: Stream[IO, Unit],
telemetryEvents: ListBuffer[Json]
)

val appId = "testAppId"
val region = Some("testRegion")
val cloud = Some("testCloud")
val unhashedPipelineId = Some("testPipelineId")
val interval = 5.minutes
val telemetryConfig = Config.Telemetry(
disable = false,
interval = interval,
method = "POST",
url = "127.0.0.1",
port = 443,
secure = true,
userProvidedId = None,
moduleName = None,
moduleVersion = None,
instanceId = None,
autoGeneratedId = None
)

def probeTelemetry(telemetryConfig: Config.Telemetry): ProbeTelemetry = {
val telemetryEvents = ListBuffer[Json]()
val mockHttpApp = HttpRoutes
.of[IO] {
case req =>
IO {
telemetryEvents += extractTelemetryEvent(req)
Response[IO](status = Status.Ok)
}
}
.orNotFound
val mockClient = HttpClient.fromHttpApp[IO](mockHttpApp)
val telemetryInfoF = IO(Telemetry.TelemetryInfo(region, cloud, unhashedPipelineId))
val telemetryStream = Telemetry.run[IO](
telemetryConfig,
mockClient,
TestUtils.appInfo,
appId,
telemetryInfoF
)
ProbeTelemetry(telemetryStream, telemetryEvents)
}

def extractTelemetryEvent(req: Request[IO]): Json = {
val body = req.bodyText.compile.string.unsafeRunSync()
val jsonBody = parse(body).toOption.get
val uepxEncoded = jsonBody.hcursor.downField("data").downN(0).downField("ue_px").as[String].toOption.get
val uePxDecoded = new String(Base64.decodeBase64(uepxEncoded), StandardCharsets.UTF_8)
parse(uePxDecoded).toOption.get.hcursor.downField("data").as[Json].toOption.get
}

def expectedEvent(config: Config.Telemetry): Json = {
val pipelineId = unhashedPipelineId.map(DigestUtils.sha256Hex)
Json.obj(
"schema" -> "iglu:com.snowplowanalytics.oss/oss_context/jsonschema/1-0-2".asJson,
"data" -> Json.obj(
"userProvidedId" -> config.userProvidedId.asJson,
"autoGeneratedId" -> config.autoGeneratedId.asJson,
"moduleName" -> config.moduleName.asJson,
"moduleVersion" -> config.moduleVersion.asJson,
"instanceId" -> config.instanceId.asJson,
"appGeneratedId" -> appId.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"pipelineId" -> pipelineId.asJson,
"applicationName" -> TestUtils.appInfo.name.asJson,
"applicationVersion" -> TestUtils.appInfo.version.asJson
)
)
}

"Telemetry" should {
"send correct number of events with expected hostnames" in {
val eventCount = 10
val timeout = (interval * eventCount.toLong) + 1.minutes
val probe = probeTelemetry(telemetryConfig)
TestControl.executeEmbed(probe.telemetryStream.timeout(timeout).compile.drain.voidError).unsafeRunSync()
val events = probe.telemetryEvents
val expected = (1 to eventCount).map(_ => expectedEvent(telemetryConfig)).toList
events must beEqualTo(expected)
}

"not send any events if telemetry is disabled" in {
val probe = probeTelemetry(telemetryConfig.copy(disable = true))
TestControl
.executeEmbed(
probe.telemetryStream.timeout(interval * 10).compile.drain.voidError
)
.unsafeRunSync()
probe.telemetryEvents must beEmpty
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {
)
} yield Sinks(good, bad)

override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
Telemetry.TelemetryInfo(None, None)
override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
TelemetryUtils.getAzureSubscriptionId.map {
case None => Telemetry.TelemetryInfo(None, None, None)
case Some(id) => Telemetry.TelemetryInfo(None, Some("Azure"), Some(id))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.IO
import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.ci._
import io.circe.parser

object TelemetryUtils {

def getAzureSubscriptionId: IO[Option[String]] = {
val response = for {
client <- BlazeClientBuilder[IO].resource
request = Request[IO](
method = Method.GET,
uri = Uri.unsafeFromString("http://169.254.169.254/metadata/instance?api-version=2021-02-01"),
headers = Headers(Header.Raw(ci"Metadata", "true"))
)
response <- client.run(request)
} yield response
response.use(_.bodyText.compile.string.map(extractId)).handleError(_ => None)
}

private def extractId(metadata: String): Option[String] =
for {
json <- parser.parse(metadata).toOption
id <- json.hcursor.downField("compute").downField("subscriptionId").as[String].toOption
} yield id
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}

import org.slf4j.LoggerFactory

import java.util.concurrent.ScheduledThreadPoolExecutor
Expand Down Expand Up @@ -48,11 +50,16 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) {
} yield Sinks(good, bad)
}

override def telemetryInfo(config: Config[KinesisSinkConfig]): Telemetry.TelemetryInfo =
Telemetry.TelemetryInfo(
region = Some(config.streams.sink.region),
cloud = Some("AWS")
)
override def telemetryInfo(config: Config.Streams[KinesisSinkConfig]): IO[Telemetry.TelemetryInfo] =
TelemetryUtils
.getAccountId(config)
.map(id =>
Telemetry.TelemetryInfo(
region = Some(config.sink.region),
cloud = Some("AWS"),
unhashedPipelineId = id
)
)

def buildExecutorService(kc: KinesisSinkConfig): ScheduledThreadPoolExecutor = {
log.info("Creating thread pool of size " + kc.threadPoolSize)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.collector.core.Config
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}

object TelemetryUtils {

def getAccountId(config: Config.Streams[KinesisSinkConfig]): IO[Option[String]] =
Resource
.make(
IO(KinesisSink.createKinesisClient(config.sink.endpoint, config.sink.region)).rethrow
)(c => IO(c.shutdown()))
.use { kinesis =>
IO {
val streamArn = KinesisSink.describeStream(kinesis, config.good).getStreamARN
Some(extractAccountId(streamArn))
}
}
.handleError(_ => None)

def extractAccountId(kinesisStreamArn: String): String =
kinesisStreamArn.split(":")(4)

}
Loading

0 comments on commit 9fbdb77

Please sign in to comment.