diff --git a/build.sbt b/build.sbt index b22f7f0f6..5b672f1f8 100644 --- a/build.sbt +++ b/build.sbt @@ -264,12 +264,14 @@ lazy val kafkaDistroless = project .dependsOn(core % "test->test;compile->compile") lazy val nsqSettings = - allSettings ++ buildInfoSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( moduleName := "snowplow-stream-collector-nsq", Docker / packageName := "scala-stream-collector-nsq", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", libraryDependencies ++= Seq( Dependencies.Libraries.nsqClient, Dependencies.Libraries.jackson, + Dependencies.Libraries.nettyAll, Dependencies.Libraries.log4j ) ) @@ -277,14 +279,14 @@ lazy val nsqSettings = lazy val nsq = project .settings(nsqSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val nsqDistroless = project .in(file("distroless/nsq")) .settings(sourceDirectory := (nsq / sourceDirectory).value) .settings(nsqSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile") lazy val stdoutSettings = allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( diff --git a/nsq/src/main/resources/application.conf b/nsq/src/main/resources/application.conf index 0d1ae5709..1df27cd22 100644 --- a/nsq/src/main/resources/application.conf +++ b/nsq/src/main/resources/application.conf @@ -14,26 +14,3 @@ collector { } } } - - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} diff 
--git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala index 44bdd04f0..599701a0f 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, and * you may not use this file except in compliance with the Apache License @@ -14,28 +14,24 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ -object NsqCollector extends Collector { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion +object NsqCollector extends App[NsqSinkConfig](BuildInfo) { + override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] = + for { + good <- NsqSink.create[IO]( + config.sink, + config.good + ) + bad <- NsqSink.create[IO]( + config.sink, + config.bad + ) + } yield Sinks(good, bad) - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = 
TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks = { - val goodStream = collectorConf.streams.good - val badStream = collectorConf.streams.bad - val (good, bad) = collectorConf.streams.sink match { - case nc: Nsq => (new NsqSink(nc.maxBytes, nc, goodStream), new NsqSink(nc.maxBytes, nc, badStream)) - case _ => throw new IllegalArgumentException("Configured sink is not NSQ") - } - CollectorSinks(good, bad) - } - run(collectorConf, akkaConf, sinks, telemetry) - } + override def telemetryInfo(config: Config[NsqSinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo(None, None) } diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index cd466e441..bbdf2bdc7 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2022 Snowplow Analytics Ltd. + * Copyright (c) 2013-2023 Snowplow Analytics Ltd. * All rights reserved. 
* * This program is licensed to you under the Apache License Version 2.0, @@ -19,17 +19,31 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks +import java.util.concurrent.TimeoutException + import scala.collection.JavaConverters._ +import cats.effect.{Resource, Sync} +import cats.implicits._ + import com.snowplowanalytics.client.nsq.NSQProducer -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.{Sink} +import com.snowplowanalytics.client.nsq.exceptions.NSQException /** * NSQ Sink for the Scala Stream Collector * @param nsqConfig Configuration for Nsq * @param topicName Nsq topic name */ -class NsqSink(val maxBytes: Int, nsqConfig: Nsq, topicName: String) extends Sink { +class NsqSink[F[_]: Sync] private ( + val maxBytes: Int, + nsqConfig: NsqSinkConfig, + topicName: String +) extends Sink[F] { + + @volatile private var healthStatus = true + + override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() @@ -38,9 +52,27 @@ class NsqSink(val maxBytes: Int, nsqConfig: Nsq, topicName: String) extends Sink * @param events The list of events to send * @param key The partition key (unused) */ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = - producer.produceMulti(topicName, events.asJava) + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = + Sync[F].blocking(producer.produceMulti(topicName, events.asJava)).onError { + case _: NSQException | _: TimeoutException => + Sync[F].delay(healthStatus = false) + } *> Sync[F].delay(healthStatus = true) - override def shutdown(): Unit = + def shutdown(): Unit = producer.shutdown() } + +object NsqSink { + + def create[F[_]: Sync]( + nsqConfig: NsqSinkConfig, + topicName: String + ): Resource[F, NsqSink[F]] = + Resource.make( + Sync[F].delay( + // MaxBytes is never used but is required 
by the sink interface definition, +      // so just pass any int value in. +        new NsqSink(0, nsqConfig, topicName) +      ) +    )(sink => Sync[F].delay(sink.shutdown())) +} diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala new file mode 100644 index 000000000..6a050aeeb --- /dev/null +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2013-2023 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. 
+ */ + +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import io.circe.Decoder +import io.circe.generic.semiauto._ + +import com.snowplowanalytics.snowplow.collector.core.Config + +final case class NsqSinkConfig( + maxBytes: Int, + threadPoolSize: Int, + host: String, + port: Int +) extends Config.Sink + +object NsqSinkConfig { + implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig] +} diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index a70ad4606..7c64c3e6a 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (c) 2014-2022 Snowplow Analytics Ltd. + * Copyright (c) 2014-2023 Snowplow Analytics Ltd. * All rights reserved. 
* * This program is licensed to you under the Apache License Version 2.0, @@ -18,8 +18,121 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{ExitCode, IO} +import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSinkConfig +import org.http4s.SameSite +import org.specs2.mutable.Specification -class NsqConfigSpec extends ConfigSpec { - makeConfigTest("nsq", "", "") +import java.nio.file.Paths +import scala.concurrent.duration.DurationInt + +class NsqConfigSpec extends Specification with CatsEffect { + + "Config parser" should { + "be able to parse extended nsq config" in { + assert( + resource = "/config.nsq.extended.hocon", + expectedResult = Right(NsqConfigSpec.expectedConfig) + ) + } + "be able to parse minimal nsq config" in { + assert( + resource = "/config.nsq.minimal.hocon", + expectedResult = Right(NsqConfigSpec.expectedConfig) + ) + } + } + + private def assert(resource: String, expectedResult: Either[ExitCode, Config[NsqSinkConfig]]) = { + val path = Paths.get(getClass.getResource(resource).toURI) + ConfigParser.fromPath[IO, NsqSinkConfig](Some(path)).value.map { result => + result must beEqualTo(expectedResult) + } + } +} + +object NsqConfigSpec { + private val expectedConfig = Config[NsqSinkConfig]( + interface = "0.0.0.0", + port = 8080, + paths = Map.empty[String, String], + p3p = Config.P3P( + policyRef = "/w3c/p3p.xml", + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = 
Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + sink = NsqSinkConfig( + maxBytes = 1000000, + threadPoolSize = 10, + host = "nsqHost", + port = 4150 + ) + ), + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None + ) + ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 59b689f24..fd7df112e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -56,6 +56,7 @@ object Dependencies { val circeConfig = "0.10.0" val fs2PubSub = "0.22.0" val catsRetry = "3.1.0" + val nettyAll = "4.1.95.Final" // to fix nsq dependency // Scala (test only) val specs2 = "4.11.0" @@ -73,6 +74,7 @@ object Dependencies { object Libraries { // Java val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson // nsq only + val nettyAll = "io.netty" % "netty-all" % V.nettyAll //nsq 
only val thrift = "org.apache.thrift" % "libthrift" % V.thrift val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk