From a9f4a8f91f7d26189bd063ca8b01d14ef9eee3e2 Mon Sep 17 00:00:00 2001 From: Dilyan Damyanov Date: Tue, 9 Mar 2021 11:07:26 +0000 Subject: [PATCH 1/3] Respect SQS batch request limit (close #125) --- .../sinks/KinesisSink.scala | 661 +++++++++--------- .../sinks/KinesisSinkSpec.scala | 63 ++ 2 files changed, 409 insertions(+), 315 deletions(-) create mode 100644 kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 33ae8ba87..06656b836 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -16,16 +16,12 @@ package sinks import java.nio.ByteBuffer import java.util.concurrent.ScheduledExecutorService import java.util.UUID -import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration} import com.amazonaws.auth._ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration -import com.amazonaws.retry.RetryPolicy.RetryCondition -import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy} import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder} import com.amazonaws.services.kinesis.model._ import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} import com.amazonaws.services.sqs.model.{ - MessageAttributeValue, QueueDoesNotExistException, SendMessageBatchRequest, SendMessageBatchRequestEntry @@ -33,18 +29,333 @@ import com.amazonaws.services.sqs.model.{ import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ -import scala.concurrent.Future +import scala.collection.mutable.ListBuffer +import scala.concurrent.{ExecutionContextExecutorService, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} - import cats.syntax.either._ - import com.snowplowanalytics.snowplow.collectors.scalastream.model._ import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.SqsClientAndName +/** + * Kinesis Sink for the Scala collector. 
+ */ +class KinesisSink private ( + client: AmazonKinesis, + kinesisConfig: Kinesis, + bufferConfig: BufferConfig, + streamName: String, + executorService: ScheduledExecutorService, + maybeSqs: Option[SqsClientAndName] +) extends Sink { + import KinesisSink._ + + private val WithBuffer = maybeSqs.isDefined + + // Records must not exceed MaxBytes - 1MB (for Kinesis) + // When SQS buffer is enabled MaxBytes has to be 256k, + // but we encode the message with Base64 for SQS, so the limit drops to 192k + private val SqsLimit = 192000 // 256000 / 4 * 3 + private val KinesisLimit = 1000000 + override val MaxBytes = if (WithBuffer) SqsLimit else KinesisLimit + + private val ByteThreshold = bufferConfig.byteLimit + private val RecordThreshold = bufferConfig.recordLimit + private val TimeThreshold = bufferConfig.timeLimit + + private val maxBackoff = kinesisConfig.backoffPolicy.maxBackoff + private val minBackoff = kinesisConfig.backoffPolicy.minBackoff + private val randomGenerator = new java.util.Random() + + private val MaxSqsBatchSizeN = 10 + private val MaxRetries = 3 // TODO: make this configurable + + log.info("Creating thread pool of size " + kinesisConfig.threadPoolSize) + maybeSqs match { + case Some(sqs) => + log.info( + s"SQS buffer for '$streamName' Kinesis sink is set up as: ${sqs.sqsBufferName}." + ) + case None => + log.warn( + s"No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon)." + ) + } + + implicit lazy val ec: ExecutionContextExecutorService = + concurrent.ExecutionContext.fromExecutorService(executorService) + + /** + * Recursively schedule a task to send everything in EventStorage. + * Even if the incoming event flow dries up, all stored events will eventually get sent. + * Whenever TimeThreshold milliseconds have passed since the last call to flush, call flush. 
+ * @param interval When to schedule the next flush + */ + def scheduleFlush(interval: Long = TimeThreshold): Unit = { + executorService.schedule( + new Runnable { + override def run(): Unit = { + val lastFlushed = EventStorage.getLastFlushTime + val currentTime = System.currentTimeMillis() + if (currentTime - lastFlushed >= TimeThreshold) { + EventStorage.flush() + scheduleFlush(TimeThreshold) + } else { + scheduleFlush(TimeThreshold + lastFlushed - currentTime) + } + } + }, + interval, + MILLISECONDS + ) + () + } + + object EventStorage { + private val storedEvents = ListBuffer.empty[Events] + private var byteCount = 0L + @volatile private var lastFlushedTime = 0L + + def store(event: Array[Byte], key: String): Unit = { + val eventBytes = ByteBuffer.wrap(event) + val eventSize = eventBytes.capacity + if (eventSize >= MaxBytes) { + log.error( + s"Record of size $eventSize bytes is too large - must be less than $MaxBytes bytes" + ) + } else { + synchronized { + if (storedEvents.size + 1 > RecordThreshold || byteCount + eventSize > ByteThreshold) { + flush() + } + storedEvents += Events(eventBytes.array(), key) + byteCount += eventSize + } + } + } + + def flush(): Unit = { + val eventsToSend = synchronized { + val evts = storedEvents.result + storedEvents.clear() + byteCount = 0 + evts + } + lastFlushedTime = System.currentTimeMillis() + sinkBatch(eventsToSend, KinesisStream(maybeSqs))(MaxRetries) + } + + def getLastFlushTime: Long = lastFlushedTime + } + + def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + events.foreach(e => EventStorage.store(e, key)) + Nil + } + + def scheduleBatch(batch: List[Events], lastBackoff: Long = minBackoff): Unit = { + val nextBackoff = getNextBackoff(lastBackoff) + executorService.schedule(new Runnable { + override def run(): Unit = + sinkBatch(batch, KinesisStream(maybeSqs), nextBackoff)(MaxRetries) + }, lastBackoff, MILLISECONDS) + () + } + + /** + * The number of retries is unlimited, but targets switch every 10 attempts if an SQS buffer is configured. + * @param batch A batch of events to be written + * @param target A Kinesis stream or SQS queue to write events to + * @param nextBackoff Period of time to wait before next attempt if retrying + * @param retriesLeft How many retries are left to the current target. Once exhausted, the target will be switched. + */ + def sinkBatch(batch: List[Events], target: Target, nextBackoff: Long = minBackoff)( + retriesLeft: Int = MaxRetries + ): Unit = + if (batch.nonEmpty) target match { + case KinesisStream(buffer) => + log.info(s"Writing ${batch.size} Thrift records to Kinesis stream $streamName.") + writeBatchToKinesis(batch).onComplete { + case Success(s) => + val results = s.getRecords.asScala.toList + val failurePairs = batch.zip(results).filter(_._2.getErrorMessage != null) + log.info( + s"Successfully wrote ${batch.size - failurePairs.size} out of ${batch.size} records to Kinesis stream $streamName." + ) + if (failurePairs.nonEmpty) { + failurePairs.foreach(f => + log.error( + s"Failed writing record to Kinesis stream $streamName, with error code [${f._2.getErrorCode}] and message [${f._2.getErrorMessage}]." + ) + ) + val failures = failurePairs.map(_._1) + val errorMessage = { + if (buffer.isEmpty || retriesLeft > 1) + s"Retrying all records that could not be written to Kinesis stream $streamName in $nextBackoff milliseconds..." 
+ else + s"Sending ${failures.size} records that could not be written to Kinesis stream $streamName to SQS queue ${buffer.get.sqsBufferName} in $nextBackoff milliseconds..." + } + log.error(errorMessage) + scheduleWrite(failures, target, nextBackoff)(retriesLeft - 1) + } + case Failure(f) => + log.error("Writing failed with error:", f) + val errorMessage = { + if (buffer.isEmpty || retriesLeft > 1) + s"Retrying writing batch to Kinesis stream $streamName in $nextBackoff milliseconds..." + else + s"Sending batch to SQS queue ${buffer.get.sqsBufferName} in $nextBackoff milliseconds..." + } + log.error(errorMessage) + scheduleWrite(batch, target, nextBackoff)(retriesLeft - 1) + } + + case SqsQueue(sqs) => + log.info(s"Writing ${batch.size} Thrift records to SQS queue ${sqs.sqsBufferName}.") + writeBatchToSqs(batch, sqs).onComplete { + case Success(s) => + log.info( + s"Successfully wrote ${batch.size - s.size} out of ${batch.size} messages to SQS queue ${sqs.sqsBufferName}." + ) + if (s.nonEmpty) { + s.foreach { f => + log.error( + s"Failed writing message to SQS queue ${sqs.sqsBufferName}, with error code [${f._2.code}] and message [${f._2.message}]." + ) + } + val errorMessage = { + if (retriesLeft > 1) + s"Retrying all messages that could not be written to SQS queue ${sqs.sqsBufferName} in $nextBackoff milliseconds..." + else + s"Sending ${s.size} messages that could not be written to SQS queue ${sqs.sqsBufferName} to Kinesis stream $streamName in $nextBackoff milliseconds..." + } + log.error(errorMessage) + scheduleWrite(s.map(_._1), target, nextBackoff)(retriesLeft - 1) + } + case Failure(f) => + log.error("Writing failed with error:", f) + val errorMessage = { + if (retriesLeft > 1) + s"Retrying writing batch to SQS queue ${sqs.sqsBufferName} in $nextBackoff milliseconds..." + else + s"Sending batch to Kinesis stream $streamName in $nextBackoff milliseconds..." + } + log.error(errorMessage) + scheduleWrite(batch, target, nextBackoff)(retriesLeft - 1) + } + } + + def writeBatchToKinesis(batch: List[Events]): Future[PutRecordsResult] = + Future { + val putRecordsRequest = { + val prr = new PutRecordsRequest() + prr.setStreamName(streamName) + val putRecordsRequestEntryList = batch.map { event => + val prre = new PutRecordsRequestEntry() + prre.setPartitionKey(event.key) + prre.setData(ByteBuffer.wrap(event.payloads)) + prre + } + prr.setRecords(putRecordsRequestEntryList.asJava) + prr + } + client.putRecords(putRecordsRequest) + } + + // If successful, returns a list of events to be retried, together with the reasons they failed. 
+ def writeBatchToSqs(batch: List[Events], sqs: SqsClientAndName): Future[List[(Events, BatchResultErrorInfo)]] = + Future { + val splitBatch = split(batch, getByteSize, MaxSqsBatchSizeN, SqsLimit) + splitBatch.map(toSqsMessages).flatMap { msgGroup => + val entries = msgGroup.map(_._2) + val batchRequest = + new SendMessageBatchRequest().withQueueUrl(sqs.sqsBufferName).withEntries(entries.asJava) + val response = sqs.sqsClient.sendMessageBatch(batchRequest) + val failures = response + .getFailed + .asScala + .toList + .map { bree => + (bree.getId, BatchResultErrorInfo(bree.getCode, bree.getMessage)) + } + .toMap + // Events to retry and reasons for failure + msgGroup.collect { + case (e, m) if failures.contains(m.getId) => + (e, failures(m.getId)) + } + } + } + + def toSqsMessages(events: List[Events]): List[(Events, SendMessageBatchRequestEntry)] = + events.map(e => (e, new SendMessageBatchRequestEntry(UUID.randomUUID.toString, b64Encode(e.payloads)))) + + def b64Encode(msg: Array[Byte]): String = { + val buffer = java.util.Base64.getEncoder.encode(msg) + new String(buffer) + } + + def scheduleWrite(batch: List[Events], target: Target, lastBackoff: Long = minBackoff)( + retriesLeft: Int + ): Unit = { + val nextBackoff = getNextBackoff(lastBackoff) + executorService.schedule( + new Runnable { + override def run(): Unit = + // If an SQS buffer is configured, keep switching targets every 10 retries. + // If no SQS buffer is configured, continue retrying to Kinesis forever. + if (retriesLeft > 0) sinkBatch(batch, target, nextBackoff)(retriesLeft) + else sinkBatch(batch, switchTarget(target), nextBackoff)(MaxRetries) + }, + lastBackoff, + MILLISECONDS + ) + () + } + + // No-op if no SQS buffer is configured + def switchTarget(target: Target): Target = target match { + case KinesisStream(Some(sqs)) => SqsQueue(sqs) + case KinesisStream(None) => KinesisStream(None) + case SqsQueue(sqs) => KinesisStream(Some(sqs)) + } + + /** + * How long to wait before sending the next request + * @param lastBackoff The previous backoff time + * @return Maximum of two-thirds of lastBackoff and a random number between minBackoff and maxBackoff + */ + private def getNextBackoff(lastBackoff: Long): Long = { + val diff = (maxBackoff - minBackoff + 1).toInt + (minBackoff + randomGenerator.nextInt(diff)).max(lastBackoff / 3 * 2) + } + + def shutdown(): Unit = { + executorService.shutdown() + executorService.awaitTermination(10000, MILLISECONDS) + () + } +} + /** KinesisSink companion object with factory method */ object KinesisSink { + sealed trait Target + final case class KinesisStream(buffer: Option[SqsClientAndName]) extends Target + final case class SqsQueue(sqs: SqsClientAndName) extends Target + + // Details about why messages failed to be written to SQS. + final case class BatchResultErrorInfo(code: String, message: String) + + /** + * Events to be written to Kinesis or SQS. + * @param payloads Serialized events extracted from a CollectorPayload. + * The size of this collection is limited by MaxBytes. + * Not to be confused with a 'batch' events to sink. 
+ * @param key Partition key for Kinesis + */ + final case class Events(payloads: Array[Byte], key: String) + case class SqsClientAndName(sqsClient: AmazonSQS, sqsBufferName: String) /** @@ -134,28 +445,6 @@ object KinesisSink { .standard() .withCredentials(provider) .withEndpointConfiguration(new EndpointConfiguration(endpoint, region)) - .withClientConfiguration( - new ClientConfiguration().withRetryPolicy( - new RetryPolicy( - new RetryCondition { - override def shouldRetry( - originalRequest: AmazonWebServiceRequest, - exception: AmazonClientException, - retriesAttempted: Int - ): Boolean = - retriesAttempted < 10 && - (exception match { - case _: ProvisionedThroughputExceededException => false - case _ => true - }) - }, - new PredefinedBackoffStrategies.FullJitterBackoffStrategy(1000, 5 * 3600), - 10, - true, - true - ) - ) - ) .build() /** @@ -175,29 +464,7 @@ object KinesisSink { private def createSqsClient(provider: AWSCredentialsProvider, region: String) = Either.catchNonFatal( - AmazonSQSClientBuilder - .standard() - .withRegion(region) - .withCredentials(provider) - .withClientConfiguration( - new ClientConfiguration().withRetryPolicy( - new RetryPolicy( - new RetryCondition { - override def shouldRetry( - originalRequest: AmazonWebServiceRequest, - exception: AmazonClientException, - retriesAttempted: Int - ): Boolean = - retriesAttempted < 10 - }, - new PredefinedBackoffStrategies.FullJitterBackoffStrategy(1000, 5 * 3600), - 10, - true, - true - ) - ) - ) - .build + AmazonSQSClientBuilder.standard().withRegion(region).withCredentials(provider).build ) def sqsBuffer( @@ -229,7 +496,7 @@ object KinesisSink { streamName: String, sqs: Option[SqsClientAndName] ): Unit = { - lazy val log = LoggerFactory.getLogger(getClass()) + lazy val log = LoggerFactory.getLogger(getClass) val kExists = streamExists(kinesisClient, streamName) if (!kExists) log.error(s"Kinesis stream $streamName doesn't exist or isn't available.") @@ -245,272 +512,36 @@ object KinesisSink { } // format: on } -} - -/** - * Kinesis Sink for the Scala collector. - */ -class KinesisSink private ( - client: AmazonKinesis, - kinesisConfig: Kinesis, - bufferConfig: BufferConfig, - streamName: String, - executorService: ScheduledExecutorService, - maybeSqs: Option[SqsClientAndName] -) extends Sink { - // Records must not exceed MaxBytes - 1MB (for Kinesis) - // When SQS buffer is enabled MaxBytes has to be 256k, - // but we encode the message with Base64 for SQS, so the limit drops to 192k - val SqsLimit = 192000 // 256000 / 4 * 3 - val KinesisLimit = 1000000 - override val MaxBytes = if (maybeSqs.isDefined) SqsLimit else KinesisLimit - val BackoffTime = 3000L - - val ByteThreshold = bufferConfig.byteLimit - val RecordThreshold = bufferConfig.recordLimit - val TimeThreshold = bufferConfig.timeLimit - - private val maxBackoff = kinesisConfig.backoffPolicy.maxBackoff - private val minBackoff = kinesisConfig.backoffPolicy.minBackoff - private val randomGenerator = new java.util.Random() - - log.info("Creating thread pool of size " + kinesisConfig.threadPoolSize) - maybeSqs match { - case Some(sqs) => - log.info( - s"SQS buffer for '$streamName' Kinesis sink is set up as: ${sqs.sqsBufferName}." - ) - case None => - log.warn( - s"No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon)." 
- ) - } - - implicit lazy val ec = concurrent.ExecutionContext.fromExecutorService(executorService) - - /** - * Recursively schedule a task to send everthing in EventStorage - * Even if the incoming event flow dries up, all stored events will eventually get sent - * Whenever TimeThreshold milliseconds have passed since the last call to flush, call flush. - * @param interval When to schedule the next flush - */ - def scheduleFlush(interval: Long = TimeThreshold): Unit = { - executorService.schedule( - new Thread { - override def run(): Unit = { - val lastFlushed = EventStorage.getLastFlushTime() - val currentTime = System.currentTimeMillis() - if (currentTime - lastFlushed >= TimeThreshold) { - EventStorage.flush() - scheduleFlush(TimeThreshold) - } else { - scheduleFlush(TimeThreshold + lastFlushed - currentTime) - } - } - }, - interval, - MILLISECONDS - ) - () - } - - // 'key' is used to populate the 'kinesisKey' message attribute for SQS - // and as partition key for Kinesis - case class Event(msg: ByteBuffer, key: String) - - object EventStorage { - private var storedEvents = List.empty[Event] - private var byteCount = 0L - @volatile private var lastFlushedTime = 0L - - def store(event: Array[Byte], key: String): Unit = { - val eventBytes = ByteBuffer.wrap(event) - val eventSize = eventBytes.capacity - if (eventSize >= MaxBytes) { - log.error( - s"Record of size $eventSize bytes is too large - must be less than $MaxBytes bytes" - ) - } else { - synchronized { - storedEvents = Event(eventBytes, key) :: storedEvents - byteCount += eventSize - if (storedEvents.size >= RecordThreshold || byteCount >= ByteThreshold) { - flush() - } - } - } - } - - def flush(): Unit = { - val eventsToSend = synchronized { - val evts = storedEvents.reverse - storedEvents = Nil - byteCount = 0 - evts - } - lastFlushedTime = System.currentTimeMillis() - sendBatch(eventsToSend) - } - - def getLastFlushTime(): Long = lastFlushedTime - } - - def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { - events.foreach(e => EventStorage.store(e, key)) - Nil - } - - def scheduleBatch(batch: List[Event], lastBackoff: Long = minBackoff): Unit = { - val nextBackoff = getNextBackoff(lastBackoff) - executorService.schedule(new Thread { - override def run(): Unit = - sendBatch(batch, nextBackoff) - }, lastBackoff, MILLISECONDS) - () - } /** - * Max number of retries is unlimitted, so when Kinesis stream is under heavy load, - * the events accumulate in collector memory for later retries. The fix for this is to use - * sqs queue as a buffer and sqs2kinesis to move events back from sqs queue to kinesis stream. - * Consider using sqs buffer in heavy load scenarios. - * + * Splits a Kinesis-sized batch of `Events` into smaller batches that meet the SQS limit. + * @param batch A batch of up to `KinesisLimit` that must be split into smaller batches. + * @param getByteSize How to get the size of a batch. + * @param maxRecords Max records for the smaller batches. + * @param maxBytes Max byte size for the smaller batches. + * @return A batch of smaller batches, each one of which meets the limits. 
*/ - def sendBatch(batch: List[Event], nextBackoff: Long = minBackoff): Unit = - if (batch.nonEmpty) { - log.info(s"Writing ${batch.size} Thrift records to Kinesis stream ${streamName}") - - multiPut(streamName, batch).onComplete { - case Success(s) => { - val results = s.getRecords.asScala.toList - val failurePairs = batch.zip(results).filter(_._2.getErrorMessage != null) - log.info( - s"Successfully wrote ${batch.size - failurePairs.size} out of ${batch.size} records" - ) - if (failurePairs.nonEmpty) { - failurePairs.foreach(f => - log.error( - s"Record failed with error code [${f._2.getErrorCode}] and message [${f._2.getErrorMessage}]" - ) - ) - val failures = failurePairs.map(_._1) - val retryErrorMsg = s"Retrying all failed records in $nextBackoff milliseconds..." - sendToSqsOrRetryToKinesis(failures, nextBackoff)(retryErrorMsg) - } - } - case Failure(f) => { - log.error("Writing failed.", f) - val retryErrorMsg = s"Retrying in $nextBackoff milliseconds..." - sendToSqsOrRetryToKinesis(batch, nextBackoff)(retryErrorMsg) - } + def split( + batch: List[Events], + getByteSize: Events => Int, + maxRecords: Int, + maxBytes: Int + ): List[List[Events]] = { + var bytes = 0L + @scala.annotation.tailrec + def go(originalBatch: List[Events], tmpBatch: List[Events], newBatch: List[List[Events]]): List[List[Events]] = + (originalBatch, tmpBatch) match { + case (Nil, Nil) => newBatch + case (Nil, acc) => acc :: newBatch + case (h :: t, acc) if acc.size + 1 > maxRecords || getByteSize(h) + bytes > maxBytes => + bytes = getByteSize(h).toLong + go(t, h :: Nil, acc :: newBatch) + case (h :: t, acc) => + bytes += getByteSize(h) + go(t, h :: acc, newBatch) } - } - - private def sendToSqsOrRetryToKinesis( - failures: List[Event], - nextBackoff: Long - )( - retryErrorMsg: String - ): Unit = - maybeSqs match { - case Some(sqs) => - log.info( - s"Sending ${failures.size} events from a batch to SQS buffer queue: ${sqs.sqsBufferName}" - ) - putToSqs(sqs, failures) - () - case None => - log.error(retryErrorMsg) - log.warn( - s"${failures.size} failed events scheduled for retry (consider setting a SQS Buffer in config.hocon)" - ) - scheduleBatch(failures, nextBackoff) - } - - private def putToSqs(sqs: SqsClientAndName, batch: List[Event]): Future[Unit] = - Future { - log.info(s"Writing ${batch.size} messages to SQS queue: ${sqs.sqsBufferName}") - val MaxSqsBatchSize = 10 - batch.map(toSqsBatchEntry).grouped(MaxSqsBatchSize).foreach { batchEntryGroup => - sendToSqs(sqs, batchEntryGroup).transform { - case failure @ Failure(ex) => - log.info(s"Sending to sqs failed with exception: $ex") - failure - case s @ Success(_) => s - } - } - } - - private def toSqsBatchEntry(event: Event): SendMessageBatchRequestEntry = { - val b64EncodedMsg = encode(event.msg) - // The UUID is not used anywhere currently but is required by the constructor - new SendMessageBatchRequestEntry(UUID.randomUUID.toString, b64EncodedMsg).withMessageAttributes( - Map( - "kinesisKey" -> - new MessageAttributeValue().withDataType("String").withStringValue(event.key) - ).asJava - ) - } - - private def createBatchRequest(queueUrl: String, batch: List[SendMessageBatchRequestEntry]) = - new SendMessageBatchRequest().withQueueUrl(queueUrl).withEntries(batch.asJava) - - private def sendToSqs( - sqs: SqsClientAndName, - batchEntryGroup: List[SendMessageBatchRequestEntry] - ) = - Future { - val batchRequest = createBatchRequest(sqs.sqsBufferName, batchEntryGroup) - val res = sqs.sqsClient.sendMessageBatch(batchRequest) - val failed = res.getFailed().asScala 
- if (failed.nonEmpty) { - // It could be improved even more by storing events (that failed to be sent to SQS) to some persistance storage - val errors = failed.map(_.toString).mkString(", ") - log.error( - s"Sending to SQS queue [${sqs.sqsBufferName}] failed with errors [$errors]. Dropping events." - ) - log.info( - s"${res.getSuccessful.size} out of ${batchEntryGroup.size} from batch was successfully send to SQS queue: ${sqs.sqsBufferName}" - ) - } else - log.info( - s"Batch of ${batchEntryGroup.size} was successfully send to SQS queue: ${sqs.sqsBufferName}." - ) - } - - private def encode(bufMsg: ByteBuffer): String = { - val buffer = java.util.Base64.getEncoder.encode(bufMsg) - new String(buffer.array()) + go(batch, Nil, Nil).map(_.reverse).reverse.filter(_.nonEmpty) } - private def multiPut(name: String, batch: List[Event]): Future[PutRecordsResult] = - Future { - val putRecordsRequest = { - val prr = new PutRecordsRequest() - prr.setStreamName(name) - val putRecordsRequestEntryList = batch.map { event => - val prre = new PutRecordsRequestEntry() - prre.setPartitionKey(event.key) - prre.setData(event.msg) - prre - } - prr.setRecords(putRecordsRequestEntryList.asJava) - prr - } - client.putRecords(putRecordsRequest) - } - - /** - * How long to wait before sending the next request - * @param lastBackoff The previous backoff time - * @return Minimum of maxBackoff and a random number between minBackoff and three times lastBackoff - */ - private def getNextBackoff(lastBackoff: Long): Long = - (minBackoff + randomGenerator.nextDouble() * (lastBackoff * 3 - minBackoff)).toLong.min(maxBackoff) - - def shutdown(): Unit = { - executorService.shutdown() - executorService.awaitTermination(10000, MILLISECONDS) - () - } + def getByteSize(events: Events): Int = ByteBuffer.wrap(events.payloads).capacity } diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala new file mode 100644 index 000000000..efa8d66e4 --- /dev/null +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkSpec.scala @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream +package sinks + +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink._ + +import org.specs2.mutable.Specification + +class KinesisSinkSpec extends Specification { + val event = Events("a".getBytes, "b") + + "KinesisSink.split" should { + "return empty list if given an empty batch" in { + val emptyBatch = List.empty[Events] + + split(emptyBatch, getByteSize, 1, 10) mustEqual List.empty + split(emptyBatch, getByteSize, 10, 1) mustEqual List.empty + // Edge case that we shouldn't hit. 
The test simply confirms the behaviour. + split(emptyBatch, getByteSize, 0, 0) mustEqual List.empty + } + + "correctly split batches, according to maxRecords setting" in { + val batch1 = List.fill(10)(event) + val batch2 = List.fill(1)(event) + + val res1 = split(batch1, getByteSize, 3, 1000) + val res2 = split(batch2, getByteSize, 3, 1000) + // Edge case that we shouldn't hit. The test simply confirms the behaviour. + val res3 = split(batch1, getByteSize, 0, 1000) + + res1.length mustEqual 4 + res2.length mustEqual 1 + (res3.length mustEqual 10).and(res3.forall(_ must not be empty)) + } + + "correctly split batches, according to maxBytes setting" in { + val batch1 = List.fill(10)(event) + val batch2 = List.fill(1)(event) + + val res1 = split(batch1, getByteSize, 1000, 3) + val res2 = split(batch2, getByteSize, 1000, 3) + // Edge case that we shouldn't hit. The test simply confirms the behaviour. + val res3 = split(batch1, getByteSize, 1000, 0) + + res1.length mustEqual 4 + res2.length mustEqual 1 + (res3.length mustEqual 10).and(res3.forall(_ must not be empty)) + } + } +} From 6c36615981ed4df9b89314015f97a00d2e675129 Mon Sep 17 00:00:00 2001 From: Dilyan Damyanov Date: Thu, 18 Mar 2021 12:09:39 +0000 Subject: [PATCH 2/3] Set network_userid to empty UUID in anonymous mode to prevent collector_payload_format_violation (close #126) --- .../CollectorService.scala | 17 ++- .../CollectorServiceSpec.scala | 108 +++++++++++++----- 2 files changed, 93 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 5a026bda3..705c223fb 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -82,6 +82,8 @@ class CollectorService( override val doNotTrackCookie = config.doNotTrackHttpCookie override val enableDefaultRedirect = config.enableDefaultRedirect + private val spAnonymousNuid = "00000000-0000-0000-0000-000000000000" + /** * Determines the path to be used in the response, * based on whether a mapping can be found in the config for the original request path. 
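// Standalone sketch (not part of the diff) of the network_userid resolution
// order that patch 2 introduces: an SP-Anonymous header always yields the
// dummy UUID, otherwise the `nuid` query parameter wins over the collector's
// own cookie. `AnonymousNuidSketch`, `resolveNuid` and its parameter names are
// illustrative only, not the collector's API.
object AnonymousNuidSketch {
  val spAnonymousNuid = "00000000-0000-0000-0000-000000000000"

  def resolveNuid(
    spAnonymous: Option[String],
    queryNuid: Option[String],
    cookieNuid: Option[String]
  ): Option[String] =
    spAnonymous match {
      case Some(_) => Some(spAnonymousNuid)        // anonymous mode: fixed, well-formed UUID
      case None    => queryNuid.orElse(cookieNuid) // otherwise: query param first, then cookie
    }

  def main(args: Array[String]): Unit = {
    println(resolveNuid(Some("*"), Some("12"), Some("13"))) // Some(00000000-0000-0000-0000-000000000000)
    println(resolveNuid(None, Some("12"), Some("13")))      // Some(12)
    println(resolveNuid(None, None, Some("13")))            // Some(13)
    println(resolveNuid(None, None, None))                  // None
  }
}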
@@ -112,7 +114,7 @@ class CollectorService( case Right(params) => val redirect = path.startsWith("/r/") - val nuidOpt = networkUserId(request, cookie) + val nuidOpt = networkUserId(request, cookie, spAnonymous) val bouncing = params.contains(config.cookieBounce.name) // we bounce if it's enabled and we couldn't retrieve the nuid and we're not already bouncing val bounce = config.cookieBounce.enabled && nuidOpt.isEmpty && !bouncing && @@ -243,7 +245,7 @@ class CollectorService( userAgent.foreach(e.userAgent = _) refererUri.foreach(e.refererUri = _) e.hostname = hostname - e.networkUserId = spAnonymous.fold(networkUserId)(_ => "") + e.networkUserId = networkUserId e.headers = (headers(request, spAnonymous) ++ contentType).asJava contentType.foreach(e.contentType = _) e @@ -457,8 +459,15 @@ class CollectorService( * @param requestCookie cookie associated to the Http request * @return a network user id */ - def networkUserId(request: HttpRequest, requestCookie: Option[HttpCookie]): Option[String] = - request.uri.query().get("nuid").orElse(requestCookie.map(_.value)) + def networkUserId( + request: HttpRequest, + requestCookie: Option[HttpCookie], + spAnonymous: Option[String] + ): Option[String] = + spAnonymous match { + case Some(_) => Some(spAnonymousNuid) + case None => request.uri.query().get("nuid").orElse(requestCookie.map(_.value)) + } /** * Creates an Access-Control-Allow-Origin header which specifically allows the domain which made diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index fe649e5b3..f733d11c5 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -129,7 +129,7 @@ class CollectorServiceSpec extends Specification { None, Some("b"), "p", - Some(HttpCookie("sp","cookie-nuid")), + Some(HttpCookie("sp", "cookie-nuid")), None, None, "h", @@ -143,14 +143,14 @@ class CollectorServiceSpec extends Specification { l must have size 1 val newEvent = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") deserializer.deserialize(newEvent, l.head) - newEvent.networkUserId shouldEqual "" + newEvent.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" } "network_userid from cookie should persist if SP-Anonymous is not present" in { val (_, l) = service.cookie( None, Some("b"), "p", - Some(HttpCookie("sp","cookie-nuid")), + Some(HttpCookie("sp", "cookie-nuid")), None, None, "h", @@ -340,9 +340,10 @@ class CollectorServiceSpec extends Specification { e.contentType shouldEqual ct.get } "fill the correct values if SP-Anonymous is present" in { - val l = `Location`("l") - val ct = Some("image/gif") - val r = HttpRequest().withHeaders(l :: hs) + val l = `Location`("l") + val ct = Some("image/gif") + val r = HttpRequest().withHeaders(l :: hs) + val nuid = service.networkUserId(r, None, Some("*")).get val e = service.buildEvent( Some("q"), @@ -353,7 +354,7 @@ class CollectorServiceSpec extends Specification { "h", "unknown", r, - "nuid", + nuid, ct, Some("*") ) @@ -367,16 +368,29 @@ class CollectorServiceSpec extends Specification { e.userAgent shouldEqual "ua" e.refererUri shouldEqual "ref" e.hostname shouldEqual "h" - e.networkUserId shouldEqual "" + e.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" 
e.headers shouldEqual (List(l) ++ ct).map(_.toString).asJava e.contentType shouldEqual ct.get } "have a null queryString if it's None" in { - val l = `Location`("l") - val ct = Some("image/gif") - val r = HttpRequest().withHeaders(l :: hs) + val l = `Location`("l") + val ct = Some("image/gif") + val r = HttpRequest().withHeaders(l :: hs) + val nuid = service.networkUserId(r, None, Some("*")).get val e = - service.buildEvent(None, Some("b"), "p", Some("ua"), Some("ref"), "h", "unknown", r, "nuid", ct, Some("*")) + service.buildEvent( + None, + Some("b"), + "p", + Some("ua"), + Some("ref"), + "h", + "unknown", + r, + nuid, + ct, + Some("*") + ) e.schema shouldEqual "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0" e.ipAddress shouldEqual "unknown" e.encoding shouldEqual "UTF-8" @@ -387,17 +401,30 @@ class CollectorServiceSpec extends Specification { e.userAgent shouldEqual "ua" e.refererUri shouldEqual "ref" e.hostname shouldEqual "h" - e.networkUserId shouldEqual "" + e.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" e.headers shouldEqual (List(l) ++ ct).map(_.toString).asJava e.contentType shouldEqual ct.get } "have an empty nuid if SP-Anonymous is present" in { - val l = `Location`("l") - val ct = Some("image/gif") - val r = HttpRequest().withHeaders(l :: hs) + val l = `Location`("l") + val ct = Some("image/gif") + val r = HttpRequest().withHeaders(l :: hs) + val nuid = service.networkUserId(r, None, Some("*")).get val e = - service.buildEvent(None, Some("b"), "p", Some("ua"), Some("ref"), "h", "unknown", r, "nuid", ct, Some("*")) - e.networkUserId shouldEqual "" + service.buildEvent( + None, + Some("b"), + "p", + Some("ua"), + Some("ref"), + "h", + "unknown", + r, + nuid, + ct, + Some("*") + ) + e.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" } "have a nuid if SP-Anonymous is not present" in { val l = `Location`("l") @@ -671,17 +698,42 @@ class CollectorServiceSpec extends Specification { } "netwokUserId" in { - "give back the nuid query param if present" in { - service.networkUserId( - HttpRequest().withUri(Uri().withRawQueryString("nuid=12")), - Some(HttpCookie("nuid", "13")) - ) shouldEqual Some("12") - } - "give back the request cookie if there no nuid query param" in { - service.networkUserId(HttpRequest(), Some(HttpCookie("nuid", "13"))) shouldEqual Some("13") + "with SP-Anonymous header not present" in { + "give back the nuid query param if present" in { + service.networkUserId( + HttpRequest().withUri(Uri().withRawQueryString("nuid=12")), + Some(HttpCookie("nuid", "13")), + None + ) shouldEqual Some("12") + } + "give back the request cookie if there no nuid query param" in { + service.networkUserId(HttpRequest(), Some(HttpCookie("nuid", "13")), None) shouldEqual Some("13") + } + "give back none otherwise" in { + service.networkUserId(HttpRequest(), None, None) shouldEqual None + } } - "give back none otherwise" in { - service.networkUserId(HttpRequest(), None) shouldEqual None + + "with SP-Anonymous header present" in { + "give back the dummy nuid" in { + "if query param is present" in { + service.networkUserId( + HttpRequest().withUri(Uri().withRawQueryString("nuid=12")), + Some(HttpCookie("nuid", "13")), + Some("*") + ) shouldEqual Some("00000000-0000-0000-0000-000000000000") + } + "if the request cookie can be used in place of a missing nuid query param" in { + service.networkUserId(HttpRequest(), Some(HttpCookie("nuid", "13")), Some("*")) shouldEqual Some( + "00000000-0000-0000-0000-000000000000" + ) + } + "in any other 
case" in { + service.networkUserId(HttpRequest(), None, Some("*")) shouldEqual Some( + "00000000-0000-0000-0000-000000000000" + ) + } + } } } From b253b68c28cd89d0f155e925d79318b84a821c17 Mon Sep 17 00:00:00 2001 From: Dilyan Damyanov Date: Fri, 26 Mar 2021 09:54:59 +0000 Subject: [PATCH 3/3] Prepare for 2.2.1 release --- CHANGELOG | 5 +++++ build.sbt | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index 26e825ea1..599f4e6a5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +Release 2.2.1 (2021-03-26) +-------------------------- +Respect SQS batch request limit (#125) +Set network_userid to empty UUID in anonymous mode to prevent collector_payload_format_violation (#126) + Release 2.2.0 (2021-03-08) -------------------------- Add SQS collector module (#120) diff --git a/build.sbt b/build.sbt index dd2d8a421..61cb3b1d6 100644 --- a/build.sbt +++ b/build.sbt @@ -45,7 +45,7 @@ lazy val commonDependencies = Seq( lazy val buildSettings = Seq( organization := "com.snowplowanalytics", name := "snowplow-stream-collector", - version := "2.2.0", + version := "2.2.1", description := "Scala Stream Collector for Snowplow raw events", scalaVersion := "2.12.10", javacOptions := Seq("-source", "11", "-target", "11"),