From 5599a70d96d2f47749eaa1f1358d561016046b63 Mon Sep 17 00:00:00 2001
From: colmsnowplow
Date: Tue, 26 Sep 2023 13:11:27 +0200
Subject: [PATCH] wip - initial outline of nsq integration tests

---
 build.sbt                                     |  17 +-
 .../scalastream/it/nsq/Containers.scala       | 261 ++++++++++++++++++
 .../scalastream/it/nsq/NsqCollectorSpec.scala | 175 ++++++++++++
 3 files changed, 448 insertions(+), 5 deletions(-)
 create mode 100644 nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/Containers.scala
 create mode 100644 nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/NsqCollectorSpec.scala

diff --git a/build.sbt b/build.sbt
index 5b672f1f8..8420c40de 100644
--- a/build.sbt
+++ b/build.sbt
@@ -264,7 +264,7 @@ lazy val kafkaDistroless = project
   .dependsOn(core % "test->test;compile->compile")
 
 lazy val nsqSettings =
-  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
     moduleName := "snowplow-stream-collector-nsq",
     Docker / packageName := "scala-stream-collector-nsq",
     buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
@@ -272,21 +272,28 @@ lazy val nsqSettings =
       Dependencies.Libraries.nsqClient,
       Dependencies.Libraries.jackson,
       Dependencies.Libraries.nettyAll,
