From 2e7ed0a8746d24dcf5a657835a2f4c0e58093ed0 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 4 Dec 2021 22:07:59 +0000 Subject: [PATCH] Move logging of thread pool creation out of KinesisSink (close #129) --- .../KinesisCollector.scala | 7 ++++++- .../sinks/KinesisSink.scala | 2 -- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index 2041b783e..1ad919294 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -34,7 +34,7 @@ object KinesisCollector extends Collector { case kc: Kinesis => kc.asRight case _ => new IllegalArgumentException("Configured sink is not Kinesis").asLeft } - es = new ScheduledThreadPoolExecutor(kc.threadPoolSize) + es = buildExecutorService(kc) goodStream = collectorConf.streams.good badStream = collectorConf.streams.bad bufferConf = collectorConf.streams.buffer @@ -56,4 +56,9 @@ object KinesisCollector extends Collector { case Left(e) => throw e } } + + def buildExecutorService(kc: Kinesis): ScheduledThreadPoolExecutor = { + log.info("Creating thread pool of size " + kc.threadPoolSize) + new ScheduledThreadPoolExecutor(kc.threadPoolSize) + } } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index f51bda3b0..578a36575 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -51,8 +51,6 @@ class KinesisSink private ( ) extends Sink { import KinesisSink._ - log.info("Creating thread pool of size " + kinesisConfig.threadPoolSize) - maybeSqs match { case Some(sqs) => log.info(