From 2d3d87e1edf3e2e1d3f51844423dd3b8d4930778 Mon Sep 17 00:00:00 2001
From: spenes
Date: Wed, 4 Oct 2023 15:29:23 +0300
Subject: [PATCH] Pipeline id telemetry

---
 .../App.scala                     |  2 +-
 .../Run.scala                     | 16 ++--
 .../Telemetry.scala               | 48 +++++-----
 .../KafkaCollector.scala          | 31 ++++++-
 .../KinesisCollector.scala        | 29 +++++-
 .../sinks/KinesisSink.scala       | 90 +++++++------------
 .../sinks/KinesisSinkConfig.scala |  1 -
 .../sinks/KinesisConfigSpec.scala |  4 -
 .../NsqCollector.scala            |  4 +-
 .../PubSubCollector.scala         | 11 ++-
 .../SqsCollector.scala            | 27 +++++-
 .../sinks/SqsSink.scala           | 14 +--
 .../StdoutCollector.scala         |  4 +-
 13 files changed, 156 insertions(+), 125 deletions(-)

diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
index 5bbce5762..23b614458 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala
@@ -21,7 +21,7 @@ abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo)
 
   def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]]
 
-  def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo
+  def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo]
 
   final def main: Opts[IO[ExitCode]] = Run.fromCli[IO, SinkConfig](appInfo, mkSinks, telemetryInfo)
 }
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
index 25297d818..30eea531c 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala
@@ -27,14 +27,14 @@ object Run {
 
   type MkSinks[F[_], SinkConfig] = Config.Streams[SinkConfig] => Resource[F, Sinks[F]]
 
-  type TelemetryInfo[SinkConfig] = Config[SinkConfig] => Telemetry.TelemetryInfo
+  type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo]
 
   implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
 
   def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig]
+    telemetryInfo: TelemetryInfo[F, SinkConfig]
   ): Opts[F[ExitCode]] = {
     val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
     configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
@@ -43,7 +43,7 @@
   private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig],
+    telemetryInfo: TelemetryInfo[F, SinkConfig],
     path: Option[Path]
   ): F[ExitCode] = {
     val eitherT = for {
@@ -60,7 +60,7 @@
   private def fromConfig[F[_]: Async: Tracking, SinkConfig](
     appInfo: AppInfo,
     mkSinks: MkSinks[F, SinkConfig],
-    telemetryInfo: TelemetryInfo[SinkConfig],
+    telemetryInfo: TelemetryInfo[F, SinkConfig],
     config: Config[SinkConfig]
   ): F[ExitCode] = {
     val resources = for {
@@ -82,13 +82,7 @@
 
     resources.use { httpClient =>
       Telemetry
-        .run(
-          config.telemetry,
-          httpClient,
-          appInfo,
-          telemetryInfo(config).region,
-          telemetryInfo(config).cloud
-        )
+        .run(config.telemetry, httpClient, appInfo, telemetryInfo(config.streams))
         .compile
         .drain
         .flatMap(_ => Async[F].never[ExitCode])
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
index 95df9bebc..35dd26dc5 100644
--- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Telemetry.scala
@@ -3,6 +3,8 @@ package com.snowplowanalytics.snowplow.collector.core
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
+import org.apache.commons.codec.digest.DigestUtils
+
 import cats.data.NonEmptyList
 import cats.implicits._
 
@@ -32,25 +34,19 @@
     telemetryConfig: Config.Telemetry,
     httpClient: HttpClient[F],
     appInfo: AppInfo,
-    region: Option[String],
-    cloud: Option[String]
+    telemetryInfoF: F[TelemetryInfo]
   ): Stream[F, Unit] =
     if (telemetryConfig.disable) Stream.empty.covary[F]
-    else {
-      val sdj = makeHeartbeatEvent(
-        telemetryConfig,
-        region,
-        cloud,
-        appInfo.moduleName,
-        appInfo.version
-      )
-      Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
-        Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
+    else
+      for {
+        telemetryInfo <- Stream.eval(telemetryInfoF)
+        sdj = makeHeartbeatEvent(telemetryConfig, appInfo, telemetryInfo)
+        tracker <- Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient))
+        _ <- Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
           tracker.trackSelfDescribingEvent(unstructEvent = sdj) >>
           tracker.flushEmitters()
         }
-      }
-    }
+      } yield ()
 
   private def initTracker[F[_]: Async: Tracking](
     config: Config.Telemetry,
@@ -90,13 +86,11 @@
 
   private def makeHeartbeatEvent(
     teleCfg: Config.Telemetry,
-    region: Option[String],
-    cloud: Option[String],
-    appName: String,
-    appVersion: String
+    appInfo: AppInfo,
+    telemetryInfo: TelemetryInfo
   ): SelfDescribingData[Json] =
     SelfDescribingData(
-      SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
+      SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
       Json.obj(
         "userProvidedId" -> teleCfg.userProvidedId.asJson,
         "autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
@@ -104,15 +98,19 @@
         "moduleVersion" -> teleCfg.moduleVersion.asJson,
         "instanceId" -> teleCfg.instanceId.asJson,
         "appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
-        "cloud" -> cloud.asJson,
-        "region" -> region.asJson,
-        "applicationName" -> appName.asJson,
-        "applicationVersion" -> appVersion.asJson
+        "cloud" -> telemetryInfo.cloud.asJson,
+        "region" -> telemetryInfo.region.asJson,
+        "pipelineId" -> telemetryInfo.hashedPipelineId.asJson,
+        "applicationName" -> appInfo.moduleName.asJson,
+        "applicationVersion" -> appInfo.version.asJson
       )
     )
 
   case class TelemetryInfo(
     region: Option[String],
-    cloud: Option[String]
-  )
+    cloud: Option[String],
+    unhashedPipelineId: Option[String]
+  ) {
+    def hashedPipelineId: Option[String] = unhashedPipelineId.map(DigestUtils.sha256Hex)
+  }
 }
diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
index 6447b3209..ff7e7df67 100644
--- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
+++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala
@@ -15,6 +15,10 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
 
 import cats.effect.{IO, Resource}
 
+import org.http4s._
+import org.http4s.blaze.client.BlazeClientBuilder
+import org.typelevel.ci._
+import io.circe.parser
 import com.snowplowanalytics.snowplow.collector.core.model.Sinks
 import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
 import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._
@@ -37,6 +41,29 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {
     )
   } yield Sinks(good, bad)
 
-  override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(None, None)
+  override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    getAzureSubscriptionId.map {
+      case None => Telemetry.TelemetryInfo(None, None, None)
+      case Some(id) => Telemetry.TelemetryInfo(None, Some("Azure"), Some(id))
+    }
+
+  def getAzureSubscriptionId: IO[Option[String]] = {
+    val response = for {
+      client <- BlazeClientBuilder[IO].resource
+      request = Request[IO](
+        method = Method.GET,
+        uri = Uri.unsafeFromString("http://169.254.169.254/metadata/instance?api-version=2021-02-01"),
+        headers = Headers(Header.Raw(ci"Metadata", "true"))
+      )
+      response <- client.run(request)
+    } yield response
+    response.use(_.bodyText.compile.string.map(extractId)).handleError(_ => None)
+  }
+
+  private def extractId(metadata: String): Option[String] =
+    for {
+      json <- parser.parse(metadata).toOption
+      id <- json.hcursor.downField("compute").downField("subscriptionId").as[String].toOption
+    } yield id
+
 }
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
index a79c970f6..c9faf3f0e 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala
@@ -15,9 +15,11 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
 
 import cats.effect.{IO, Resource}
+
 import com.snowplowanalytics.snowplow.collector.core.model.Sinks
 import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
 import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, KinesisSinkConfig}
+
 import org.slf4j.LoggerFactory
 
 import java.util.concurrent.ScheduledThreadPoolExecutor
@@ -48,14 +50,33 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) {
     } yield Sinks(good, bad)
   }
 
-  override def telemetryInfo(config: Config[KinesisSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(
-      region = Some(config.streams.sink.region),
-      cloud = Some("AWS")
+  override def telemetryInfo(config: Config.Streams[KinesisSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    getAccountId(config).map(id =>
+      Telemetry.TelemetryInfo(
+        region = Some(config.sink.region),
+        cloud = Some("AWS"),
+        unhashedPipelineId = id
+      )
     )
 
   def buildExecutorService(kc: KinesisSinkConfig): ScheduledThreadPoolExecutor = {
     log.info("Creating thread pool of size " + kc.threadPoolSize)
     new ScheduledThreadPoolExecutor(kc.threadPoolSize)
  }
+
+  def getAccountId(config: Config.Streams[KinesisSinkConfig]): IO[Option[String]] =
+    Resource
+      .make(
+        IO(KinesisSink.createKinesisClient(config.sink.endpoint, config.sink.region)).rethrow
+      )(c => IO(c.shutdown()))
+      .use { kinesis =>
+        IO {
+          val streamArn = KinesisSink.describeStream(kinesis, config.good).getStreamARN
+          Some(extractAccountId(streamArn))
+        }
+      }
+      .handleError(_ => None)
+
+  def extractAccountId(kinesisStreamArn: String): String =
+    kinesisStreamArn.split(":")(4)
 }
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
index f3d147cd5..772f9281e 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala
@@ -16,7 +16,6 @@ package sinks
 import cats.effect.{Resource, Sync}
 import cats.implicits.catsSyntaxMonadErrorRethrow
 import cats.syntax.either._
-import com.amazonaws.auth._
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
 import com.amazonaws.services.kinesis.model._
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
@@ -356,10 +355,8 @@ class KinesisSink[F[_]: Sync] private (
       override def run() {
         while (!kinesisHealthy) {
           Try {
-            val describeRequest = new DescribeStreamSummaryRequest()
-            describeRequest.setStreamName(streamName)
-            val describeResult = client.describeStreamSummary(describeRequest)
-            describeResult.getStreamDescriptionSummary().getStreamStatus()
+            val streamDescription = describeStream(client, streamName)
+            streamDescription.getStreamStatus()
           } match {
             case Success("ACTIVE") =>
               log.info(s"Stream $streamName ACTIVE")
@@ -443,6 +440,31 @@ object KinesisSink {
     Resource.make(acquire)(release)
   }
 
+  /**
+   * Creates a new Kinesis client.
+   * @param provider aws credentials provider
+   * @param endpoint kinesis endpoint where the stream resides
+   * @param region aws region where the stream resides
+   * @return the initialized AmazonKinesisClient
+   */
+  def createKinesisClient(
+    endpoint: String,
+    region: String
+  ): Either[Throwable, AmazonKinesis] =
+    Either.catchNonFatal(
+      AmazonKinesisClientBuilder
+        .standard()
+        .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
+        .build()
+    )
+
+  def describeStream(client: AmazonKinesis, streamName: String) = {
+    val describeRequest = new DescribeStreamSummaryRequest()
+    describeRequest.setStreamName(streamName)
+    val describeResult = client.describeStreamSummary(describeRequest)
+    describeResult.getStreamDescriptionSummary()
+  }
+
   /**
    * Create a KinesisSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the KinesisSink
@@ -457,9 +479,8 @@ object KinesisSink {
     executorService: ScheduledExecutorService
   ): Either[Throwable, KinesisSink[F]] = {
     val clients = for {
-      provider <- getProvider(kinesisConfig.aws)
-      kinesisClient <- createKinesisClient(provider, kinesisConfig.endpoint, kinesisConfig.region)
-      sqsClientAndName <- sqsBuffer(sqsBufferName, provider, kinesisConfig.region)
+      kinesisClient <- createKinesisClient(kinesisConfig.endpoint, kinesisConfig.region)
+      sqsClientAndName <- sqsBuffer(sqsBufferName, kinesisConfig.region)
     } yield (kinesisClient, sqsClientAndName)
 
     clients.map {
@@ -483,66 +504,19 @@ object KinesisSink {
       }
     }
 
-  /** Create an aws credentials provider through env variables and iam. */
-  private def getProvider(awsConfig: KinesisSinkConfig.AWSConfig): Either[Throwable, AWSCredentialsProvider] = {
-    def isDefault(key: String): Boolean = key == "default"
-    def isIam(key: String): Boolean = key == "iam"
-    def isEnv(key: String): Boolean = key == "env"
-
-    ((awsConfig.accessKey, awsConfig.secretKey) match {
-      case (a, s) if isDefault(a) && isDefault(s) =>
-        new DefaultAWSCredentialsProviderChain().asRight
-      case (a, s) if isDefault(a) || isDefault(s) =>
-        "accessKey and secretKey must both be set to 'default' or neither".asLeft
-      case (a, s) if isIam(a) && isIam(s) =>
-        InstanceProfileCredentialsProvider.getInstance().asRight
-      case (a, s) if isIam(a) || isIam(s) =>
-        "accessKey and secretKey must both be set to 'iam' or neither".asLeft
-      case (a, s) if isEnv(a) && isEnv(s) =>
-        new EnvironmentVariableCredentialsProvider().asRight
-      case (a, s) if isEnv(a) || isEnv(s) =>
-        "accessKey and secretKey must both be set to 'env' or neither".asLeft
-      case _ =>
-        new AWSStaticCredentialsProvider(
-          new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
-        ).asRight
-    }).leftMap(new IllegalArgumentException(_))
-  }
-
-  /**
-   * Creates a new Kinesis client.
-   * @param provider aws credentials provider
-   * @param endpoint kinesis endpoint where the stream resides
-   * @param region aws region where the stream resides
-   * @return the initialized AmazonKinesisClient
-   */
-  private def createKinesisClient(
-    provider: AWSCredentialsProvider,
-    endpoint: String,
-    region: String
-  ): Either[Throwable, AmazonKinesis] =
-    Either.catchNonFatal(
-      AmazonKinesisClientBuilder
-        .standard()
-        .withCredentials(provider)
-        .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
-        .build()
-    )
-
   private def sqsBuffer(
     sqsBufferName: Option[String],
-    provider: AWSCredentialsProvider,
     region: String
   ): Either[Throwable, Option[SqsClientAndName]] =
     sqsBufferName match {
       case Some(name) =>
-        createSqsClient(provider, region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
+        createSqsClient(region).map(amazonSqs => Some(SqsClientAndName(amazonSqs, name)))
       case None => None.asRight
     }
 
-  private def createSqsClient(provider: AWSCredentialsProvider, region: String): Either[Throwable, AmazonSQS] =
+  private def createSqsClient(region: String): Either[Throwable, AmazonSQS] =
     Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build
+      AmazonSQSClientBuilder.standard().withRegion(region).build
    )
 
   /**
diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
index 9942b0768..8826c6b4b 100644
--- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
+++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala
@@ -11,7 +11,6 @@ final case class KinesisSinkConfig(
   maxBytes: Int,
   region: String,
   threadPoolSize: Int,
-  aws: KinesisSinkConfig.AWSConfig,
   backoffPolicy: KinesisSinkConfig.BackoffPolicy,
   customEndpoint: Option[String],
   sqsGoodBuffer: Option[String],
diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
index 31d6c77cb..bf97c260b 100644
--- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
+++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala
@@ -123,10 +123,6 @@
       maxBytes = 1000000,
       region = "eu-central-1",
       threadPoolSize = 10,
-      aws = KinesisSinkConfig.AWSConfig(
-        accessKey = "iam",
-        secretKey = "iam"
-      ),
       backoffPolicy = KinesisSinkConfig.BackoffPolicy(
         minBackoff = 500,
         maxBackoff = 1500,
diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala
index 365f54c0b..b6fb40109 100644
--- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala
+++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala
@@ -34,6 +34,6 @@ object NsqCollector extends App[NsqSinkConfig](BuildInfo) {
       )
     } yield Sinks(good, bad)
 
-  override def telemetryInfo(config: Config[NsqSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(None, None)
+  override def telemetryInfo(config: Config.Streams[NsqSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    IO(Telemetry.TelemetryInfo(None, None, None))
 }
diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala
index cc71cf6ee..85f84de52 100644
--- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala
+++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala
@@ -14,9 +14,12 @@ object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) {
       bad <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.bad)
     } yield Sinks(good, bad)
 
-  override def telemetryInfo(config: Config[PubSubSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(
-      region = None,
-      cloud = Some("GCP")
+  override def telemetryInfo(config: Config.Streams[PubSubSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    IO(
+      Telemetry.TelemetryInfo(
+        region = None,
+        cloud = Some("GCP"),
+        unhashedPipelineId = Some(config.sink.googleProjectId)
+      )
     )
 }
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
index aaa2b7159..bec8984ec 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala
@@ -42,9 +42,28 @@ object SqsCollector extends App[SqsSinkConfig](BuildInfo) {
     } yield Sinks(good, bad)
   }
 
-  override def telemetryInfo(config: Config[SqsSinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(
-      region = Some(config.streams.sink.region),
-      cloud = Some("AWS")
+  override def telemetryInfo(config: Config.Streams[SqsSinkConfig]): IO[Telemetry.TelemetryInfo] =
+    getAccountId(config).map(id =>
+      Telemetry.TelemetryInfo(
+        region = Some(config.sink.region),
+        cloud = Some("AWS"),
+        unhashedPipelineId = id
+      )
     )
+
+  def getAccountId(config: Config.Streams[SqsSinkConfig]): IO[Option[String]] =
+    Resource
+      .make(
+        IO(SqsSink.createSqsClient(config.sink.region)).rethrow
+      )(c => IO(c.shutdown()))
+      .use { client =>
+        IO {
+          val sqsQueueUrl = client.getQueueUrl(config.good).getQueueUrl
+          Some(extractAccountId(sqsQueueUrl))
+        }
+      }
+      .handleError(_ => None)
+
+  def extractAccountId(sqsQueueUrl: String): String =
+    sqsQueueUrl.split("/")(3)
 }
diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
index 6f40e5eb2..af5708b1a 100644
--- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
+++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala
@@ -298,6 +298,11 @@ object SqsSink {
     Resource.make(acquire)(release)
   }
 
+  def createSqsClient(region: String): Either[Throwable, AmazonSQS] =
+    Either.catchNonFatal(
+      AmazonSQSClientBuilder.standard().withRegion(region).build
+    )
+
   /**
    * Create an SqsSink and schedule a task to flush its EventStorage.
    * Exists so that no threads can get a reference to the SqsSink
@@ -309,16 +314,11 @@ object SqsSink {
     bufferConfig: Config.Buffer,
     queueName: String,
     executorService: ScheduledExecutorService
-  ): Either[Throwable, SqsSink[F]] = {
-    val client = Either.catchNonFatal(
-      AmazonSQSClientBuilder.standard().withRegion(sqsConfig.region).build
-    )
-
-    client.map { c =>
+  ): Either[Throwable, SqsSink[F]] =
+    createSqsClient(sqsConfig.region).map { c =>
       val sqsSink = new SqsSink(maxBytes, c, sqsConfig, bufferConfig, queueName, executorService)
       sqsSink.EventStorage.scheduleFlush()
       sqsSink.checkSqsHealth()
       sqsSink
     }
-  }
 }
diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
index ac8070eb4..c307c5bc3 100644
--- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
+++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala
@@ -13,6 +13,6 @@ object StdoutCollector extends App[SinkConfig](BuildInfo) {
     Resource.pure(Sinks(good, bad))
   }
 
-  override def telemetryInfo(config: Config[SinkConfig]): Telemetry.TelemetryInfo =
-    Telemetry.TelemetryInfo(None, None)
+  override def telemetryInfo(config: Config.Streams[SinkConfig]): IO[Telemetry.TelemetryInfo] =
+    IO(Telemetry.TelemetryInfo(None, None, None))
 }
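-- 
Note (below the signature delimiter, so git am ignores it): a minimal,
runnable sketch of what the new pipelineId field carries. The ARN and
account id are made-up example values; only the SHA-256 hex digest of the
raw id is sent, per TelemetryInfo.hashedPipelineId above. Assumes
commons-codec on the classpath, as the patched Telemetry.scala does.

  import org.apache.commons.codec.digest.DigestUtils

  object PipelineIdExample extends App {
    // Hypothetical stream ARN, shaped like what DescribeStreamSummary returns.
    val arn = "arn:aws:kinesis:eu-central-1:123456789012:stream/good"

    // Mirrors KinesisCollector.extractAccountId: the account id is the 5th ARN field.
    val accountId = arn.split(":")(4) // "123456789012"

    // Mirrors TelemetryInfo.hashedPipelineId: hash the raw id before reporting.
    val hashedPipelineId = Some(accountId).map(DigestUtils.sha256Hex)

    println(hashedPipelineId) // Some(<64-char hex digest>)
  }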