diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 598fa53d2..11c7c526e 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -144,7 +144,7 @@ class CollectorService( spAnonymous ) // we don't store events in case we're bouncing - if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) + val sinkOk = if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) else true val headers = bounceLocationHeader(params, request, config.cookieBounce, bounce) ++ cookieHeader(request, config.cookieConfig, nuid, doNotTrack, spAnonymous) ++ @@ -155,7 +155,10 @@ class CollectorService( `Access-Control-Allow-Credentials`(true) ) - buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro) + if (sinkOk || redirect) + buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro) + else + HttpResponse(StatusCodes.ServiceUnavailable) case Left(error) => val badRow = BadRow.GenericError( @@ -165,7 +168,7 @@ class CollectorService( ) if (sinks.bad.isHealthy) { - sinkBad(badRow, partitionKey) + sinkBad(badRow, partitionKey) // ignore the result, which tells us if bad event was sunk successfully. HttpResponse(StatusCodes.OK) } else HttpResponse(StatusCodes.OK) // if bad sink is unhealthy, we don't want to try storing the bad rows } @@ -256,20 +259,24 @@ class CollectorService( e } - /** Produces the event to the configured sink. */ + /** Produces the event to the configured sink. + * @return whether the events were stored successfully + */ def sinkEvent( event: CollectorPayload, partitionKey: String - ): Unit = { + ): Boolean = { // Split events into Good and Bad val eventSplit = splitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) // Send events to respective sinks + sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) && sinks.good.storeRawEvents(eventSplit.good, partitionKey) - sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) } - /** Sinks a bad row generated by an illegal querystring. */ - def sinkBad(badRow: BadRow, partitionKey: String): Unit = { + /** Sinks a bad row generated by an illegal querystring. 
+ * @return whether the events were stored successfully + */ + def sinkBad(badRow: BadRow, partitionKey: String): Boolean = { val toSink = List(badRow.compact.getBytes(UTF_8)) sinks.bad.storeRawEvents(toSink, partitionKey) } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index dbc64da40..58bace060 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -125,7 +125,13 @@ package model { ) extends SinkConfig final case class Nsq(host: String, port: Int) extends SinkConfig case object Stdout extends SinkConfig - final case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long) + final case class BufferConfig( + byteLimit: Long, + recordLimit: Long, + timeLimit: Long, + hardByteLimit: Option[Long], + enqueueTimeout: Long + ) final case class StreamsConfig( good: String, bad: String, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala index 00a3912e6..0d424d6a7 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala @@ -19,8 +19,15 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks +import java.util.concurrent.{Semaphore, TimeUnit} + +import cats.Monad +import cats.implicits._ + import org.slf4j.LoggerFactory +import com.snowplowanalytics.snowplow.collectors.scalastream.model.{BufferConfig, CollectorSinks} + // Define an interface for all sinks to use to store events. trait Sink { @@ -30,5 +37,52 @@ trait Sink { lazy val log = LoggerFactory.getLogger(getClass()) def isHealthy: Boolean = true - def storeRawEvents(events: List[Array[Byte]], key: String): Unit + + /** Store the raw events in the output sink + * @param events The events to store + * @return whether the events were stored successfully + */ + def storeRawEvents(events: List[Array[Byte]], key: String): Boolean +} + +object Sink { + abstract class Throttled(throttler: Throttler) extends Sink { + + def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit + + protected def onComplete(sunkBytes: Long): Unit = + throttler.release(sunkBytes) + + override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = { + val bytes = events.foldLeft(0L)(_ + _.size.toLong) + if (throttler.tryAcquire(bytes)) { + storeRawEventsThrottled(events, key) + true + } else + false + } + } + + case class Throttler(tryAcquire: Long => Boolean, release: Long => Unit) + + def throttled[F[_]: Monad]( + config: BufferConfig, + buildGood: Throttler => F[Sink], + buildBad: Throttler => F[Sink] + ): F[CollectorSinks] = { + val semaphore = new Semaphore(bytesToPermits(config.hardByteLimit.getOrElse(Runtime.getRuntime.maxMemory / 4))) + val throttler = Throttler( + b => semaphore.tryAcquire(bytesToPermits(b), config.enqueueTimeout, TimeUnit.MILLISECONDS), + b => semaphore.release(bytesToPermits(b)) + ) + for { + good <- buildGood(throttler) + bad <- buildBad(throttler) + } yield CollectorSinks(good, bad) + } + + // 1 permit corresponds to 64 bytes. 
+ // Int.MaxValue permits corresponds to 127 GB, so we can accommodate any reasonable heap using a Semaphore. + private def bytesToPermits(bytes: Long): Int = + (bytes >> 6).toInt } diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala index 07e6d342b..f984990a0 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala @@ -32,6 +32,8 @@ class TestSink extends Sink { // Effectively no limit to the record size override val MaxBytes = Int.MaxValue - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = { buf ++= events + true + } } diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala index 7cfdef7f6..eee77057f 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestUtils.scala @@ -56,7 +56,7 @@ object TestUtils { sqsGoodBuffer = Some("good-buffer"), sqsBadBuffer = Some("bad-buffer") ), - buffer = BufferConfig(4000000L, 500L, 60000L) + buffer = BufferConfig(4000000L, 500L, 60000L, Some(8000000L), 10000L) ), telemetry = None, prometheusMetrics = PrometheusMetricsConfig(false, None), diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala index 6cd6c73d5..1ddc4bb32 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/config/ConfigSpec.scala @@ -88,15 +88,19 @@ abstract class ConfigSpec extends Specification { buffer = if (app == "pubsub") BufferConfig( - byteLimit = 100000, - recordLimit = 40, - timeLimit = 1000 + byteLimit = 100000, + recordLimit = 40, + timeLimit = 1000, + hardByteLimit = None, + enqueueTimeout = 10000 ) else BufferConfig( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000, + hardByteLimit = None, + enqueueTimeout = 10000 ), sink = sinkConfigRefFactory(app) ) diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SinkSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SinkSpec.scala new file mode 100644 index 000000000..fbcf3e0b4 --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SinkSpec.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, and + * you may not use this file except in compliance with the Apache License + * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, software + * distributed under the Apache License Version 2.0 is distributed on an "AS + * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import cats.Id + +import com.snowplowanalytics.snowplow.collectors.scalastream.model.BufferConfig + +import scala.collection.mutable.ListBuffer + +import org.specs2.mutable.Specification + +class SinkSpec extends Specification { + + /** A sink that immediately calls the onComplete callback */ + class HealthySink(buf: ListBuffer[Array[Byte]], throttler: Sink.Throttler) extends Sink.Throttled(throttler) { + val MaxBytes = Int.MaxValue + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = { + buf ++= events + onComplete(events.foldLeft(0L)(_ + _.size)) + } + } + + /** A sink that buffers, but never calls the onComplete callback */ + class UnhealthySink(buf: ListBuffer[Array[Byte]], throttler: Sink.Throttler) extends Sink.Throttled(throttler) { + val MaxBytes = Int.MaxValue + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = + buf ++= events + } + + // A 64 byte event. + val testEvent = (1 to 64).map(_ => 'a'.toByte).toArray + + // A config that allows 2 * 64 testEvents + val config = BufferConfig( + byteLimit = 1000, + recordLimit = 1000, + timeLimit = 1000, + hardByteLimit = Some(128), + enqueueTimeout = 2 + ) + + "The throttled sink" should { + "immediately sink events to a healthy sink" in { + val buf = ListBuffer[Array[Byte]]() + val sink = Sink.throttled[Id](config, new HealthySink(buf, _), new HealthySink(buf, _)) + + val results = (1 to 10).toList.map { _ => + sink.good.storeRawEvents(List(testEvent), "key") + } + + results must contain(beTrue).foreach + + buf must have size 10 + } + + "something else" in { + val buf = ListBuffer[Array[Byte]]() + val sink = Sink.throttled[Id](config, new UnhealthySink(buf, _), new UnhealthySink(buf, _)) + + val results = (1 to 4).toList.map { _ => + sink.good.storeRawEvents(List(testEvent), "key") + } + + results must containTheSameElementsAs(Seq(true, true, false, false)) + + buf must have size 2 + } + } +} diff --git a/examples/config.kafka.extended.hocon b/examples/config.kafka.extended.hocon index fdfd42213..8796f8436 100644 --- a/examples/config.kafka.extended.hocon +++ b/examples/config.kafka.extended.hocon @@ -204,8 +204,7 @@ collector { } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. + # Incoming events are stored in a buffer before being sent to Kafka. # The buffer is emptied whenever: # - the number of stored records reaches record-limit or # - the combined size of the stored records reaches byte-limit or @@ -214,6 +213,16 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + + # The limit on bytes held in memory before the collector throttles new http requests. + # This becomes relevant only if the collector is receiving events more quickly than the buffer + # can be emptied to the sink. + # The default (null) means use 1/4 of the maximum heap size. + hardByteLimit = null + + # When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer. 
+ # If they are not enqueued within the timeout in milliseconds, then the collector returns a 503. + enqueueTimeout = 10000 } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.kinesis.extended.hocon b/examples/config.kinesis.extended.hocon index 86509f787..fa557f407 100644 --- a/examples/config.kinesis.extended.hocon +++ b/examples/config.kinesis.extended.hocon @@ -222,8 +222,7 @@ collector { } } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. + # Incoming events are stored in a buffer before being sent to Kinesis # The buffer is emptied whenever: # - the number of stored records reaches record-limit or # - the combined size of the stored records reaches byte-limit or @@ -232,6 +231,16 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + + # The limit on bytes held in memory before the collector throttles new http requests. + # This becomes relevant only if the collector is receiving events more quickly than the buffer + # can be emptied to the sink. + # The default (null) means use 1/4 of the maximum heap size. + hardByteLimit = null + + # When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer. + # If they are not enqueued within the timeout in milliseconds, then the collector returns a 503. + enqueueTimeout = 10000 } } diff --git a/examples/config.pubsub.extended.hocon b/examples/config.pubsub.extended.hocon index 5a997921a..5d959ce31 100644 --- a/examples/config.pubsub.extended.hocon +++ b/examples/config.pubsub.extended.hocon @@ -199,8 +199,7 @@ collector { } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. + # Incoming events are stored in a buffer before being sent to PubSub # The buffer is emptied whenever: # - the number of stored records reaches record-limit or # - the combined size of the stored records reaches byte-limit or @@ -209,6 +208,16 @@ collector { byteLimit = 100000 recordLimit = 40 timeLimit = 1000 + + # The limit on bytes held in memory before the collector throttles new http requests. + # This becomes relevant only if the collector is receiving events more quickly than the buffer + # can be emptied to the sink. + # The default (null) means use 1/4 of the maximum heap size. + hardByteLimit = null + + # When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer. + # If they are not enqueued within the timeout in milliseconds, then the collector returns a 503. + enqueueTimeout = 10000 } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.sqs.extended.hocon b/examples/config.sqs.extended.hocon index ca81a790d..81055a759 100644 --- a/examples/config.sqs.extended.hocon +++ b/examples/config.sqs.extended.hocon @@ -212,8 +212,7 @@ collector { } } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. + # Incoming events are stored in a buffer before being sent to SQS # The buffer is emptied whenever: # - the number of stored records reaches record-limit or # - the combined size of the stored records reaches byte-limit or @@ -222,6 +221,16 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + + # The limit on bytes held in memory before the collector throttles new http requests. 
+ # This becomes relevant only if the collector is receiving events more quickly than the buffer + # can be emptied to the sink. + # The default (null) means use 1/4 of the maximum heap size. + hardByteLimit = null + + # When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer. + # If they are not enqueued within the timeout in milliseconds, then the collector returns a 503. + enqueueTimeout = 10000 } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.stdout.extended.hocon b/examples/config.stdout.extended.hocon index cb3364929..5b40bde31 100644 --- a/examples/config.stdout.extended.hocon +++ b/examples/config.stdout.extended.hocon @@ -187,18 +187,6 @@ collector { enabled = stdout enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED} } - - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } } # Telemetry sends heartbeat events to external pipeline. # Unless disable parameter set to true, this feature will be enabled. Deleting whole section will not disable it. diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf index d3bdd4a87..0b55692c7 100644 --- a/kafka/src/main/resources/application.conf +++ b/kafka/src/main/resources/application.conf @@ -10,6 +10,7 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + enqueueTimeout = 10000 } } } diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala index d29c65848..93bc19c20 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala @@ -14,8 +14,9 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream +import cats.Id import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KafkaSink, Sink} import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo @@ -31,12 +32,15 @@ object KafkaCollector extends Collector { val goodStream = collectorConf.streams.good val badStream = collectorConf.streams.bad val bufferConf = collectorConf.streams.buffer - val (good, bad) = collectorConf.streams.sink match { + collectorConf.streams.sink match { case kc: Kafka => - (new KafkaSink(kc, bufferConf, goodStream), new KafkaSink(kc, bufferConf, badStream)) + Sink.throttled[Id]( + collectorConf.streams.buffer, + new KafkaSink(kc, bufferConf, goodStream, _), + new KafkaSink(kc, bufferConf, badStream, _) + ) case _ => throw new IllegalArgumentException("Configured sink is not Kafka") } - CollectorSinks(good, bad) } run(collectorConf, akkaConf, sinks, telemetry) } diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala 
b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index e073748d9..fdde1f9a5 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -25,8 +25,9 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._ class KafkaSink( kafkaConfig: Kafka, bufferConfig: BufferConfig, - topicName: String -) extends Sink { + topicName: String, + throttler: Sink.Throttler +) extends Sink.Throttled(throttler) { // Records must not exceed MaxBytes - 1MB override val MaxBytes = 1000000 @@ -64,7 +65,7 @@ class KafkaSink( * @param events The list of events to send * @param key The partition key to use */ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = { + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = { log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") events.foreach { event => kafkaProducer.send( @@ -72,6 +73,7 @@ class KafkaSink( new Callback { override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = if (e != null) log.error(s"Sending event failed: ${e.getMessage}") + onComplete(events.size.toLong) } ) } diff --git a/kinesis/src/main/resources/application.conf b/kinesis/src/main/resources/application.conf index b9fc90ce3..23ec272b5 100644 --- a/kinesis/src/main/resources/application.conf +++ b/kinesis/src/main/resources/application.conf @@ -19,6 +19,7 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + enqueueTimeout = 10000 } } } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index 2041b783e..446ee120f 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -18,7 +18,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor import cats.syntax.either._ import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KinesisSink, Sink} import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService object KinesisCollector extends Collector { @@ -26,6 +26,8 @@ object KinesisCollector extends Collector { def appVersion = BuildInfo.version def scalaVersion = BuildInfo.scalaVersion + type ThrowableOr[A] = Either[Throwable, A] + def main(args: Array[String]): Unit = { val (collectorConf, akkaConf) = parseConfig(args) val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, appName, appVersion) @@ -40,16 +42,20 @@ object KinesisCollector extends Collector { bufferConf = collectorConf.streams.buffer sqsGood = kc.sqsGoodBuffer sqsBad = kc.sqsBadBuffer - good <- KinesisSink.createAndInitialize( - kc, - bufferConf, - goodStream, - sqsGood, - collectorConf.enableStartupChecks, - es + sinks <- Sink.throttled[ThrowableOr]( + collectorConf.streams.buffer, + KinesisSink.createAndInitialize( + kc, + bufferConf, + goodStream, + sqsGood, + 
collectorConf.enableStartupChecks, + es, + _ + ), + KinesisSink.createAndInitialize(kc, bufferConf, badStream, sqsBad, collectorConf.enableStartupChecks, es, _) ) - bad <- KinesisSink.createAndInitialize(kc, bufferConf, badStream, sqsBad, collectorConf.enableStartupChecks, es) - } yield CollectorSinks(good, bad) + } yield sinks sinks match { case Right(s) => run(collectorConf, akkaConf, s, telemetry) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 6430bebf4..b71bd005c 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -47,8 +47,9 @@ class KinesisSink private ( bufferConfig: BufferConfig, streamName: String, executorService: ScheduledExecutorService, - maybeSqs: Option[SqsClientAndName] -) extends Sink { + maybeSqs: Option[SqsClientAndName], + throttler: Sink.Throttler +) extends Sink.Throttled(throttler) { import KinesisSink._ log.info("Creating thread pool of size " + kinesisConfig.threadPoolSize) @@ -91,7 +92,7 @@ class KinesisSink private ( @volatile private var outage: Boolean = false override def isHealthy: Boolean = !outage - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = events.foreach(e => EventStorage.store(e, key)) object EventStorage { @@ -181,8 +182,8 @@ class KinesisSink private ( } } } - val results = s.getRecords.asScala.toList - val failurePairs = batch.zip(results).filter(_._2.getErrorMessage != null) + val results = s.getRecords.asScala.toList + val (failurePairs, successPairs) = batch.zip(results).partition(_._2.getErrorMessage != null) log.info( s"Successfully wrote ${batch.size - failurePairs.size} out of ${batch.size} records to Kinesis stream $streamName." 
) @@ -202,6 +203,8 @@ class KinesisSink private ( log.error(errorMessage) scheduleWrite(failures, target, nextBackoff)(retriesLeft - 1) } + val writtenBytes = successPairs.foldLeft(0L)(_ + _._1.payloads.size.toLong) + onComplete(writtenBytes) case Failure(f) => log.error("Writing failed with error:", f) @@ -292,9 +295,14 @@ class KinesisSink private ( } .toMap // Events to retry and reasons for failure - msgGroup.collect { - case (e, m) if failures.contains(m.getId) => - (e, failures(m.getId)) + msgGroup.flatMap { + case (e, m) => + failures.get(m.getId) match { + case Some(failure) => Some((e, failure)) + case None => + onComplete(e.payloads.size.toLong) + None + } } } } @@ -391,7 +399,8 @@ object KinesisSink { streamName: String, sqsBufferName: Option[String], enableStartupChecks: Boolean, - executorService: ScheduledExecutorService + executorService: ScheduledExecutorService, + throttler: Sink.Throttler ): Either[Throwable, KinesisSink] = { val clients = for { provider <- getProvider(kinesisConfig.aws) @@ -409,7 +418,8 @@ object KinesisSink { bufferConfig, streamName, executorService, - sqsClientAndName + sqsClientAndName, + throttler ) ks.EventStorage.scheduleFlush() diff --git a/nsq/src/main/resources/application.conf b/nsq/src/main/resources/application.conf index 89d998cec..2dc5dc6aa 100644 --- a/nsq/src/main/resources/application.conf +++ b/nsq/src/main/resources/application.conf @@ -10,6 +10,7 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + enqueueTimeout = 10000 } } } diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index 7c4f6c7ab..9497afd48 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -39,6 +39,8 @@ class NsqSink(nsqConfig: Nsq, topicName: String) extends Sink { * @param events The list of events to send * @param key The partition key (unused) */ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = { producer.produceMulti(topicName, events.asJava) + true + } } diff --git a/pubsub/src/main/resources/application.conf b/pubsub/src/main/resources/application.conf index bda43f9d6..ab8277699 100644 --- a/pubsub/src/main/resources/application.conf +++ b/pubsub/src/main/resources/application.conf @@ -16,6 +16,7 @@ collector { byteLimit = 100000 recordLimit = 40 timeLimit = 1000 + enqueueTimeout = 10000 } } } diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala index 0e28f503b..a9fff5a5f 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/GooglePubSubCollector.scala @@ -17,7 +17,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import cats.syntax.either._ import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink +import 
com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{GooglePubSubSink, Sink} import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService object GooglePubSubCollector extends Collector { @@ -25,6 +25,8 @@ object GooglePubSubCollector extends Collector { def appVersion = BuildInfo.version def scalaVersion = BuildInfo.scalaVersion + type ThrowableOr[A] = Either[Throwable, A] + def main(args: Array[String]): Unit = { val (collectorConf, akkaConf) = parseConfig(args) val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, appName, appVersion) @@ -36,9 +38,12 @@ object GooglePubSubCollector extends Collector { goodStream = collectorConf.streams.good badStream = collectorConf.streams.bad bufferConf = collectorConf.streams.buffer - good <- GooglePubSubSink.createAndInitialize(pc, bufferConf, goodStream, collectorConf.enableStartupChecks) - bad <- GooglePubSubSink.createAndInitialize(pc, bufferConf, badStream, collectorConf.enableStartupChecks) - } yield CollectorSinks(good, bad) + sinks <- Sink.throttled[ThrowableOr]( + collectorConf.streams.buffer, + GooglePubSubSink.createAndInitialize(pc, bufferConf, goodStream, collectorConf.enableStartupChecks, _), + GooglePubSubSink.createAndInitialize(pc, bufferConf, badStream, collectorConf.enableStartupChecks, _) + ) + } yield sinks sinks match { case Right(s) => run(collectorConf, akkaConf, s, telemetry) diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala index 0fce5f280..4450985cf 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala @@ -33,7 +33,8 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._ /** * Google PubSub Sink for the Scala Stream Collector */ -class GooglePubSubSink private (publisher: Publisher, topicName: String) extends Sink { +class GooglePubSubSink private (publisher: Publisher, topicName: String, throttler: Sink.Throttler) + extends Sink.Throttled(throttler) { private val logExecutor = Executors.newSingleThreadExecutor() // maximum size of a pubsub message is 10MB @@ -48,34 +49,34 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String) extends * @param events The list of events to send * @param key Not used. 
*/ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = { + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = { if (events.nonEmpty) log.debug(s"Writing ${events.size} Thrift records to Google PubSub topic $topicName.") events.foreach { event => - publisher.asRight.map { p => - val future = p.publish(eventToPubsubMessage(event)) - ApiFutures.addCallback( - future, - new ApiFutureCallback[String]() { - override def onSuccess(messageId: String): Unit = { - outage = false - log.debug(s"Successfully published event with id $messageId to $topicName.") + val future = publisher.publish(eventToPubsubMessage(event)) + ApiFutures.addCallback( + future, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + outage = false + log.debug(s"Successfully published event with id $messageId to $topicName.") + onComplete(event.size.toLong) + } + + override def onFailure(throwable: Throwable): Unit = { + outage = true + throwable match { + case apiEx: ApiException => + log.error( + s"Publishing message to $topicName failed with code ${apiEx.getStatusCode}: ${apiEx.getMessage} This error is retryable: ${apiEx.isRetryable}." + ) + case t => log.error(s"Publishing message to $topicName failed with ${t.getMessage}.") } - - override def onFailure(throwable: Throwable): Unit = { - outage = true - throwable match { - case apiEx: ApiException => - log.error( - s"Publishing message to $topicName failed with code ${apiEx.getStatusCode}: ${apiEx.getMessage} This error is retryable: ${apiEx.isRetryable}." - ) - case t => log.error(s"Publishing message to $topicName failed with ${t.getMessage}.") - } - } - }, - logExecutor - ) - } + onComplete(event.size.toLong) + } + }, + logExecutor + ) } } @@ -94,7 +95,8 @@ object GooglePubSubSink { googlePubSubConfig: GooglePubSub, bufferConfig: BufferConfig, topicName: String, - enableStartupChecks: Boolean + enableStartupChecks: Boolean, + throttler: Sink.Throttler ): Either[Throwable, GooglePubSubSink] = for { batching <- batchingSettings(bufferConfig).asRight @@ -104,7 +106,7 @@ object GooglePubSubSink { if (b) ().asRight else new IllegalArgumentException(s"Google PubSub topic $topicName doesn't exist").asLeft } else ().asRight - } yield new GooglePubSubSink(publisher, topicName) + } yield new GooglePubSubSink(publisher, topicName, throttler) private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}" diff --git a/sqs/src/main/resources/application.conf b/sqs/src/main/resources/application.conf index f97c45121..e877b15be 100644 --- a/sqs/src/main/resources/application.conf +++ b/sqs/src/main/resources/application.conf @@ -19,6 +19,7 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + enqueueTimeout = 10000 } } } diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala index 461e7bc16..772d0884b 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala @@ -18,7 +18,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor import cats.syntax.either._ import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import 
com.snowplowanalytics.snowplow.collectors.scalastream.sinks.SqsSink +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{Sink, SqsSink} import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService object SqsCollector extends Collector { @@ -26,6 +26,8 @@ object SqsCollector extends Collector { def appVersion = BuildInfo.version def scalaVersion = BuildInfo.scalaVersion + type ThrowableOr[A] = Either[Throwable, A] + def main(args: Array[String]): Unit = { val (collectorConf, akkaConf) = parseConfig(args) val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, appName, appVersion) @@ -38,9 +40,12 @@ object SqsCollector extends Collector { goodQueue = collectorConf.streams.good badQueue = collectorConf.streams.bad bufferConf = collectorConf.streams.buffer - good <- SqsSink.createAndInitialize(sqs, bufferConf, goodQueue, collectorConf.enableStartupChecks, es) - bad <- SqsSink.createAndInitialize(sqs, bufferConf, badQueue, collectorConf.enableStartupChecks, es) - } yield CollectorSinks(good, bad) + sinks <- Sink.throttled[ThrowableOr]( + collectorConf.streams.buffer, + SqsSink.createAndInitialize(sqs, bufferConf, goodQueue, collectorConf.enableStartupChecks, es, _), + SqsSink.createAndInitialize(sqs, bufferConf, badQueue, collectorConf.enableStartupChecks, es, _) + ) + } yield sinks sinks match { case Right(s) => run(collectorConf, akkaConf, s, telemetry) diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index d241f1fdf..59aa519e6 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -49,8 +49,9 @@ class SqsSink private ( sqsConfig: Sqs, bufferConfig: BufferConfig, queueName: String, - executorService: ScheduledExecutorService -) extends Sink { + executorService: ScheduledExecutorService, + throttler: Sink.Throttler +) extends Sink.Throttled(throttler) { import SqsSink._ // Records must not exceed 256K when writing to SQS. 
@@ -75,7 +76,7 @@ class SqsSink private ( @volatile private var outage: Boolean = false override def isHealthy: Boolean = !outage - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = events.foreach(e => EventStorage.store(e, key)) object EventStorage { @@ -186,10 +187,16 @@ class SqsSink private ( (bree.getId, BatchResultErrorInfo(bree.getCode, bree.getMessage)) } .toMap + // Events to retry and reasons for failure - msgGroup.collect { - case (e, m) if failures.contains(m.getId) => - (e, failures(m.getId)) + msgGroup.flatMap { + case (e, m) => + failures.get(m.getId) match { + case Some(failure) => Some((e, failure)) + case None => + onComplete(e.payloads.size.toLong) + None + } } } .toList @@ -269,7 +276,8 @@ object SqsSink { bufferConfig: BufferConfig, queueName: String, enableStartupChecks: Boolean, - executorService: ScheduledExecutorService + executorService: ScheduledExecutorService, + throttler: Sink.Throttler ): Either[Throwable, SqsSink] = { val client = for { provider <- getProvider(sqsConfig.aws) @@ -278,7 +286,7 @@ object SqsSink { } yield client client.map { c => - val sqsSink = new SqsSink(c, sqsConfig, bufferConfig, queueName, executorService) + val sqsSink = new SqsSink(c, sqsConfig, bufferConfig, queueName, executorService, throttler) sqsSink.EventStorage.scheduleFlush() // When the application is shut down try to send all stored events. diff --git a/stdout/src/main/resources/application.conf b/stdout/src/main/resources/application.conf index 6c0cdfce5..b5a41bb20 100644 --- a/stdout/src/main/resources/application.conf +++ b/stdout/src/main/resources/application.conf @@ -10,6 +10,7 @@ collector { byteLimit = 3145728 recordLimit = 500 timeLimit = 5000 + enqueueTimeout = 10000 } } } @@ -29,4 +30,4 @@ akka { max-connections = 2048 } -} \ No newline at end of file +} diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala index 8dde23da1..23ce3ae1a 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala @@ -25,7 +25,7 @@ class StdoutSink(streamName: String) extends Sink { override val MaxBytes = Int.MaxValue // Print a Base64-encoded event. - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = { streamName match { case "out" => events.foreach { e => @@ -36,4 +36,6 @@ class StdoutSink(streamName: String) extends Sink { Console.err.println(Base64.encodeBase64String(e)) } } + true + } }
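
Reviewer note (not part of the diff): the sketch below illustrates the Semaphore arithmetic that Sink.throttled and Sink.Throttled rely on, i.e. how hardByteLimit and enqueueTimeout translate into permits, blocking, and the eventual 503. The object and class names (ThrottlerSketch, SimpleThrottler) and the toy sizes are illustrative only and do not exist in the collector codebase.

    // Minimal, self-contained sketch of the byte-based throttle, assuming the
    // same permit granularity as Sink.bytesToPermits (1 permit == 64 bytes).
    import java.util.concurrent.{Semaphore, TimeUnit}

    object ThrottlerSketch extends App {

      def bytesToPermits(bytes: Long): Int = (bytes >> 6).toInt

      final case class SimpleThrottler(hardByteLimit: Long, enqueueTimeoutMs: Long) {
        private val semaphore = new Semaphore(bytesToPermits(hardByteLimit))

        // Wait at most enqueueTimeoutMs for capacity; false maps to a 503 response.
        def tryAcquire(bytes: Long): Boolean =
          semaphore.tryAcquire(bytesToPermits(bytes), enqueueTimeoutMs, TimeUnit.MILLISECONDS)

        // Called from the sink's completion callback once the bytes have left the buffer.
        def release(bytes: Long): Unit =
          semaphore.release(bytesToPermits(bytes))
      }

      // hardByteLimit = 128 bytes -> room for two 64-byte events in flight.
      val throttler = SimpleThrottler(hardByteLimit = 128L, enqueueTimeoutMs = 10L)
      val event     = Array.fill(64)('a'.toByte)

      println(throttler.tryAcquire(event.length.toLong)) // true
      println(throttler.tryAcquire(event.length.toLong)) // true
      println(throttler.tryAcquire(event.length.toLong)) // false: buffer full, would become a 503
      throttler.release(event.length.toLong)             // sink callback frees capacity
      println(throttler.tryAcquire(event.length.toLong)) // true again
    }

This mirrors the behaviour exercised by SinkSpec: a sink that calls onComplete releases permits and keeps accepting events, while one that never completes exhausts the permits and causes storeRawEvents to return false after enqueueTimeout.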