Pipeline id telemetry
spenes committed Oct 5, 2023
1 parent eee1035 commit 2d3d87e
Showing 13 changed files with 156 additions and 125 deletions.
@@ -21,7 +21,7 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)

def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]]

-  def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo
+  def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo]

final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo)
}
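Note on the core API change above: telemetryInfo is now effectful (an IO[Telemetry.TelemetryInfo] rather than a pure value), so implementations can perform cloud lookups (metadata-service calls, stream describes) when the heartbeat actually runs. A minimal sketch of the new contract, assuming a hypothetical MySinkConfig:

    // Hypothetical implementation of the new signature; a real app would
    // query its environment instead of returning a constant.
    override def telemetryInfo(config: Config.Streams[MySinkConfig]): IO[Telemetry.TelemetryInfo] =
      IO.pure(Telemetry.TelemetryInfo(region = None, cloud = None, unhashedPipelineId = None))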
@@ -27,14 +27,14 @@ object Run {

type MkSinks[F[_], SinkConfig] = Config.Streams[SinkConfig] => Resource[F, Sinks[F]]

- type TelemetryInfo[SinkConfig] = Config[SinkConfig] => Telemetry.TelemetryInfo
+ type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo]

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
-   telemetryInfo: TelemetryInfo[SinkConfig]
+   telemetryInfo: TelemetryInfo[F, SinkConfig]
): Opts[F[ExitCode]] = {
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
@@ -43,7 +43,7 @@ object Run {
private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
-   telemetryInfo: TelemetryInfo[SinkConfig],
+   telemetryInfo: TelemetryInfo[F, SinkConfig],
path: Option[Path]
): F[ExitCode] = {
val eitherT = for {
@@ -60,7 +60,7 @@
private def fromConfig[F[_]: Async: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
-   telemetryInfo: TelemetryInfo[SinkConfig],
+   telemetryInfo: TelemetryInfo[F, SinkConfig],
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
@@ -82,13 +82,7 @@

resources.use { httpClient =>
Telemetry
-   .run(
-     config.telemetry,
-     httpClient,
-     appInfo,
-     telemetryInfo(config).region,
-     telemetryInfo(config).cloud
-   )
+   .run(config.telemetry, httpClient, appInfo, telemetryInfo(config.streams))
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
@@ -3,6 +3,8 @@ package com.snowplowanalytics.snowplow.collector.core
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

+ import org.apache.commons.codec.digest.DigestUtils

import cats.data.NonEmptyList
import cats.implicits._

@@ -32,25 +34,19 @@ object Telemetry {
telemetryConfig: Config.Telemetry,
httpClient: HttpClient[F],
appInfo: AppInfo,
-   region: Option[String],
-   cloud: Option[String]
+   telemetryInfoF: F[TelemetryInfo]
): Stream[F, Unit] =
if (telemetryConfig.disable)
Stream.empty.covary[F]
- else {
-   val sdj = makeHeartbeatEvent(
-     telemetryConfig,
-     region,
-     cloud,
-     appInfo.moduleName,
-     appInfo.version
-   )
-   Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
-     Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
+ else
+   for {
+     telemetryInfo <- Stream.eval(telemetryInfoF)
+     sdj = makeHeartbeatEvent(telemetryConfig, appInfo, telemetryInfo)
+     tracker <- Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient))
+     _ <- Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
         tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters()
       }
-   }
- }
+   } yield ()

private def initTracker[F[_]: Async: Tracking](
config: Config.Telemetry,
@@ -90,29 +86,31 @@

private def makeHeartbeatEvent(
teleCfg: Config.Telemetry,
-   region: Option[String],
-   cloud: Option[String],
-   appName: String,
-   appVersion: String
+   appInfo: AppInfo,
+   telemetryInfo: TelemetryInfo
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
Json.obj(
"userProvidedId" -> teleCfg.userProvidedId.asJson,
"autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
"moduleName" -> teleCfg.moduleName.asJson,
"moduleVersion" -> teleCfg.moduleVersion.asJson,
"instanceId" -> teleCfg.instanceId.asJson,
"appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"applicationName" -> appName.asJson,
"applicationVersion" -> appVersion.asJson
"cloud" -> telemetryInfo.cloud.asJson,
"region" -> telemetryInfo.region.asJson,
"pipelineId" -> telemetryInfo.hashedPipelineId.asJson,
"applicationName" -> appInfo.moduleName.asJson,
"applicationVersion" -> appInfo.version.asJson
)
)

case class TelemetryInfo(
region: Option[String],
-   cloud: Option[String]
- )
+   cloud: Option[String],
+   unhashedPipelineId: Option[String]
+ ) {
+   def hashedPipelineId: Option[String] = unhashedPipelineId.map(DigestUtils.sha256Hex)
+ }
}
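A note on the new TelemetryInfo field: only a SHA-256 digest of the pipeline id leaves the collector, since hashedPipelineId applies DigestUtils.sha256Hex before the value is attached to the oss_context event. A quick sketch with made-up values:

    val info = Telemetry.TelemetryInfo(
      region             = Some("eu-central-1"),   // illustrative values only
      cloud              = Some("AWS"),
      unhashedPipelineId = Some("123456789012")
    )
    info.hashedPipelineId // Some(64-char lowercase hex digest), never the raw id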
@@ -15,6 +15,10 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{IO, Resource}
+ import org.http4s._
+ import org.http4s.blaze.client.BlazeClientBuilder
+ import org.typelevel.ci._
+ import io.circe.parser
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._
@@ -37,6 +41,29 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {
)
} yield Sinks(good, bad)

- override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
-   Telemetry.TelemetryInfo(None, None)
+ override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
+   getAzureSubscriptionId.map {
+     case None     => Telemetry.TelemetryInfo(None, None, None)
+     case Some(id) => Telemetry.TelemetryInfo(None, Some("Azure"), Some(id))
+   }

+ def getAzureSubscriptionId: IO[Option[String]] = {
+   val response = for {
+     client <- BlazeClientBuilder[IO].resource
+     request = Request[IO](
+       method = Method.GET,
+       uri = Uri.unsafeFromString("http://169.254.169.254/metadata/instance?api-version=2021-02-01"),
+       headers = Headers(Header.Raw(ci"Metadata", "true"))
+     )
+     response <- client.run(request)
+   } yield response
+   response.use(_.bodyText.compile.string.map(extractId)).handleError(_ => None)
+ }
+
+ private def extractId(metadata: String): Option[String] =
+   for {
+     json <- parser.parse(metadata).toOption
+     id <- json.hcursor.downField("compute").downField("subscriptionId").as[String].toOption
+   } yield id

}
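For reference, the Azure instance metadata service (queried with the Metadata: true header on the link-local address above) responds with JSON that extractId walks via compute.subscriptionId. A sketch against an abridged, hypothetical payload:

    val sample = """{ "compute": { "subscriptionId": "11111111-2222-3333-4444-555555555555" } }"""
    // extractId(sample) == Some("11111111-2222-3333-4444-555555555555")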
@@ -15,9 +15,11 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}

import org.slf4j.LoggerFactory

import java.util.concurrent.ScheduledThreadPoolExecutor
@@ -48,14 +50,33 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) {
} yield Sinks(good, bad)
}

- override def telemetryInfo(config: Config[KinesisSinkConfig]): Telemetry.TelemetryInfo =
-   Telemetry.TelemetryInfo(
-     region = Some(config.streams.sink.region),
-     cloud = Some("AWS")
+ override def telemetryInfo(config: Config.Streams[KinesisSinkConfig]): IO[Telemetry.TelemetryInfo] =
+   getAccountId(config).map(id =>
+     Telemetry.TelemetryInfo(
+       region = Some(config.sink.region),
+       cloud = Some("AWS"),
+       unhashedPipelineId = id
+     )
)

def buildExecutorService(kc: KinesisSinkConfig): ScheduledThreadPoolExecutor = {
log.info("Creating thread pool of size " + kc.threadPoolSize)
new ScheduledThreadPoolExecutor(kc.threadPoolSize)
}

+ def getAccountId(config: Config.Streams[KinesisSinkConfig]): IO[Option[String]] =
+   Resource
+     .make(
+       IO(KinesisSink.createKinesisClient(config.sink.endpoint, config.sink.region)).rethrow
+     )(c => IO(c.shutdown()))
+     .use { kinesis =>
+       IO {
+         val streamArn = KinesisSink.describeStream(kinesis, config.good).getStreamARN
+         Some(extractAccountId(streamArn))
+       }
+     }
+     .handleError(_ => None)
+
+ def extractAccountId(kinesisStreamArn: String): String =
+   kinesisStreamArn.split(":")(4)
}
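extractAccountId relies on the standard ARN layout, arn:partition:service:region:account-id:resource, so the account id is the fifth colon-separated field (index 4). For example, with a hypothetical stream ARN:

    extractAccountId("arn:aws:kinesis:eu-central-1:123456789012:stream/good") // "123456789012"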
@@ -16,7 +16,6 @@ package sinks
import cats.effect.{Resource, Sync}
import cats.implicits.catsSyntaxMonadErrorRethrow
import cats.syntax.either._
- import com.amazonaws.auth._
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.model._
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
@@ -356,10 +355,8 @@ class KinesisSink[F[_]: Sync] private (
override def run() {
while (!kinesisHealthy) {
Try {
-   val describeRequest = new DescribeStreamSummaryRequest()
-   describeRequest.setStreamName(streamName)
-   val describeResult = client.describeStreamSummary(describeRequest)
-   describeResult.getStreamDescriptionSummary().getStreamStatus()
+   val streamDescription = describeStream(client, streamName)
+   streamDescription.getStreamStatus()
} match {
case Success("ACTIVE") =>
log.info(s"Stream $streamName ACTIVE")
@@ -443,6 +440,31 @@ object KinesisSink {
Resource.make(acquire)(release)
}

+ /**
+  * Creates a new Kinesis client.
+  * @param endpoint kinesis endpoint where the stream resides
+  * @param region aws region where the stream resides
+  * @return the initialized AmazonKinesisClient
+  */
+ def createKinesisClient(
+   endpoint: String,
+   region: String
+ ): Either[Throwable, AmazonKinesis] =
+   Either.catchNonFatal(
+     AmazonKinesisClientBuilder
+       .standard()
+       .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
+       .build()
+   )
+
+ def describeStream(client: AmazonKinesis, streamName: String) = {
+   val describeRequest = new DescribeStreamSummaryRequest()
+   describeRequest.setStreamName(streamName)
+   val describeResult = client.describeStreamSummary(describeRequest)
+   describeResult.getStreamDescriptionSummary()
+ }

/**
* Create a KinesisSink and schedule a task to flush its EventStorage.
* Exists so that no threads can get a reference to the KinesisSink
@@ -457,9 +479,8 @@
executorService: ScheduledExecutorService
): Either[Throwable, KinesisSink[F]] = {
val clients = for {
-   provider <- getProvider(kinesisConfig.aws)
-   kinesisClient <- createKinesisClient(provider, kinesisConfig.endpoint, kinesisConfig.region)
-   sqsClientAndName <- sqsBuffer(sqsBufferName, provider, kinesisConfig.region)
+   kinesisClient <- createKinesisClient(kinesisConfig.endpoint, kinesisConfig.region)
+   sqsClientAndName <- sqsBuffer(sqsBufferName, kinesisConfig.region)
} yield (kinesisClient, sqsClientAndName)

clients.map {
@@ -483,66 +504,19 @@
}
}

- /** Create an aws credentials provider through env variables and iam. */
- private def getProvider(awsConfig: KinesisSinkConfig.AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
-   def isDefault(key: String): Boolean = key == "default"
-   def isIam(key: String): Boolean = key == "iam"
-   def isEnv(key: String): Boolean = key == "env"
-
-   ((awsConfig.accessKey, awsConfig.secretKey) match {
-     case (a, s) if isDefault(a) && isDefault(s) =>
-       new DefaultAWSCredentialsProviderChain().asRight
-     case (a, s) if isDefault(a) || isDefault(s) =>
-       "accessKey and secretKey must both be set to 'default' or neither".asLeft
-     case (a, s) if isIam(a) && isIam(s) =>
-       InstanceProfileCredentialsProvider.getInstance().asRight
-     case (a, s) if isIam(a) && isIam(s) =>
-       "accessKey and secretKey must both be set to 'iam' or neither".asLeft
-     case (a, s) if isEnv(a) && isEnv(s) =>
-       new EnvironmentVariableCredentialsProvider().asRight
-     case (a, s) if isEnv(a) || isEnv(s) =>
-       "accessKey and secretKey must both be set to 'env' or neither".asLeft
-     case _ =>
-       new AWSStaticCredentialsProvider(
-         new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
-       ).asRight
-   }).leftMap(new IllegalArgumentException(_))
- }

- /**
-  * Creates a new Kinesis client.
-  * @param provider aws credentials provider
-  * @param endpoint kinesis endpoint where the stream resides
-  * @param region aws region where the stream resides
-  * @return the initialized AmazonKinesisClient
-  */
- private def createKinesisClient(
-   provider: AWSCredentialsProvider,
-   endpoint: String,
-   region: String
- ): Either[Throwable, AmazonKinesis] =
-   Either.catchNonFatal(
-     AmazonKinesisClientBuilder
-       .standard()
-       .withCredentials(provider)
-       .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
-       .build()
-   )

private def sqsBuffer(
sqsBufferName: Option[String],
-   provider: AWSCredentialsProvider,
region: String
): Either[Throwable, Option[SqsClientAndName]] =
sqsBufferName match {
case Some(name) =>
-       createSqsClient(provider, region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
+       createSqsClient(region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
case None => None.asRight
}

- private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] =
+ private def createSqsClient(region: String): Either[Throwable, AmazonSQS] =
Either.catchNonFatal(
-     AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build
+     AmazonSQSClientBuilder.standard().withRegion(region).build
)

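Dropping getProvider works because AWS SDK v1 client builders that are given no explicit provider fall back to DefaultAWSCredentialsProviderChain (environment variables, system properties, profile files, container and instance roles), which covers the old "default", "env" and "iam" modes. A minimal sketch, with a hypothetical endpoint and region:

    // No .withCredentials(...): the default provider chain is used.
    val kinesis = AmazonKinesisClientBuilder
      .standard()
      .withEndpointConfiguration(new EndpointConfiguration("https://kinesis.eu-central-1.amazonaws.com", "eu-central-1"))
      .build()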
@@ -11,7 +11,6 @@ final case class KinesisSinkConfig(
maxBytes: Int,
region: String,
threadPoolSize: Int,
-   aws: KinesisSinkConfig.AWSConfig,
backoffPolicy: KinesisSinkConfig.BackoffPolicy,
customEndpoint: Option[String],
sqsGoodBuffer: Option[String],
@@ -123,10 +123,6 @@ object KinesisConfigSpec {
maxBytes = 1000000,
region = "eu-central-1",
threadPoolSize = 10,
-   aws = KinesisSinkConfig.AWSConfig(
-     accessKey = "iam",
-     secretKey = "iam"
-   ),
backoffPolicy = KinesisSinkConfig.BackoffPolicy(
minBackoff = 500,
maxBackoff = 1500,