From e9b4d58d9089532f9a3f38aea764b88a3f097a16 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 19 Nov 2024 08:49:20 +0000 Subject: [PATCH] Kinesis allow larger batches when sqs buffer is enabled --- .../sinks/KinesisSink.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index f555aeaaa..e400bd1bb 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -142,7 +142,9 @@ class KinesisSink[F[_]: Sync] private ( writeBatchToKinesisWithRetries(batch, minBackoff, maxRetries) // Kinesis not healthy and SQS buffer defined case Some(sqs) => - writeBatchToSqsWithRetries(batch, sqs, minBackoff, maxRetries) + val (big, small) = batch.partition(_.payload.size > 192000) // TODO: grab 192000 from the config + if (small.nonEmpty) writeBatchToSqsWithRetries(small, sqs, minBackoff, maxRetries) + if (big.nonEmpty) writeBatchToKinesisWithRetries(big, minBackoff, maxRetries) } def writeBatchToKinesisWithRetries(batch: List[Events], nextBackoff: Long, retriesLeft: Int): Unit = { @@ -498,11 +500,9 @@ object KinesisSink { clients.map { case (kinesisClient, sqsClientAndName) => - val maxBytes = - if (sqsClientAndName.isDefined) sinkConfig.config.sqsMaxBytes else sinkConfig.config.maxBytes val ks = new KinesisSink( - maxBytes, + sinkConfig.config.maxBytes, kinesisClient, sinkConfig.config, sinkConfig.buffer,