diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index f555aeaaa..e400bd1bb 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -142,7 +142,9 @@ class KinesisSink[F[_]: Sync] private ( writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) // Kinesis not healthy and SQS buffer defined case Some(sqs) => - writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries) + val (big, small) = batch.partition(_.payload.size > 192000) // TODO: grab 192000 from the config + if (small.nonEmpty) writeBatchToSqsWithRetries(small, sqs, minBackoff, maxRetries) + if (big.nonEmpty) writeBatchToKinesisWithRetries(small, minBackoff, maxRetries) } def writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = { @@ -498,11 +500,9 @@ object KinesisSink { clients.map { case (kinesisClient, sqsClientAndName) => - val maxBytes = - if (sqsClientAndName.isDefined) sinkConfig.config.sqsMaxBytes else sinkConfig.config.maxBytes val ks = new KinesisSink( - maxBytes, + sinkConfig.config.maxBytes, kinesisClient, sinkConfig.config, sinkConfig.buffer,