diff --git a/build.sbt b/build.sbt
index 177c5ad7b..0d9594eac 100644
--- a/build.sbt
+++ b/build.sbt
@@ -147,6 +147,7 @@ lazy val http4s = project
       Dependencies.Libraries.emitterHttps,
       Dependencies.Libraries.specs2,
       Dependencies.Libraries.specs2CE,
+      Dependencies.Libraries.ceTestkit,
       //Integration tests
       Dependencies.Libraries.IT.testcontainers,
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
index 5bbce5762..23b614458 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
@@ -21,7 +21,7 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)

   def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]]

-  def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo
+  def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo]

   final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo)
 }
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
index 25297d818..20bed625b 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
@@ -27,14 +27,14 @@ object Run {

   type MkSinks[F[_], SinkConfig] = Config.Streams[SinkConfig] => Resource[F, Sinks[F]]

-  type TelemetryInfo[SinkConfig] = Config[SinkConfig] => Telemetry.TelemetryInfo
+  type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo]

   implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

   def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig]
+    telemetryInfo: TelemetryInfo[F, SinkConfig]
   ): Opts[F[ExitCode]] = {
     val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
     configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
@@ -43,7 +43,7 @@ object Run {
   private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig],
+    telemetryInfo: TelemetryInfo[F, SinkConfig],
     path: Option[Path]
   ): F[ExitCode] = {
     val eitherT = for {
@@ -60,7 +60,7 @@ object Run {
   private def fromConfig[F[_]: Async: Tracking, SinkConfig](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig],
+    telemetryInfo: TelemetryInfo[F, SinkConfig],
     config: Config[SinkConfig]
   ): F[ExitCode] = {
     val resources = for {
@@ -81,14 +81,9 @@ object Run {
     } yield httpClient

     resources.use { httpClient =>
+      val appId = java.util.UUID.randomUUID.toString
       Telemetry
-        .run(
-          config.telemetry,
-          httpClient,
-          appInfo,
-          telemetryInfo(config).region,
-          telemetryInfo(config).cloud
-        )
+        .run(config.telemetry, httpClient, appInfo, appId, telemetryInfo(config.streams))
         .compile
         .drain
         .flatMap(_ => Async[F].never[ExitCode])
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
index 95df9bebc..e4f7543e2 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
@@ -3,6 +3,8 @@ package com.snowplowanalytics.snowplow.collector.core
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger

+import org.apache.commons.codec.digest.DigestUtils
+
 import cats.data.NonEmptyList
 import cats.implicits._

@@ -32,25 +34,20 @@ object Telemetry {
     telemetryConfig: Config.Telemetry,
     httpClient: HttpClient[F],
     appInfo: AppInfo,
-    region: Option[String],
-    cloud: Option[String]
+    appId: String,
+    telemetryInfoF: F[TelemetryInfo]
   ): Stream[F, Unit] =
     if (telemetryConfig.disable)
       Stream.empty.covary[F]
-    else {
-      val sdj = makeHeartbeatEvent(
-        telemetryConfig,
-        region,
-        cloud,
-        appInfo.moduleName,
-        appInfo.version
-      )
-      Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
-        Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
+    else
+      for {
+        telemetryInfo <- Stream.eval(telemetryInfoF)
+        sdj = makeHeartbeatEvent(telemetryConfig, appInfo, appId, telemetryInfo)
+        tracker <- Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient))
+        _ <- Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
           tracker.trackSelfDescribingEvent(unstructEvent = sdj) >>
             tracker.flushEmitters()
         }
-      }
-    }
+      } yield ()

   private def initTracker[F[_]: Async: Tracking](
     config: Config.Telemetry,
@@ -90,29 +87,39 @@ object Telemetry {

   private def makeHeartbeatEvent(
     teleCfg: Config.Telemetry,
-    region: Option[String],
-    cloud: Option[String],
-    appName: String,
-    appVersion: String
+    appInfo: AppInfo,
+    appId: String,
+    telemetryInfo: TelemetryInfo
   ): SelfDescribingData[Json] =
     SelfDescribingData(
-      SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
+      SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
       Json.obj(
         "userProvidedId" -> teleCfg.userProvidedId.asJson,
         "autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
         "moduleName" -> teleCfg.moduleName.asJson,
         "moduleVersion" -> teleCfg.moduleVersion.asJson,
         "instanceId" -> teleCfg.instanceId.asJson,
-        "appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
-        "cloud" -> cloud.asJson,
-        "region" -> region.asJson,
-        "applicationName" -> appName.asJson,
-        "applicationVersion" -> appVersion.asJson
+        "appGeneratedId" -> appId.asJson,
+        "cloud" -> telemetryInfo.cloud.asJson,
+        "region" -> telemetryInfo.region.asJson,
+        "pipelineId" -> telemetryInfo.hashedPipelineId.asJson,
+        "applicationName" -> appInfo.moduleName.asJson,
+        "applicationVersion" -> appInfo.version.asJson
       )
     )

+  /**
+   * Stores destination-specific telemetry data.
+   * @param region cloud region where the application is deployed
+   * @param cloud cloud provider the application is deployed on
+   * @param unhashedPipelineId unhashed version of the id used to identify the pipeline.
+   *                           It should be something unique to that pipeline, such as an account id, a project id, etc.
+   */
   case class TelemetryInfo(
     region: Option[String],
-    cloud: Option[String]
-  )
+    cloud: Option[String],
+    unhashedPipelineId: Option[String]
+  ) {
+    def hashedPipelineId: Option[String] = unhashedPipelineId.map(DigestUtils.sha256Hex)
+  }
 }
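
Reviewer note: `hashedPipelineId` is the only way the new `unhashedPipelineId` leaves the case class, so the raw account or project id is never emitted; only its hex-encoded SHA-256 digest goes into the `pipelineId` field. A minimal sketch of the transformation, using made-up ids:

import org.apache.commons.codec.digest.DigestUtils

object PipelineIdHashingSketch extends App {
  // Hypothetical unhashed ids, as the collector modules would supply them
  val awsAccountId: Option[String] = Some("123456789")
  val absentId: Option[String]     = None

  // The same mapping TelemetryInfo.hashedPipelineId applies:
  // absent stays absent, present becomes a 64-char hex SHA-256 digest
  def hash(id: Option[String]): Option[String] = id.map(DigestUtils.sha256Hex)

  println(hash(awsAccountId)) // Some(<64 hex chars>)
  println(hash(absentId))     // None
}
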
diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala
new file mode 100644
index 000000000..ede0f0480
--- /dev/null
+++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TelemetrySpec.scala
@@ -0,0 +1,126 @@
+package com.snowplowanalytics.snowplow.collector.core
+
+import scala.concurrent.duration._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.codec.digest.DigestUtils
+
+import java.nio.charset.StandardCharsets
+
+import cats.effect._
+import cats.effect.unsafe.implicits.global
+import cats.effect.testkit.TestControl
+
+import org.http4s._
+import org.http4s.client.{Client => HttpClient}
+
+import io.circe._
+import io.circe.parser._
+import io.circe.syntax._
+
+import fs2.Stream
+
+import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking
+
+import org.specs2.mutable.Specification
+
+class TelemetrySpec extends Specification {
+
+  case class ProbeTelemetry(
+    telemetryStream: Stream[IO, Unit],
+    telemetryEvents: ListBuffer[Json]
+  )
+
+  val appId = "testAppId"
+  val region = Some("testRegion")
+  val cloud = Some("testCloud")
+  val unhashedPipelineId = Some("testPipelineId")
+  val interval = 5.minutes
+  val telemetryConfig = Config.Telemetry(
+    disable = false,
+    interval = interval,
+    method = "POST",
+    url = "127.0.0.1",
+    port = 443,
+    secure = true,
+    userProvidedId = None,
+    moduleName = None,
+    moduleVersion = None,
+    instanceId = None,
+    autoGeneratedId = None
+  )
+
+  def probeTelemetry(telemetryConfig: Config.Telemetry): ProbeTelemetry = {
+    val telemetryEvents = ListBuffer[Json]()
+    val mockHttpApp = HttpRoutes
+      .of[IO] {
+        case req =>
+          IO {
+            telemetryEvents += extractTelemetryEvent(req)
+            Response[IO](status = Status.Ok)
+          }
+      }
+      .orNotFound
+    val mockClient = HttpClient.fromHttpApp[IO](mockHttpApp)
+    val telemetryInfoF = IO(Telemetry.TelemetryInfo(region, cloud, unhashedPipelineId))
+    val telemetryStream = Telemetry.run[IO](
+      telemetryConfig,
+      mockClient,
+      TestUtils.appInfo,
+      appId,
+      telemetryInfoF
+    )
+    ProbeTelemetry(telemetryStream, telemetryEvents)
+  }
+
+  def extractTelemetryEvent(req: Request[IO]): Json = {
+    val body = req.bodyText.compile.string.unsafeRunSync()
+    val jsonBody = parse(body).toOption.get
+    val uePxEncoded = jsonBody.hcursor.downField("data").downN(0).downField("ue_px").as[String].toOption.get
+    val uePxDecoded = new String(Base64.decodeBase64(uePxEncoded), StandardCharsets.UTF_8)
+    parse(uePxDecoded).toOption.get.hcursor.downField("data").as[Json].toOption.get
+  }
+
+  def expectedEvent(config: Config.Telemetry): Json = {
+    val pipelineId = unhashedPipelineId.map(DigestUtils.sha256Hex)
+    Json.obj(
+      "schema" -> "iglu:com.snowplowanalytics.oss/oss_context/jsonschema/1-0-2".asJson,
+      "data" -> Json.obj(
+        "userProvidedId" -> config.userProvidedId.asJson,
+        "autoGeneratedId" -> config.autoGeneratedId.asJson,
+        "moduleName" -> config.moduleName.asJson,
+        "moduleVersion" -> config.moduleVersion.asJson,
+        "instanceId" -> config.instanceId.asJson,
+        "appGeneratedId" -> appId.asJson,
+        "cloud" -> cloud.asJson,
+        "region" -> region.asJson,
+        "pipelineId" -> pipelineId.asJson,
+        "applicationName" -> TestUtils.appInfo.name.asJson,
+        "applicationVersion" -> TestUtils.appInfo.version.asJson
+      )
+    )
+  }
+
+  "Telemetry" should {
+    "send the correct number of events with the expected payloads" in {
+      val eventCount = 10
+      val timeout = (interval * eventCount.toLong) + 1.minute
+      val probe = probeTelemetry(telemetryConfig)
+      TestControl.executeEmbed(probe.telemetryStream.timeout(timeout).compile.drain.voidError).unsafeRunSync()
+      val events = probe.telemetryEvents
+      val expected = (1 to eventCount).map(_ => expectedEvent(telemetryConfig)).toList
+      events must beEqualTo(expected)
+    }
+
+    "not send any events if telemetry is disabled" in {
+      val probe = probeTelemetry(telemetryConfig.copy(disable = true))
+      TestControl
+        .executeEmbed(
+          probe.telemetryStream.timeout(interval * 10).compile.drain.voidError
+        )
+        .unsafeRunSync()
+      probe.telemetryEvents must beEmpty
+    }
+  }
+}
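
A note on the new spec: `TestControl.executeEmbed` (hence the new cats-effect-testkit dependency) runs the program under a mocked clock, so the ten 5-minute heartbeat intervals elapse instantly, and the `timeout(...)` plus `voidError` pair is what terminates the otherwise infinite telemetry stream without failing the test. The same pattern in isolation — the ticking stream below is illustrative, not taken from the spec:

import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.effect.testkit.TestControl
import fs2.Stream

object TestControlSketch extends App {
  // An infinite stream ticking every 5 minutes, like the telemetry heartbeat
  val ticks: Stream[IO, Unit] =
    Stream.fixedDelay[IO](5.minutes).evalMap(_ => IO(println("heartbeat")))

  // Under TestControl the mocked clock jumps through the delays, so ten
  // intervals plus a minute of slack complete without any real waiting;
  // voidError swallows the TimeoutException that ends the stream.
  TestControl
    .executeEmbed(ticks.timeout(51.minutes).compile.drain.voidError)
    .unsafeRunSync()
}
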
diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
index 6447b3209..bd20c16e8 100644
--- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
@@ -37,6 +37,10 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {
       )
   } yield Sinks(good, bad)

-  override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(None, None)
+  override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    TelemetryUtils.getAzureSubscriptionId.map {
+      case None     => Telemetry.TelemetryInfo(None, None, None)
+      case Some(id) => Telemetry.TelemetryInfo(None, Some("Azure"), Some(id))
+    }
+
 }
diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala
new file mode 100644
index 000000000..253f519ef
--- /dev/null
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala
@@ -0,0 +1,29 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream
+
+import cats.effect.IO
+import org.http4s._
+import org.http4s.blaze.client.BlazeClientBuilder
+import org.typelevel.ci._
+import io.circe.parser
+
+object TelemetryUtils {
+
+  def getAzureSubscriptionId: IO[Option[String]] = {
+    val response = for {
+      client <- BlazeClientBuilder[IO].resource
+      request = Request[IO](
+        method = Method.GET,
+        uri = Uri.unsafeFromString("http://169.254.169.254/metadata/instance?api-version=2021-02-01"),
+        headers = Headers(Header.Raw(ci"Metadata", "true"))
+      )
+      response <- client.run(request)
+    } yield response
+    response.use(_.bodyText.compile.string.map(extractId)).handleError(_ => None)
+  }
+
+  private def extractId(metadata: String): Option[String] =
+    for {
+      json <- parser.parse(metadata).toOption
+      id <- json.hcursor.downField("compute").downField("subscriptionId").as[String].toOption
+    } yield id
+}
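
The Kafka module infers Azure by probing the instance metadata service (IMDS) at the link-local address 169.254.169.254, which is reachable only from inside an Azure VM; anywhere else the request fails and `handleError` collapses it to `None`, which `KafkaCollector` maps to "cloud unknown". A sketch of the parsing step against a trimmed-down sample of the documented response shape (the payload below is illustrative, not a real response):

import io.circe.parser

object ImdsParsingSketch extends App {
  // Abbreviated IMDS instance response; real payloads carry many more
  // fields under "compute" and "network"
  val sampleMetadata =
    """{"compute": {"subscriptionId": "11111111-2222-3333-4444-555555555555", "location": "westeurope"}}"""

  // The same traversal as TelemetryUtils.extractId
  val subscriptionId = for {
    json <- parser.parse(sampleMetadata).toOption
    id <- json.hcursor.downField("compute").downField("subscriptionId").as[String].toOption
  } yield id

  println(subscriptionId) // Some(11111111-2222-3333-4444-555555555555)
}
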
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
index a79c970f6..83830f8cc 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
@@ -15,9 +15,11 @@
 package com.snowplowanalytics.snowplow.collectors.scalastream

 import cats.effect.{IO, Resource}
+
 import com.snowplowanalytics.snowplow.collector.core.model.Sinks
 import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
 import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}
+
 import org.slf4j.LoggerFactory

 import java.util.concurrent.ScheduledThreadPoolExecutor
@@ -48,11 +50,16 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) {
     } yield Sinks(good, bad)
   }

-  override def telemetryInfo(config: Config[KinesisSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(
-      region = Some(config.streams.sink.region),
-      cloud = Some("AWS")
-    )
+  override def telemetryInfo(config: Config.Streams[KinesisSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    TelemetryUtils
+      .getAccountId(config)
+      .map(id =>
+        Telemetry.TelemetryInfo(
+          region = Some(config.sink.region),
+          cloud = Some("AWS"),
+          unhashedPipelineId = id
+        )
+      )

   def buildExecutorService(kc: KinesisSinkConfig): ScheduledThreadPoolExecutor = {
     log.info("Creating thread pool of size " + kc.threadPoolSize)
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala
new file mode 100644
index 000000000..f303d8cb0
--- /dev/null
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala
@@ -0,0 +1,26 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream
+
+import cats.effect.{IO, Resource}
+
+import com.snowplowanalytics.snowplow.collector.core.Config
+import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}
+
+object TelemetryUtils {
+
+  def getAccountId(config: Config.Streams[KinesisSinkConfig]): IO[Option[String]] =
+    Resource
+      .make(
+        IO(KinesisSink.createKinesisClient(config.sink.endpoint, config.sink.region)).rethrow
+      )(c => IO(c.shutdown()))
+      .use { kinesis =>
+        IO {
+          val streamArn = KinesisSink.describeStream(kinesis, config.good).getStreamARN
+          Some(extractAccountId(streamArn))
+        }
+      }
+      .handleError(_ => None)
+
+  def extractAccountId(kinesisStreamArn: String): String =
+    kinesisStreamArn.split(":")(4)
+
+}
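
Both new `TelemetryUtils` objects share the same shape: bracket the AWS client in a `Resource` so it is shut down whether the lookup succeeds or throws, then fold every failure into `None`, since a best-effort telemetry lookup must never prevent the collector from starting. The pattern in isolation, with a placeholder client type standing in for the AWS SDK:

import cats.effect.{IO, Resource}

object BestEffortLookupSketch {
  // Placeholder for an AWS SDK client exposing shutdown()
  trait Client { def lookup(): String; def shutdown(): Unit }

  def bestEffortId(mkClient: => Client): IO[Option[String]] =
    Resource
      .make(IO(mkClient))(c => IO(c.shutdown())) // release runs on success and on error
      .use(c => IO(Option(c.lookup())))
      .handleError(_ => None)                    // any failure becomes "no id"
}
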
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
index f3d147cd5..772f9281e 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
@@ -16,7 +16,6 @@ package sinks
 import cats.effect.{Resource, Sync}
 import cats.implicits.catsSyntaxMonadErrorRethrow
 import cats.syntax.either._
-import com.amazonaws.auth._
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
 import com.amazonaws.services.kinesis.model._
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
@@ -356,10 +355,8 @@
       override def run() {
         while (!kinesisHealthy) {
           Try {
-            val describeRequest = new DescribeStreamSummaryRequest()
-            describeRequest.setStreamName(streamName)
-            val describeResult = client.describeStreamSummary(describeRequest)
-            describeResult.getStreamDescriptionSummary().getStreamStatus()
+            val streamDescription = describeStream(client, streamName)
+            streamDescription.getStreamStatus()
           } match {
             case Success("ACTIVE") =>
               log.info(s"Stream $streamName ACTIVE")
@@ -443,6 +440,30 @@
     Resource.make(acquire)(release)
   }

+  /**
+   * Creates a new Kinesis client.
+   * @param endpoint kinesis endpoint where the stream resides
+   * @param region aws region where the stream resides
+   * @return the initialized AmazonKinesisClient
+   */
+  def createKinesisClient(
+    endpoint: String,
+    region: String
+  ): Either[Throwable, AmazonKinesis] =
+    Either.catchNonFatal(
+      AmazonKinesisClientBuilder
+        .standard()
+        .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
+        .build()
+    )
+
+  def describeStream(client: AmazonKinesis, streamName: String) = {
+    val describeRequest = new DescribeStreamSummaryRequest()
+    describeRequest.setStreamName(streamName)
+    val describeResult = client.describeStreamSummary(describeRequest)
+    describeResult.getStreamDescriptionSummary()
+  }
+
   /**
    * Create a KinesisSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the KinesisSink
@@ -457,9 +478,8 @@
     executorService: ScheduledExecutorService
   ): Either[Throwable, KinesisSink[F]] = {
     val clients = for {
-      provider         <- getProvider(kinesisConfig.aws)
-      kinesisClient    <- createKinesisClient(provider, kinesisConfig.endpoint, kinesisConfig.region)
-      sqsClientAndName <- sqsBuffer(sqsBufferName, provider, kinesisConfig.region)
+      kinesisClient    <- createKinesisClient(kinesisConfig.endpoint, kinesisConfig.region)
+      sqsClientAndName <- sqsBuffer(sqsBufferName, kinesisConfig.region)
     } yield (kinesisClient, sqsClientAndName)

     clients.map {
@@ -483,66 +503,19 @@
     }
   }

-  /** Create an aws credentials provider through env variables and iam. */
-  private def getProvider(awsConfig: KinesisSinkConfig.AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
-    def isDefault(key: String): Boolean = key == "default"
-    def isIam(key: String): Boolean = key == "iam"
-    def isEnv(key: String): Boolean = key == "env"
-
-    ((awsConfig.accessKey, awsConfig.secretKey) match {
-      case (a, s) if isDefault(a) && isDefault(s) =>
-        new DefaultAWSCredentialsProviderChain().asRight
-      case (a, s) if isDefault(a) || isDefault(s) =>
-        "accessKey and secretKey must both be set to 'default' or neither".asLeft
-      case (a, s) if isIam(a) && isIam(s) =>
-        InstanceProfileCredentialsProvider.getInstance().asRight
-      case (a, s) if isIam(a) || isIam(s) =>
-        "accessKey and secretKey must both be set to 'iam' or neither".asLeft
-      case (a, s) if isEnv(a) && isEnv(s) =>
-        new EnvironmentVariableCredentialsProvider().asRight
-      case (a, s) if isEnv(a) || isEnv(s) =>
-        "accessKey and secretKey must both be set to 'env' or neither".asLeft
-      case _ =>
-        new AWSStaticCredentialsProvider(
-          new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
-        ).asRight
-    }).leftMap(new IllegalArgumentException(_))
-  }
-
-  /**
-   * Creates a new Kinesis client.
-   * @param provider aws credentials provider
-   * @param endpoint kinesis endpoint where the stream resides
-   * @param region aws region where the stream resides
-   * @return the initialized AmazonKinesisClient
-   */
-  private def createKinesisClient(
-    provider: AWSCredentialsProvider,
-    endpoint: String,
-    region: String
-  ): Either[Throwable, AmazonKinesis] =
-    Either.catchNonFatal(
-      AmazonKinesisClientBuilder
-        .standard()
-        .withCredentials(provider)
-        .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
-        .build()
-    )
-
   private def sqsBuffer(
     sqsBufferName: Option[String],
-    provider: AWSCredentialsProvider,
     region: String
   ): Either[Throwable, Option[SqsClientAndName]] =
     sqsBufferName match {
       case Some(name) =>
-        createSqsClient(provider, region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
+        createSqsClient(region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
       case None => None.asRight
     }

-  private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] =
+  private def createSqsClient(region: String): Either[Throwable, AmazonSQS] =
     Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build
+      AmazonSQSClientBuilder.standard().withRegion(region).build
     )

   /**
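
With `getProvider` gone, the Kinesis and SQS builders no longer receive explicit credentials, so the SDK falls back to `DefaultAWSCredentialsProviderChain`; that chain already tries, in order, environment variables, system properties, profile files, and instance-profile credentials — the modes the removed 'env'/'iam'/static-key configuration used to select by hand. For comparison, the new builder call should be equivalent to spelling the default out:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}

object DefaultChainSketch {
  // Spelling out the default the builder uses when withCredentials is omitted
  def explicitClient(endpoint: String, region: String): AmazonKinesis =
    AmazonKinesisClientBuilder
      .standard()
      .withCredentials(new DefaultAWSCredentialsProviderChain())
      .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
      .build()
}
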
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
index 9942b0768..8826c6b4b 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
@@ -11,7 +11,6 @@ final case class KinesisSinkConfig(
   maxBytes: Int,
   region: String,
   threadPoolSize: Int,
-  aws: KinesisSinkConfig.AWSConfig,
   backoffPolicy: KinesisSinkConfig.BackoffPolicy,
   customEndpoint: Option[String],
   sqsGoodBuffer: Option[String],
diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala
new file mode 100644
index 000000000..3cc62ec3e
--- /dev/null
+++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala
@@ -0,0 +1,13 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream
+
+import org.specs2.mutable.Specification
+
+class TelemetryUtilsSpec extends Specification {
+
+  "extractAccountId" should {
+    "be able to extract the account id from a Kinesis stream ARN" in {
+      val streamArn = "arn:aws:kinesis:region:123456789:stream/name"
+      TelemetryUtils.extractAccountId(streamArn) must beEqualTo("123456789")
+    }
+  }
+}
diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
index 31d6c77cb..bf97c260b 100644
--- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
+++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
@@ -123,10 +123,6 @@ object KinesisConfigSpec {
       maxBytes = 1000000,
       region = "eu-central-1",
       threadPoolSize = 10,
-      aws = KinesisSinkConfig.AWSConfig(
-        accessKey = "iam",
-        secretKey = "iam"
-      ),
       backoffPolicy = KinesisSinkConfig.BackoffPolicy(
         minBackoff = 500,
         maxBackoff = 1500,
"iam" - ), backoffPolicy = KinesisSinkConfig.BackoffPolicy( minBackoff = 500, maxBackoff = 1500, diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala index 365f54c0b..b6fb40109 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala @@ -34,6 +34,6 @@ object NsqCollector extends App[NsqSinkConfig](BuildInfo) { ) } yield Sinks(good, bad) - override def telemetryInfo(config: Config[NsqSinkConfig]): Telemetry.TelemetryInfo = - Telemetry.TelemetryInfo(None, None) + override def telemetryInfo(config: Config.Streams[NsqSinkConfig]): IO[Telemetry.TelemetryInfo] = + IO(Telemetry.TelemetryInfo(None, None, None)) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fd7df112e..b03e73514 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,6 +62,7 @@ object Dependencies { val specs2 = "4.11.0" val specs2CE = "1.5.0" val testcontainers = "0.40.10" + val ceTestkit = "3.4.5" object Legacy { val specs2CE = "0.4.1" @@ -120,8 +121,9 @@ object Dependencies { // Scala (test only) // Test common - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test + val ceTestkit = "org.typelevel" %% "cats-effect-testkit" % V.ceTestkit % Test // Test Akka val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala index cc71cf6ee..85f84de52 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala @@ -14,9 +14,12 @@ object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { bad <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.bad) } yield Sinks(good, bad) - override def telemetryInfo(config: Config[PubSubSinkConfig]): Telemetry.TelemetryInfo = - Telemetry.TelemetryInfo( - region = None, - cloud = Some("GCP") + override def telemetryInfo(config: Config.Streams[PubSubSinkConfig]): IO[Telemetry.TelemetryInfo] = + IO( + Telemetry.TelemetryInfo( + region = None, + cloud = Some("GCP"), + unhashedPipelineId = Some(config.sink.googleProjectId) + ) ) } diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala index aaa2b7159..b395c8822 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala @@ -42,9 +42,14 @@ object SqsCollector extends App[SqsSinkConfig](BuildInfo) { } yield Sinks(good, bad) } - override def telemetryInfo(config: Config[SqsSinkConfig]): Telemetry.TelemetryInfo = - Telemetry.TelemetryInfo( - region = Some(config.streams.sink.region), - cloud = 
Some("AWS") - ) + override def telemetryInfo(config: Config.Streams[SqsSinkConfig]): IO[Telemetry.TelemetryInfo] = + TelemetryUtils + .getAccountId(config) + .map(id => + Telemetry.TelemetryInfo( + region = Some(config.sink.region), + cloud = Some("AWS"), + unhashedPipelineId = id + ) + ) } diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala new file mode 100644 index 000000000..7aa013c77 --- /dev/null +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala @@ -0,0 +1,25 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream + +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collector.core.Config +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ + +object TelemetryUtils { + + def getAccountId(config: Config.Streams[SqsSinkConfig]): IO[Option[String]] = + Resource + .make( + IO(SqsSink.createSqsClient(config.sink.region)).rethrow + )(c => IO(c.shutdown())) + .use { client => + IO { + val sqsQueueUrl = client.getQueueUrl(config.good).getQueueUrl + Some(extractAccountId(sqsQueueUrl)) + } + } + .handleError(_ => None) + + def extractAccountId(sqsQueueUrl: String): String = + sqsQueueUrl.split("/")(3) + +} diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index 6f40e5eb2..af5708b1a 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -298,6 +298,11 @@ object SqsSink { Resource.make(acquire)(release) } + def createSqsClient(region: String): Either[Throwable, AmazonSQS] = + Either.catchNonFatal( + AmazonSQSClientBuilder.standard().withRegion(region).build + ) + /** * Create an SqsSink and schedule a task to flush its EventStorage. 
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
index 6f40e5eb2..af5708b1a 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
@@ -298,6 +298,11 @@ object SqsSink {
     Resource.make(acquire)(release)
   }

+  def createSqsClient(region: String): Either[Throwable, AmazonSQS] =
+    Either.catchNonFatal(
+      AmazonSQSClientBuilder.standard().withRegion(region).build
+    )
+
   /**
    * Create an SqsSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the SqsSink
@@ -309,16 +314,11 @@ object SqsSink {
     bufferConfig: Config.Buffer,
     queueName: String,
     executorService: ScheduledExecutorService
-  ): Either[Throwable, SqsSink[F]] = {
-    val client = Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(sqsConfig.region).build
-    )
-
-    client.map { c =>
+  ): Either[Throwable, SqsSink[F]] =
+    createSqsClient(sqsConfig.region).map { c =>
       val sqsSink = new SqsSink(maxBytes, c, sqsConfig, bufferConfig, queueName, executorService)
       sqsSink.EventStorage.scheduleFlush()
       sqsSink.checkSqsHealth()
       sqsSink
     }
-  }
 }
diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala
new file mode 100644
index 000000000..7c8183f63
--- /dev/null
+++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtilsSpec.scala
@@ -0,0 +1,13 @@
+package com.snowplowanalytics.snowplow.collectors.scalastream
+
+import org.specs2.mutable.Specification
+
+class TelemetryUtilsSpec extends Specification {
+
+  "extractAccountId" should {
+    "be able to extract the account id from an SQS queue URL" in {
+      val queueUrl = "https://sqs.region.amazonaws.com/123456789/queue"
+      TelemetryUtils.extractAccountId(queueUrl) must beEqualTo("123456789")
+    }
+  }
+}
diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
index ac8070eb4..c307c5bc3 100644
--- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
+++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
@@ -13,6 +13,6 @@ object StdoutCollector extends App[SinkConfig](BuildInfo) {
     Resource.pure(Sinks(good, bad))
   }

-  override def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(None, None)
+  override def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo] =
+    IO(Telemetry.TelemetryInfo(None, None, None))
 }