From 49bf3048906bcdfcd95e6aa8cb0594f8d25da1a7 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 3 Dec 2021 20:45:35 +0000 Subject: [PATCH] CollectorServiceSpec should test number of events written to sink (close #188) --- .../CollectorRoute.scala | 6 +- .../CollectorService.scala | 41 +++--- .../sinks/Sink.scala | 2 +- .../CollectorRouteSpec.scala | 2 +- .../CollectorServiceSpec.scala | 121 ++++++++++++------ .../TestSink.scala | 8 +- .../sinks/KafkaSink.scala | 3 +- .../sinks/KinesisSink.scala | 4 +- .../sinks/NsqSink.scala | 4 +- .../sinks/GooglePubSubSink.scala | 3 +- .../sinks/SqsSink.scala | 4 +- .../sinks/StdoutSink.scala | 4 +- 12 files changed, 116 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index d7fa30c7f..4c6a9ef8b 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -62,7 +62,7 @@ trait CollectorRoute { post { extractContentType { ct => entity(as[String]) { body => - val (r, _) = collectorService.cookie( + val r = collectorService.cookie( qs, Some(body), path, @@ -83,7 +83,7 @@ trait CollectorRoute { } } ~ (get | head) { - val (r, _) = collectorService.cookie( + val r = collectorService.cookie( qs, None, path, @@ -104,7 +104,7 @@ trait CollectorRoute { } ~ path("""ice\.png""".r | "i".r) { path => (get | head) { - val (r, _) = collectorService.cookie( + val r = collectorService.cookie( qs, None, "/" + path, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index 32cdaf6da..3851deeb3 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -55,7 +55,7 @@ trait Service { doNotTrack: Boolean, contentType: Option[ContentType] = None, spAnonymous: Option[String] = None - ): (HttpResponse, List[Array[Byte]]) + ): HttpResponse def cookieName: Option[String] def doNotTrackCookie: Option[DntCookieMatcher] def determinePath(vendor: String, version: String): String @@ -111,7 +111,7 @@ class CollectorService( doNotTrack: Boolean, contentType: Option[ContentType] = None, spAnonymous: Option[String] - ): (HttpResponse, List[Array[Byte]]) = { + ): HttpResponse = { val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey) extractQueryParams(queryString) match { @@ -144,7 +144,7 @@ class CollectorService( spAnonymous ) // we don't store events in case we're bouncing - val sinkResponses = if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) else Nil + if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) val headers = bounceLocationHeader(params, request, config.cookieBounce, bounce) ++ cookieHeader(request, config.cookieConfig, nuid, doNotTrack, spAnonymous) ++ @@ -155,10 +155,7 @@ class CollectorService( `Access-Control-Allow-Credentials`(true) ) - val (httpResponse, badRedirectResponses) = - buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro) - - (httpResponse, badRedirectResponses ++ sinkResponses) + buildHttpResponse(event, 
params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro) case Left(error) => val badRow = BadRow.GenericError( @@ -167,8 +164,10 @@ class CollectorService( Payload.RawPayload(queryString.getOrElse("")) ) - if (sinks.bad.isHealthy) (HttpResponse(StatusCodes.OK), sinkBad(badRow, partitionKey)) - else (HttpResponse(StatusCodes.OK), Nil) // if bad sink is unhealthy, we don't want to try storing the bad rows + if (sinks.bad.isHealthy) { + sinkBad(badRow, partitionKey) + HttpResponse(StatusCodes.OK) + } else HttpResponse(StatusCodes.OK) // if bad sink is unhealthy, we don't want to try storing the bad rows } } @@ -261,18 +260,16 @@ class CollectorService( def sinkEvent( event: CollectorPayload, partitionKey: String - ): List[Array[Byte]] = { + ): Unit = { // Split events into Good and Bad val eventSplit = splitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) // Send events to respective sinks - val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) - val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) - // Sink Responses for Test Sink - sinkResponseGood ++ sinkResponseBad + sinks.good.storeRawEvents(eventSplit.good, partitionKey) + sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) } /** Sinks a bad row generated by an illegal querystring. */ - def sinkBad(badRow: BadRow, partitionKey: String): List[Array[Byte]] = { + def sinkBad(badRow: BadRow, partitionKey: String): Unit = { val toSink = List(badRow.compact.getBytes(UTF_8)) sinks.bad.storeRawEvents(toSink, partitionKey) } @@ -286,12 +283,12 @@ class CollectorService( pixelExpected: Boolean, bounce: Boolean, redirectMacroConfig: RedirectMacroConfig - ): (HttpResponse, List[Array[Byte]]) = + ): HttpResponse = if (redirect) { - val (r, l) = buildRedirectHttpResponse(event, queryParams, redirectMacroConfig) - (r.withHeaders(r.headers ++ headers), l) + val r = buildRedirectHttpResponse(event, queryParams, redirectMacroConfig) + r.withHeaders(r.headers ++ headers) } else { - (buildUsualHttpResponse(pixelExpected, bounce).withHeaders(headers), Nil) + buildUsualHttpResponse(pixelExpected, bounce).withHeaders(headers) } /** Builds the appropriate http response when not dealing with click redirects. 
*/ @@ -311,7 +308,7 @@ class CollectorService( event: CollectorPayload, queryParams: Map[String, String], redirectMacroConfig: RedirectMacroConfig - ): (HttpResponse, List[Array[Byte]]) = + ): HttpResponse = queryParams.get("u") match { case Some(target) => val canReplace = redirectMacroConfig.enabled && event.isSetNetworkUserId @@ -319,8 +316,8 @@ class CollectorService( val replacedTarget = if (canReplace) target.replaceAllLiterally(token, event.networkUserId) else target - (HttpResponse(StatusCodes.Found).withHeaders(`RawHeader`("Location", replacedTarget)), Nil) - case None => (HttpResponse(StatusCodes.BadRequest), Nil) + HttpResponse(StatusCodes.Found).withHeaders(`RawHeader`("Location", replacedTarget)) + case None => HttpResponse(StatusCodes.BadRequest) } /** diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala index 54d7e7578..c7f5e5c1b 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/Sink.scala @@ -30,6 +30,6 @@ trait Sink { lazy val log = LoggerFactory.getLogger(getClass()) def isHealthy: Boolean = true - def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] + def storeRawEvents(events: List[Array[Byte]], key: String): Unit def shutdown(): Unit } diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala index 242a7b844..dc914fe3b 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRouteSpec.scala @@ -47,7 +47,7 @@ class CollectorRouteSpec extends Specification with Specs2RouteTest { doNotTrack: Boolean, contentType: Option[ContentType] = None, spAnonymous: Option[String] = spAnonymous - ): (HttpResponse, List[Array[Byte]]) = (HttpResponse(200, entity = s"cookie"), List.empty) + ): HttpResponse = HttpResponse(200, entity = s"cookie") def cookieName: Option[String] = Some("name") def doNotTrackCookie: Option[DntCookieMatcher] = None def determinePath(vendor: String, version: String): String = "/p1/p2" diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index ae664c526..4febdf637 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -38,18 +38,37 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._ import org.specs2.mutable.Specification class CollectorServiceSpec extends Specification { + case class ProbeService(service: CollectorService, good: TestSink, bad: TestSink) + val service = new CollectorService( TestUtils.testConf, CollectorSinks(new TestSink, new TestSink), "app", "version" ) - val bouncingService = new CollectorService( - TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)), - CollectorSinks(new TestSink, new TestSink), - "app", - "version" - ) + + def probeService(): ProbeService 
= { + val good = new TestSink + val bad = new TestSink + val s = new CollectorService( + TestUtils.testConf, + CollectorSinks(good, bad), + "app", + "version" + ) + ProbeService(s, good, bad) + } + def bouncingService(): ProbeService = { + val good = new TestSink + val bad = new TestSink + val s = new CollectorService( + TestUtils.testConf.copy(cookieBounce = TestUtils.testConf.cookieBounce.copy(enabled = true)), + CollectorSinks(good, bad), + "app", + "version" + ) + ProbeService(s, good, bad) + } val uuidRegex = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}".r val event = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") val hs = List(`Raw-Request-URI`("uri"), `X-Forwarded-For`(RemoteAddress(InetAddress.getByName("127.0.0.1")))) @@ -59,7 +78,8 @@ class CollectorServiceSpec extends Specification { "The collector service" should { "cookie" in { "attach p3p headers" in { - val (r, l) = service.cookie( + val ProbeService(s, good, bad) = probeService() + val r = s.cookie( Some("nuid=12"), Some("b"), "p", @@ -82,10 +102,12 @@ class CollectorServiceSpec extends Specification { r.headers must contain(`Access-Control-Allow-Origin`(HttpOriginRange.`*`)) r.headers must contain(`Access-Control-Allow-Credentials`(true)) r.headers.filter(_.toString.startsWith("Set-Cookie")) must have size 1 - l must have size 1 + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 } "not store stuff and provide no cookie if do not track is on" in { - val (r, l) = service.cookie( + val ProbeService(s, good, bad) = probeService() + val r = s.cookie( Some("nuid=12"), Some("b"), "p", @@ -107,10 +129,11 @@ class CollectorServiceSpec extends Specification { ) r.headers must contain(`Access-Control-Allow-Origin`(HttpOriginRange.`*`)) r.headers must contain(`Access-Control-Allow-Credentials`(true)) - l must have size 0 + good.storedRawEvents must have size 0 + bad.storedRawEvents must have size 0 } "not set a cookie if SP-Anonymous is present" in { - val (r, _) = service.cookie( + val r = service.cookie( Some("nuid=12"), Some("b"), "p", @@ -129,7 +152,8 @@ class CollectorServiceSpec extends Specification { r.headers.filter(_.toString.startsWith("Set-Cookie")) must have size 0 } "not set a network_userid from cookie if SP-Anonymous is present" in { - val (_, l) = service.cookie( + val ProbeService(s, good, bad) = probeService() + s.cookie( None, Some("b"), "p", @@ -144,13 +168,15 @@ class CollectorServiceSpec extends Specification { None, Some("*") ) - l must have size 1 + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 val newEvent = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") - deserializer.deserialize(newEvent, l.head) + deserializer.deserialize(newEvent, good.storedRawEvents.head) newEvent.networkUserId shouldEqual "00000000-0000-0000-0000-000000000000" } "network_userid from cookie should persist if SP-Anonymous is not present" in { - val (_, l) = service.cookie( + val ProbeService(s, good, bad) = probeService() + s.cookie( None, Some("b"), "p", @@ -165,13 +191,15 @@ class CollectorServiceSpec extends Specification { None, None ) - l must have size 1 + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 val newEvent = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") - deserializer.deserialize(newEvent, l.head) + deserializer.deserialize(newEvent, good.storedRawEvents.head) newEvent.networkUserId 
shouldEqual "cookie-nuid" } "not store stuff if bouncing and provide a location header" in { - val (r, l) = bouncingService.cookie( + val ProbeService(s, good, bad) = bouncingService() + val r = s.cookie( None, Some("b"), "p", @@ -187,10 +215,12 @@ class CollectorServiceSpec extends Specification { r.headers must have size 6 r.headers must contain(`Location`("/?bounce=true")) r.headers must contain(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`)) - l must have size 0 + good.storedRawEvents must have size 0 + bad.storedRawEvents must have size 0 } "store stuff if having already bounced with the fallback nuid" in { - val (r, l) = bouncingService.cookie( + val ProbeService(s, good, bad) = bouncingService() + val r = s.cookie( Some("bounce=true"), Some("b"), "p", @@ -205,13 +235,15 @@ class CollectorServiceSpec extends Specification { ) r.headers must have size 5 r.headers must contain(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`)) - l must have size 1 + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 val newEvent = new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector") - deserializer.deserialize(newEvent, l.head) + deserializer.deserialize(newEvent, good.storedRawEvents.head) newEvent.networkUserId shouldEqual "new-nuid" } "respond with a 200 OK and a bad row in case of illegal querystring" in { - val (r, l) = service.cookie( + val ProbeService(s, good, bad) = probeService() + val r = s.cookie( Some("a b"), None, "p", @@ -224,13 +256,14 @@ class CollectorServiceSpec extends Specification { false, false ) + good.storedRawEvents must have size 0 + bad.storedRawEvents must have size 1 + r.status mustEqual StatusCodes.OK - val brJson = parse(new String(l.head, StandardCharsets.UTF_8)).getOrElse(Json.Null) + val brJson = parse(new String(bad.storedRawEvents.head, StandardCharsets.UTF_8)).getOrElse(Json.Null) val failure = brJson.hcursor.downField("data").downField("failure").downField("errors").downArray.as[String] val payload = brJson.hcursor.downField("data").downField("payload").as[String] - r.status mustEqual StatusCodes.OK - l must have size 1 failure must beRight( "Illegal query: Invalid input ' ', expected '+', '=', query-char, 'EOI', '&' or pct-encoded (line 1, column 2): a b\n ^" ) @@ -442,9 +475,11 @@ class CollectorServiceSpec extends Specification { "sinkEvent" in { "send back the produced events" in { - val l = service.sinkEvent(event, "key") - l must have size 1 - l.head.zip(serializer.serialize(event)).forall { case (a, b) => a mustEqual b } + val ProbeService(s, good, bad) = probeService() + s.sinkEvent(event, "key") + good.storedRawEvents must have size 1 + bad.storedRawEvents must have size 0 + good.storedRawEvents.head.zip(serializer.serialize(event)).forall { case (a, b) => a mustEqual b } } } @@ -455,10 +490,12 @@ class CollectorServiceSpec extends Specification { Failure.GenericFailure(Instant.now(), NonEmptyList.one("IllegalQueryString")), Payload.RawPayload("") ) - val l = service.sinkBad(br, "key") + val ProbeService(s, good, bad) = probeService() + s.sinkBad(br, "key") - l must have size 1 - l.head.zip(br.compact).forall { case (a, b) => a mustEqual b } + bad.storedRawEvents must have size 1 + good.storedRawEvents must have size 0 + bad.storedRawEvents.head.zip(br.compact).forall { case (a, b) => a mustEqual b } } } @@ -466,21 +503,21 @@ class CollectorServiceSpec extends Specification { val redirConf = TestUtils.testConf.redirectMacro "rely on buildRedirectHttpResponse if redirect is 
true" in { - val (res, Nil) = service.buildHttpResponse(event, Map("u" -> "12"), hs, true, true, false, redirConf) + val res = service.buildHttpResponse(event, Map("u" -> "12"), hs, true, true, false, redirConf) res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "12") :: hs) } "send back a gif if pixelExpected is true" in { - val (res, Nil) = service.buildHttpResponse(event, Map.empty, hs, false, true, false, redirConf) + val res = service.buildHttpResponse(event, Map.empty, hs, false, true, false, redirConf) res shouldEqual HttpResponse(200) .withHeaders(hs) .withEntity(HttpEntity(contentType = ContentType(MediaTypes.`image/gif`), bytes = CollectorService.pixel)) } "send back a found if pixelExpected and bounce is true" in { - val (res, Nil) = service.buildHttpResponse(event, Map.empty, hs, false, true, true, redirConf) + val res = service.buildHttpResponse(event, Map.empty, hs, false, true, true, redirConf) res shouldEqual HttpResponse(302).withHeaders(hs) } "send back ok otherwise" in { - val (res, Nil) = service.buildHttpResponse(event, Map.empty, hs, false, false, false, redirConf) + val res = service.buildHttpResponse(event, Map.empty, hs, false, false, false, redirConf) res shouldEqual HttpResponse(200, entity = "ok").withHeaders(hs) } } @@ -502,22 +539,22 @@ class CollectorServiceSpec extends Specification { "buildRedirectHttpResponse" in { val redirConf = TestUtils.testConf.redirectMacro "give back a 302 if redirecting and there is a u query param" in { - val (res, Nil) = service.buildRedirectHttpResponse(event, Map("u" -> "12"), redirConf) + val res = service.buildRedirectHttpResponse(event, Map("u" -> "12"), redirConf) res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "12")) } "give back a 400 if redirecting and there are no u query params" in { - val (res, _) = service.buildRedirectHttpResponse(event, Map.empty, redirConf) + val res = service.buildRedirectHttpResponse(event, Map.empty, redirConf) res shouldEqual HttpResponse(400) } "the redirect url should ignore a cookie replacement macro on redirect if not enabled" in { event.networkUserId = "1234" - val (res, Nil) = + val res = service.buildRedirectHttpResponse(event, Map("u" -> s"http://localhost/?uid=$${SP_NUID}"), redirConf) res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", s"http://localhost/?uid=$${SP_NUID}")) } "the redirect url should support a cookie replacement macro on redirect if enabled" in { event.networkUserId = "1234" - val (res, Nil) = service.buildRedirectHttpResponse( + val res = service.buildRedirectHttpResponse( event, Map("u" -> s"http://localhost/?uid=$${SP_NUID}"), redirConf.copy(enabled = true) @@ -526,7 +563,7 @@ class CollectorServiceSpec extends Specification { } "the redirect url should allow for custom token placeholders" in { event.networkUserId = "1234" - val (res, Nil) = service.buildRedirectHttpResponse( + val res = service.buildRedirectHttpResponse( event, Map("u" -> "http://localhost/?uid=[TOKEN]"), redirConf.copy(enabled = true, Some("[TOKEN]")) @@ -534,7 +571,7 @@ class CollectorServiceSpec extends Specification { res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "http://localhost/?uid=1234")) } "the redirect url should allow for double encoding for return redirects" in { - val (res, Nil) = + val res = service.buildRedirectHttpResponse(event, Map("u" -> "a%3Db"), redirConf) res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "a%3Db")) } diff --git 
a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala index e8cc17f0a..d1ebec171 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TestSink.scala @@ -20,14 +20,20 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink +import scala.collection.mutable.ListBuffer + // Allow the testing framework to test collection events using the // same methods from AbstractSink as the other sinks. class TestSink extends Sink { + private val buf: ListBuffer[Array[Byte]] = ListBuffer() + def storedRawEvents: List[Array[Byte]] = buf.toList + // Effectively no limit to the record size override val MaxBytes = Int.MaxValue - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = events + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = + buf ++= events override def shutdown(): Unit = () } diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index fb9bdb286..193ba9ed7 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -64,7 +64,7 @@ class KafkaSink( * @param events The list of events to send * @param key The partition key to use */ - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = { log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") events.foreach { event => kafkaProducer.send( @@ -75,7 +75,6 @@ class KafkaSink( } ) } - Nil } override def shutdown(): Unit = diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index f9d9dcc67..f51bda3b0 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -91,10 +91,8 @@ class KinesisSink private ( @volatile private var outage: Boolean = false override def isHealthy: Boolean = !outage - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = events.foreach(e => EventStorage.store(e, key)) - Nil - } object EventStorage { private val storedEvents = ListBuffer.empty[Events] diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index 0d1b6ce7c..a31921afd 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -39,10 +39,8 @@ class NsqSink(nsqConfig: Nsq, topicName: String) extends 
Sink { * @param events The list of events to send * @param key The partition key (unused) */ - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = producer.produceMulti(topicName, events.asJava) - Nil - } override def shutdown(): Unit = producer.shutdown() diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala index 87dddf6f2..a06332f91 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala @@ -48,7 +48,7 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String) extends * @param events The list of events to send * @param key Not used. */ - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = { if (events.nonEmpty) log.debug(s"Writing ${events.size} Thrift records to Google PubSub topic $topicName.") events.foreach { event => @@ -77,7 +77,6 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String) extends ) } } - Nil } override def shutdown(): Unit = diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index adc4485c5..f1ad5f0fb 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -75,10 +75,8 @@ class SqsSink private ( @volatile private var outage: Boolean = false override def isHealthy: Boolean = !outage - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = events.foreach(e => EventStorage.store(e, key)) - Nil - } object EventStorage { private val storedEvents = ListBuffer.empty[Events] diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala index 883b45aab..a4bd22e5d 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/StdoutSink.scala @@ -25,7 +25,7 @@ class StdoutSink(streamName: String) extends Sink { override val MaxBytes = Int.MaxValue // Print a Base64-encoded event. - override def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]] = { + override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = streamName match { case "out" => events.foreach { e => @@ -36,8 +36,6 @@ class StdoutSink(streamName: String) extends Sink { Console.err.println(Base64.encodeBase64String(e)) } } - Nil - } override def shutdown(): Unit = () }
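
Reviewer note (illustrative, not part of the patch): the pattern this change introduces is that `storeRawEvents` no longer echoes events back (`Unit` instead of `List[Array[Byte]]`), so the spec now builds a fresh service with its own pair of probe sinks via `probeService()` and asserts on how many raw events each `TestSink` actually recorded. A minimal sketch of an additional test written against these helpers — assuming it lives inside `CollectorServiceSpec`, where `ProbeService`, `probeService()`, `CollectorPayload` and the specs2 matchers used above are already in scope:

    // Illustrative only: exercise the service, then count what each sink received.
    "count the events written to each sink" in {
      val ProbeService(s, good, bad) = probeService()
      val payload =
        new CollectorPayload("iglu-schema", "ip", System.currentTimeMillis, "UTF-8", "collector")

      s.sinkEvent(payload, "key")

      good.storedRawEvents must have size 1 // the serialized payload reached the good sink
      bad.storedRawEvents must have size 0  // nothing was routed to the bad sink
    }

Constructing a fresh `ProbeService` per test keeps each test's sink counts isolated, rather than sharing one mutable `TestSink` across the whole spec as the old single `service`/`bouncingService` vals effectively did.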