-      Dependencies.Libraries.log4j
-    )
+      Dependencies.Libraries.log4j,
+      // integration tests dependencies
+      Dependencies.Libraries.IT.specs2,
+      Dependencies.Libraries.IT.specs2CE
+    ),
+    IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
+    IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
   )
 
 lazy val nsq = project
   .settings(nsqSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(http4s % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile;it->it")
+  .configs(IntegrationTest)
 
 lazy val nsqDistroless = project
   .in(file("distroless/nsq"))
   .settings(sourceDirectory := (nsq / sourceDirectory).value)
   .settings(nsqSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(http4s % "test->test;compile->compile")
+  .dependsOn(http4s % "test->test;compile->compile;it->it")
+  .configs(IntegrationTest)
 
 lazy val stdoutSettings =
   allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
diff --git a/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/Containers.scala b/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/Containers.scala
new file mode 100644
index 000000000..e45fe0699
--- /dev/null
+++ b/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/Containers.scala
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0, and
+ * you may not use this file except in compliance with the Apache License
+ * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the Apache License Version 2.0 is distributed on an "AS
+ * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the Apache License Version 2.0 for the specific language
+ * governing permissions and limitations there under.
+ */
+
+package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq
+
+import org.testcontainers.containers.{BindMode, Network}
+import org.testcontainers.containers.wait.strategy.Wait
+import com.dimafeng.testcontainers.{GenericContainer, FixedHostPortGenericContainer}
+import cats.effect.{IO, Resource}
+import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
+import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
+import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer
+
+/**
+ * Docker fixtures for the NSQ collector integration tests: the collector under test plus the
+ * nsqlookupd / nsqd / nsq_to_nsq containers, all attached to one shared Docker network.
+ */
+object Containers {
+
+  val collectorPort = 8080
+  // val projectId = "google-project-id"
+  // val emulatorHost = "localhost"
+  // val emulatorPort = 8085
+  // lazy val emulatorHostPort = pubSubEmulator.getMappedPort(emulatorPort)
+  val topicGood = "good"
+  val topicBad = "bad"
+
+  private val network = Network.newNetwork()
+
+  // Image shared by every NSQ container below. NOTE(review): `latest` is unpinned - consider pinning a version.
+  private val nsqImage = "nsqio/nsq:latest"
+
+  // private val pubSubEmulator = {
+  //   val container = GenericContainer(
+  //     dockerImage = "gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators",
+  //     waitStrategy = Wait.forLogMessage(".*Server started.*", 1),
+  //     exposedPorts = Seq(emulatorPort),
+  //     command = Seq(
+  //       "gcloud",
+  //       "beta",
+  //       "emulators",
+  //       "pubsub",
+  //       "start",
+  //       s"--project=$projectId",
+  //       s"--host-port=0.0.0.0:$emulatorPort"
+  //     )
+  //   )
+
+  //   container.underlyingUnsafeContainer.withNetwork(network)
+  //   container.underlyingUnsafeContainer.withNetworkAliases("pubsub-emulator")
+  //   container.container
+  // }
+
+  /**
+   * Collector container wrapped in a `Resource`: launched via `startContainerWithLogs` on acquire, stopped on release.
+   *
+   * @param configPath config file bind-mounted read-only at `/snowplow/config/collector.hocon`
+   * @param testName   name of the running test, forwarded to `startContainerWithLogs`
+   * @param envs       extra environment variables; on a key clash they replace the defaults set here
+   * @return the container together with its host and the host port mapped to `collectorPort`
+   */
+  def collector(
+    configPath: String,
+    testName: String,
+    // topicGood: String,
+    // topicBad: String,
+    // createTopics: Boolean = true,
+    envs: Map[String, String] = Map.empty[String, String]
+  ): Resource[IO, CollectorContainer] = {
+    val container = GenericContainer(
+      dockerImage = BuildInfo.dockerAlias,
+      env = Map(
+        "PORT" -> collectorPort.toString,
+        "JDK_JAVA_OPTIONS" -> "-Dorg.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.collectors.scalastream.sinks.NsqSink=warn",
+        "HTTP4S_BACKEND" -> "BLAZE"
+      ) ++ envs,
+      exposedPorts = Seq(collectorPort),
+      fileSystemBind = Seq(
+        GenericContainer.FileSystemBind(
+          configPath,
+          "/snowplow/config/collector.hocon",
+          BindMode.READ_ONLY
+        )
+      ),
+      command = Seq(
+        "--config",
+        "/snowplow/config/collector.hocon"
+      ),
+      waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1)
+    )
+    container.container.withNetwork(network)
+
+
+    val create =
+      // if(createTopics)
+      //   PubSub.createTopicsAndSubscriptions(
+      //     projectId,
+      //     emulatorHost,
+      //     emulatorHostPort,
+      //     List(topicGood, topicBad)
+      //   )
+      // else
+        IO.unit
+
+    Resource.make(
+      create *>
+        IO(startContainerWithLogs(container.container, testName))
+          .map(c => CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort)))
+    )(
+      c => IO(c.container.stop())
+    )
+  }
+
+  // We loosely copy the test implementation from enrich, which requires two nsqd and nsqlookup addresses - explained here: https://github.com/snowplow/enrich/blob/b13099e51be7a253c115ab0e0f66c31824771e33/modules/nsq/src/it/scala/com/snowplowanalytics/snowplow/enrich/nsq/test/Containers.scala#L52-L60
+  // This wasn't necessary for manual testing which used the http api to check results, so we can likely rationalise this, but I ran out of time in my attempts to do so here.
+  private val nsqlookupd1 = {
+    val container = FixedHostPortGenericContainer(
+      imageName = nsqImage,
+      command = Seq(
+        "/nsqlookupd",
+        s"--broadcast-address=nsqlookupd1",
+        s"--http-address=0.0.0.0:4161",
+        s"--tcp-address=0.0.0.0:4160",
+      ),
+      exposedPorts = List(4161, 4160),
+      exposedContainerPort = 4161,
+      exposedHostPort = 4161
+    )
+    container.container.withFixedExposedPort(4160, 4160)
+    container.container.withNetwork(network)
+    container.container.withNetworkAliases("nsqlookupd1")
+    container.container
+  }
+
+  private val nsqlookupd2 = {
+    val container = FixedHostPortGenericContainer(
+      imageName = nsqImage,
+      command = Seq(
+        "/nsqlookupd",
+        s"--broadcast-address=nsqlookupd2",
+        s"--http-address=0.0.0.0:4261",
+        s"--tcp-address=0.0.0.0:4260",
+      ),
+      exposedPorts = List(4261, 4260),
+      exposedContainerPort = 4261,
+      exposedHostPort = 4261
+    )
+    container.container.withFixedExposedPort(4260, 4260)
+    container.container.withNetwork(network)
+    container.container.withNetworkAliases("nsqlookupd2")
+    container.container
+  }
+
+
+  private val nsqd1 = {
+    val container = FixedHostPortGenericContainer(
+      imageName = nsqImage,
+      command = Seq(
+        "/nsqd",
+        s"--broadcast-address=nsqd",
+        s"--broadcast-http-port=4151",
+        s"--broadcast-tcp-port=4150",
+        s"--http-address=0.0.0.0:4151",
+        s"--tcp-address=0.0.0.0:4150",
+        s"--lookupd-tcp-address=nsqlookupd1:4160"
+      ),
+      exposedPorts = List(4150, 4151),
+      exposedContainerPort = 4151,
+      exposedHostPort = 4151
+    )
+    container.container.withFixedExposedPort(4150, 4150)
+    container.container.withNetwork(network)
+    container.container.withNetworkAliases("nsqd")
+    container.container
+  }
+
+  private val nsqd2 = {
+    val container = FixedHostPortGenericContainer(
+      imageName = nsqImage,
+      command = Seq(
+        "/nsqd",
+        s"--broadcast-address=127.0.0.1",
+        s"--broadcast-http-port=4251",
+        s"--broadcast-tcp-port=4250",
+        s"--http-address=0.0.0.0:4251",
+        s"--tcp-address=0.0.0.0:4250",
+        s"--lookupd-tcp-address=nsqlookupd2:4260"
+      ),
+      exposedPorts = List(4250, 4251),
+      exposedContainerPort = 4251,
+      exposedHostPort = 4251
+    )
+    container.container.withFixedExposedPort(4250, 4250)
+    container.container.withNetwork(network)
+    container.container.withNetworkAliases("nsqd2")
+    container.container
+  }
+
+  // nsq_to_nsq copies each topic from nsqd1 to nsqd2; nsqd1 is only reachable through its network alias "nsqd".
+  private val nsqTonsqGood = {
+    val container = GenericContainer(
+      dockerImage = nsqImage,
+      command = Seq(
+        "/nsq_to_nsq",
+        s"--nsqd-tcp-address=nsqd:4150",
+        s"--topic=good",
+        s"--destination-nsqd-tcp-address=nsqd2:4250",
+        s"--destination-topic=good",
+      ),
+    )
+    container.container.withNetwork(network)
+    container.container
+  }
+
+  private val nsqTonsqBad = {
+    val container = GenericContainer(
+      dockerImage = nsqImage,
+      command = Seq(
+        "/nsq_to_nsq",
+        s"--nsqd-tcp-address=nsqd:4150",
+        s"--topic=bad",
+        s"--destination-nsqd-tcp-address=nsqd2:4250",
+        s"--destination-topic=bad",
+      ),
+    )
+    container.container.withNetwork(network)
+    container.container
+  }
+
+  /** Starts the NSQ containers: lookup daemons first, then the nsqd nodes, then the nsq_to_nsq bridges. */
+  def startEmulator(): Unit = {
+    nsqlookupd1.start()
+    nsqlookupd2.start()
+    nsqd1.start()
+    nsqd2.start()
+    nsqTonsqGood.start()
+    nsqTonsqBad.start()
+  }
+
+  /** Stops the NSQ containers in the reverse of their start order, so dependants go down first. */
+  def stopEmulator(): Unit = {
+    nsqTonsqBad.stop()
+    nsqTonsqGood.stop()
+    nsqd2.stop()
+    nsqd1.stop()
+    nsqlookupd2.stop()
+    nsqlookupd1.stop()
+  }
+}
diff --git a/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/NsqCollectorSpec.scala b/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/NsqCollectorSpec.scala
new file mode 100644
index 000000000..b5c29c32d
--- /dev/null
+++ b/nsq/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/nsq/NsqCollectorSpec.scala
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2022-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0, and
+ * you may not use this file except in compliance with the Apache License
+ * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the Apache License Version 2.0 is distributed on an "AS
+ * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the Apache License Version 2.0 for the specific language
+ * governing permissions and limitations there under.
+ */
+
+package com.snowplowanalytics.snowplow.collectors.scalastream.it.nsq
+
+import scala.concurrent.duration._
+import cats.effect.IO
+// import org.http4s.{Method, Request, Status, Uri}
+import cats.effect.testing.specs2.CatsEffect
+import org.specs2.mutable.Specification
+import org.specs2.specification.BeforeAfterAll
+// import org.testcontainers.containers.GenericContainer
+// import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
+// import com.snowplowanalytics.snowplow.collectors.scalastream.it.{EventGenerator, Http}
+
+/**
+ * Integration tests for the NSQ collector. The NSQ containers are started once before all
+ * examples and stopped after them; each example starts its own collector container.
+ */
+class NsqCollectorSpec extends Specification with CatsEffect with BeforeAfterAll {
+
+  override protected val Timeout = 5.minutes
+
+  def beforeAll: Unit = Containers.startEmulator()
+
+  def afterAll: Unit = Containers.stopEmulator()
+
+  val stopTimeout = 20.second
+
+  // val maxBytes = 10000
+
+  "collector-nsq" should {
+    "be able to parse the minimal config" in {
+      val testName = "minimal"
+
+      Containers.collector(
+        "examples/config.nsq.minimal.hocon",
+        testName,
+      ).use { collector =>
+        IO(collector.container.getLogs() must contain("Service bound to address"))
+      }
+    }
+
+    // "emit the correct number of collector payloads and bad rows" in {
+    //   val testName = "count"
+    //   val nbGood = 1000
+    //   val nbBad = 10
+    //   val topicGood = s"${testName}-raw"
+    //   val topicBad = s"${testName}-bad-1"
+
+    //   Containers.collector(
+    //     "pubsub/src/it/resources/collector.hocon",
+    //     testName,
+    //     topicGood,
+    //     topicBad,
+    //     envs = Map("MAX_BYTES" -> maxBytes.toString)
+    //   ).use { collector =>
+    //     for {
+    //       _ <- log(testName, "Sending data")
+    //       _ <- EventGenerator.sendEvents(
+    //         collector.host,
+    //         collector.port,
+    //         nbGood,
+    //         nbBad,
+    //         maxBytes
+    //       )
+    //       _ <- log(testName, "Data sent. Waiting for collector to work")
+    //       _ <- IO.sleep(5.second)
+    //       _ <- log(testName, "Consuming collector's output")
+    //       collectorOutput <- PubSub.consume(
+    //         Containers.projectId,
+    //         Containers.emulatorHost,
+    //         Containers.emulatorHostPort,
+    //         topicGood,
+    //         topicBad
+    //       )
+    //       _ <- printBadRows(testName, collectorOutput.bad)
+    //     } yield {
+    //       collectorOutput.good.size should beEqualTo(nbGood)
+    //       collectorOutput.bad.size should beEqualTo(nbBad)
+    //     }
+    //   }
+    // }
+
+    // s"shutdown within $stopTimeout when it receives a SIGTERM" in {
+    //   val testName = "stop"
+
+    //   Containers.collector(
+    //     "pubsub/src/it/resources/collector.hocon",
+    //     testName,
+    //     s"${testName}-raw",
+    //     s"${testName}-bad-1"
+    //   ).use { collector =>
+    //     val container = collector.container
+    //     for {
+    //       _ <- log(testName, "Sending signal")
+    //       _ <- IO(container.getDockerClient().killContainerCmd(container.getContainerId()).withSignal("TERM").exec())
+    //       _ <- waitWhile[GenericContainer[_]](container, _.isRunning, stopTimeout)
+    //     } yield {
+    //       container.isRunning() must beFalse
+    //       container.getLogs() must contain("Closing NIO1 channel")
+    //     }
+    //   }
+    // }
+
+    // "start with /sink-health unhealthy and insert pending events when topics become available" in {
+    //   val testName = "sink-health"
+    //   val nbGood = 10
+    //   val nbBad = 10
+    //   val topicGood = s"${testName}-raw"
+    //   val topicBad = s"${testName}-bad-1"
+
+    //   Containers.collector(
+    //     "pubsub/src/it/resources/collector.hocon",
+    //     testName,
+    //     topicGood,
+    //     topicBad,
+    //     createTopics = false,
+    //     envs = Map("MAX_BYTES" -> maxBytes.toString)
+    //   ).use { collector =>
+    //     val uri = Uri.unsafeFromString(s"http://${collector.host}:${collector.port}/sink-health")
+    //     val request = Request[IO](Method.GET, uri)
+
+    //     for {
+    //       _ <- log(testName, "Checking /sink-health before creating the topics")
+    //       statusBeforeCreate <- Http.status(request)
+    //       _ <- log(testName, "Sending events before creating the topics")
+    //       _ <- EventGenerator.sendEvents(
+    //         collector.host,
+    //         collector.port,
+    //         nbGood,
+    //         nbBad,
+    //         maxBytes
+    //       )
+    //       _ <- log(testName, "Creating topics")
+    //       _ <- PubSub.createTopicsAndSubscriptions(
+    //         Containers.projectId,
+    //         Containers.emulatorHost,
+    //         Containers.emulatorHostPort,
+    //         List(topicGood, topicBad)
+    //       )
+    //       _ <- IO.sleep(10.second)
+    //       _ <- log(testName, "Checking /sink-health after creating the topics")
+    //       statusAfterCreate <- Http.status(request)
+    //       collectorOutput <- PubSub.consume(
+    //         Containers.projectId,
+    //         Containers.emulatorHost,
+    //         Containers.emulatorHostPort,
+    //         topicGood,
+    //         topicBad
+    //       )
+    //       _ <- printBadRows(testName, collectorOutput.bad)
+    //     } yield {
+    //       statusBeforeCreate should beEqualTo(Status.ServiceUnavailable)
+    //       statusAfterCreate should beEqualTo(Status.Ok)
+    //       collectorOutput.good.size should beEqualTo(nbGood)
+    //       collectorOutput.bad.size should beEqualTo(nbBad)
+    //     }
+    //   }
+    // }
+  }
+}
+