Skip to content

Commit

Permalink
Move logging of thread pool creation out of KinesisSink (close #129)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 6, 2021
1 parent e11d867 commit d30e0ee
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object KinesisCollector extends Collector {
case kc: Kinesis => kc.asRight
case _ => new IllegalArgumentException("Configured sink is not Kinesis").asLeft
}
es = new ScheduledThreadPoolExecutor(kc.threadPoolSize)
es = buildExecutorService(kc)
goodStream = collectorConf.streams.good
badStream = collectorConf.streams.bad
bufferConf = collectorConf.streams.buffer
Expand All @@ -62,4 +62,9 @@ object KinesisCollector extends Collector {
case Left(e) => throw e
}
}

def buildExecutorService(kc: Kinesis): ScheduledThreadPoolExecutor = {
log.info("Creating thread pool of size " + kc.threadPoolSize)
new ScheduledThreadPoolExecutor(kc.threadPoolSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class KinesisSink private (
) extends Sink.Throttled(throttler) {
import KinesisSink._

log.info("Creating thread pool of size " + kinesisConfig.threadPoolSize)

maybeSqs match {
case Some(sqs) =>
log.info(
Expand Down

0 comments on commit d30e0ee

Please sign in to comment.