diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d388e0877..a482e472c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,3 +20,5 @@ jobs: run: sbt "project kinesisDistroless" IntegrationTest/test - name: Run integration tests PubSub run: sbt "project pubsubDistroless" IntegrationTest/test + - name: Run integration tests Kafka + run: sbt "project kafkaDistroless" IntegrationTest/test diff --git a/build.sbt b/build.sbt index 5b672f1f8..177c5ad7b 100644 --- a/build.sbt +++ b/build.sbt @@ -245,23 +245,36 @@ lazy val pubsubDistroless = project .configs(IntegrationTest) lazy val kafkaSettings = - allSettings ++ buildInfoSettings ++ Seq( + allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ Seq( moduleName := "snowplow-stream-collector-kafka", + buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream", Docker / packageName := "scala-stream-collector-kafka", - libraryDependencies ++= Seq(Dependencies.Libraries.kafkaClients, Dependencies.Libraries.mskAuth) + libraryDependencies ++= Seq( + Dependencies.Libraries.kafkaClients, + Dependencies.Libraries.mskAuth, + // integration tests dependencies + Dependencies.Libraries.IT.specs2, + Dependencies.Libraries.IT.specs2CE + ), + IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value, + IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated ) lazy val kafka = project .settings(kafkaSettings) .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile;it->it") + .configs(IntegrationTest) + lazy val kafkaDistroless = project .in(file("distroless/kafka")) .settings(sourceDirectory := (kafka / sourceDirectory).value) .settings(kafkaSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") + .dependsOn(http4s % "test->test;compile->compile;it->it") + .configs(IntegrationTest) + lazy val nsqSettings = allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq( diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 92839bac0..3aed8603d 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -30,7 +30,7 @@ object Run { def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], - telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo + telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo ): Opts[F[ExitCode]] = { val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _)) @@ -39,7 +39,7 @@ object Run { private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]], - telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo, + telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo, path: Option[Path] ): F[ExitCode] = { val eitherT = for { diff --git a/kafka/src/it/resources/collector.hocon 
b/kafka/src/it/resources/collector.hocon new file mode 100644 index 000000000..78fd2c372 --- /dev/null +++ b/kafka/src/it/resources/collector.hocon @@ -0,0 +1,14 @@ +collector { + interface = "0.0.0.0" + port = ${PORT} + + streams { + good = ${TOPIC_GOOD} + bad = ${TOPIC_BAD} + + sink { + brokers = ${BROKER} + maxBytes = ${MAX_BYTES} + } + } +} \ No newline at end of file diff --git a/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/Containers.scala b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/Containers.scala new file mode 100644 index 000000000..019445408 --- /dev/null +++ b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/Containers.scala @@ -0,0 +1,119 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka + +import cats.effect._ +import com.dimafeng.testcontainers.{FixedHostPortGenericContainer, GenericContainer} +import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo +import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.containers.{BindMode, Network, GenericContainer => JGenericContainer} + +object Containers { + + val zookeeperContainerName = "zookeeper" + val zookeeperPort = 2181 + val brokerContainerName = "broker" + val brokerExternalPort = 9092 + val brokerInternalPort = 29092 + + def createContainers( + goodTopic: String, + badTopic: String, + maxBytes: Int + ): Resource[IO, CollectorContainer] = + for { + network <- network() + _ <- zookeeper(network) + _ <- kafka(network) + c <- collectorKafka(network, goodTopic, badTopic, maxBytes) + } yield c + + private def network(): Resource[IO, Network] = + Resource.make(IO(Network.newNetwork()))(n => IO(n.close())) + + private def kafka( + network: Network + ): Resource[IO, JGenericContainer[_]] = + Resource.make( + IO { + val container = FixedHostPortGenericContainer( + imageName = "confluentinc/cp-kafka:7.0.1", + env = Map( + "KAFKA_BROKER_ID" -> "1", + "KAFKA_ZOOKEEPER_CONNECT" -> s"$zookeeperContainerName:$zookeeperPort", + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" -> "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT", + "KAFKA_ADVERTISED_LISTENERS" -> s"PLAINTEXT://localhost:$brokerExternalPort,PLAINTEXT_INTERNAL://$brokerContainerName:$brokerInternalPort", + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" -> "1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR" -> "1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR" -> "1" + ), + exposedPorts = List(brokerExternalPort, brokerInternalPort), + exposedHostPort = brokerExternalPort, + exposedContainerPort = brokerExternalPort + ) + container.container.withNetwork(network) + container.container.withNetworkAliases(brokerContainerName) + container.start() + container.container + } + )(e => IO(e.stop())) + + private def zookeeper( + network: Network, + ): Resource[IO, JGenericContainer[_]] = + Resource.make( + IO { + val container = GenericContainer( + dockerImage = "confluentinc/cp-zookeeper:7.0.1", + env = Map( + "ZOOKEEPER_CLIENT_PORT" -> zookeeperPort.toString, + "ZOOKEEPER_TICK_TIME" -> "2000" + ), + exposedPorts = List(zookeeperPort) + ) + container.container.withNetwork(network) + container.container.withNetworkAliases(zookeeperContainerName) + container.start() + container.container + } + )(e => IO(e.stop())) + + def collectorKafka( + network: Network, + goodTopic: String, 
+ badTopic: String, + maxBytes: Int + ): Resource[IO, CollectorContainer] = { + Resource.make( + IO { + val collectorPort = 8080 + val container = GenericContainer( + dockerImage = BuildInfo.dockerAlias, + env = Map( + "PORT" -> collectorPort.toString, + "BROKER" -> s"$brokerContainerName:$brokerInternalPort", + "TOPIC_GOOD" -> goodTopic, + "TOPIC_BAD" -> badTopic, + "MAX_BYTES" -> maxBytes.toString + ), + exposedPorts = Seq(collectorPort), + fileSystemBind = Seq( + GenericContainer.FileSystemBind( + "kafka/src/it/resources/collector.hocon", + "/snowplow/config/collector.hocon", + BindMode.READ_ONLY + ) + ), + command = Seq( + "--config", + "/snowplow/config/collector.hocon" + ), + waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1) + ) + container.container.withNetwork(network) + val c = startContainerWithLogs(container.container, "collector") + CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort)) + } + )(c => IO(c.container.stop())) + } +} diff --git a/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaCollectorSpec.scala b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaCollectorSpec.scala new file mode 100644 index 000000000..509e07128 --- /dev/null +++ b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaCollectorSpec.scala @@ -0,0 +1,55 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka + +import scala.concurrent.duration._ + +import cats.effect.IO +import cats.effect.testing.specs2.CatsEffect + +import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ + +import org.specs2.mutable.Specification + +class KafkaCollectorSpec extends Specification with CatsEffect { + + override protected val Timeout = 5.minutes + + val maxBytes = 10000 + + "emit the correct number of collector payloads and bad rows" in { + val testName = "count" + val nbGood = 1000 + val nbBad = 10 + val goodTopic = "test-raw" + val badTopic = "test-bad" + + Containers.createContainers( + goodTopic = goodTopic, + badTopic = badTopic, + maxBytes = maxBytes + ).use { collector => + for { + _ <- log(testName, "Sending data") + _ <- EventGenerator.sendEvents( + collector.host, + collector.port, + nbGood, + nbBad, + maxBytes + ) + _ <- log(testName, "Data sent. 
Waiting for collector to work") + _ <- IO.sleep(30.second) + _ <- log(testName, "Consuming collector's output") + collectorOutput <- KafkaUtils.readOutput( + brokerAddr = s"localhost:${Containers.brokerExternalPort}", + goodTopic = goodTopic, + badTopic = badTopic + ) + } yield { + collectorOutput.good.size must beEqualTo(nbGood) + collectorOutput.bad.size must beEqualTo(nbBad) + } + } + } + +} diff --git a/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala new file mode 100644 index 000000000..b70d73d07 --- /dev/null +++ b/kafka/src/it/scala/com/snowplowanalytics/snowplow/collectors/scalastream/it/kafka/KafkaUtils.scala @@ -0,0 +1,46 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka + +import cats.effect._ +import org.apache.kafka.clients.consumer._ +import java.util.Properties +import java.time.Duration +import scala.collection.JavaConverters._ +import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._ +import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput + +object KafkaUtils { + + def readOutput( + brokerAddr: String, + goodTopic: String, + badTopic: String + ): IO[CollectorOutput] = { + createConsumer(brokerAddr).use { kafkaConsumer => + IO { + kafkaConsumer.subscribe(List(goodTopic, badTopic).asJava) + val records = kafkaConsumer.poll(Duration.ofSeconds(20)) + val extract = (r: ConsumerRecords[String, Array[Byte]], topicName: String) => + r.records(topicName).asScala.toList.map(_.value()) + val goodCount = extract(records, goodTopic).map(parseCollectorPayload) + val badCount = extract(records, badTopic).map(parseBadRow) + CollectorOutput(goodCount, badCount) + } + } + } + + private def createConsumer(brokerAddr: String): Resource[IO, KafkaConsumer[String, Array[Byte]]] = { + val acquire = IO { + val props = new Properties() + props.setProperty("bootstrap.servers", brokerAddr) + props.setProperty("group.id", "it-collector") + props.setProperty("auto.offset.reset", "earliest") + props.setProperty("max.poll.records", "2000") + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer") + new KafkaConsumer[String, Array[Byte]](props) + } + val release = (p: KafkaConsumer[String, Array[Byte]]) => IO(p.close()) + Resource.make(acquire)(release) + } + +} diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf index 95f0f5c80..80182aeec 100644 --- a/kafka/src/main/resources/application.conf +++ b/kafka/src/main/resources/application.conf @@ -14,26 +14,3 @@ collector { } } } - - -akka { - loglevel = WARNING - loggers = ["akka.event.slf4j.Slf4jLogger"] - - http.server { - remote-address-header = on - raw-request-uri-header = on - - parsing { - max-uri-length = 32768 - uri-parsing-mode = relaxed - illegal-header-warnings = off - } - - max-connections = 2048 - } - - coordinated-shutdown { - run-by-jvm-shutdown-hook = off - } -} diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala index 7d625e208..6447b3209 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala +++ 
b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala @@ -14,33 +14,29 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ -import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink -import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService -import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo +import cats.effect.{IO, Resource} +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ -object KafkaCollector extends Collector { - def appName = BuildInfo.shortName - def appVersion = BuildInfo.version - def scalaVersion = BuildInfo.scalaVersion +object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) { - def main(args: Array[String]): Unit = { - val (collectorConf, akkaConf) = parseConfig(args) - val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion) - val sinks = { - val goodStream = collectorConf.streams.good - val badStream = collectorConf.streams.bad - val bufferConf = collectorConf.streams.buffer - val (good, bad) = collectorConf.streams.sink match { - case kc: Kafka => - ( - new KafkaSink(kc.maxBytes, kc, bufferConf, goodStream), - new KafkaSink(kc.maxBytes, kc, bufferConf, badStream) - ) - case _ => throw new IllegalArgumentException("Configured sink is not Kafka") - } - CollectorSinks(good, bad) - } - run(collectorConf, akkaConf, sinks, telemetry) - } + override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] = + for { + good <- KafkaSink.create[IO]( + config.sink.maxBytes, + config.good, + config.sink, + config.buffer + ) + bad <- KafkaSink.create[IO]( + config.sink.maxBytes, + config.bad, + config.sink, + config.buffer + ) + } yield Sinks(good, bad) + + override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo = + Telemetry.TelemetryInfo(None, None) } diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 40a8dc9d8..91be35a0c 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -13,48 +13,28 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks +import cats.effect.{Resource, Sync} + +import org.slf4j.LoggerFactory + import java.util.Properties import org.apache.kafka.clients.producer._ -import com.snowplowanalytics.snowplow.collectors.scalastream.model._ +import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} /** * Kafka Sink for the Scala Stream Collector */ -class KafkaSink( +class KafkaSink[F[_]: Sync]( val maxBytes: Int, - kafkaConfig: Kafka, - bufferConfig: BufferConfig, + kafkaProducer: KafkaProducer[String, Array[Byte]], topicName: String -) extends Sink { +) extends Sink[F] { - private val kafkaProducer = createProducer + private lazy val log = LoggerFactory.getLogger(getClass()) - /** - * Creates a new Kafka Producer with the given - * configuration options - * - * @return a new Kafka Producer - */ - private def createProducer: KafkaProducer[String, 
Array[Byte]] = { - - log.info(s"Create Kafka Producer to brokers: ${kafkaConfig.brokers}") - - val props = new Properties() - props.setProperty("bootstrap.servers", kafkaConfig.brokers) - props.setProperty("acks", "all") - props.setProperty("retries", kafkaConfig.retries.toString) - props.setProperty("buffer.memory", bufferConfig.byteLimit.toString) - props.setProperty("linger.ms", bufferConfig.timeLimit.toString) - props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") - - // Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418 - kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) } - - new KafkaProducer[String, Array[Byte]](props) - } + override def isHealthy: F[Boolean] = Sync[F].pure(true) /** * Store raw events to the topic @@ -62,7 +42,7 @@ class KafkaSink( * @param events The list of events to send * @param key The partition key to use */ - override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = { + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].delay { log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") events.foreach { event => kafkaProducer.send( @@ -74,7 +54,47 @@ class KafkaSink( ) } } +} + +object KafkaSink { - override def shutdown(): Unit = - kafkaProducer.close() + def create[F[_]: Sync]( + maxBytes: Int, + topicName: String, + kafkaConfig: KafkaSinkConfig, + bufferConfig: Config.Buffer + ): Resource[F, KafkaSink[F]] = + for { + kafkaProducer <- createProducer(kafkaConfig, bufferConfig) + kafkaSink = new KafkaSink(maxBytes, kafkaProducer, topicName) + } yield kafkaSink + + /** + * Creates a new Kafka Producer with the given + * configuration options + * + * @return a new Kafka Producer + */ + private def createProducer[F[_]: Sync]( + kafkaConfig: KafkaSinkConfig, + bufferConfig: Config.Buffer + ): Resource[F, KafkaProducer[String, Array[Byte]]] = { + val acquire = Sync[F].delay { + val props = new Properties() + props.setProperty("bootstrap.servers", kafkaConfig.brokers) + props.setProperty("acks", "all") + props.setProperty("retries", kafkaConfig.retries.toString) + props.setProperty("buffer.memory", bufferConfig.byteLimit.toString) + props.setProperty("linger.ms", bufferConfig.timeLimit.toString) + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") + + // Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418 + kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) } + + new KafkaProducer[String, Array[Byte]](props) + } + val release = (p: KafkaProducer[String, Array[Byte]]) => Sync[F].delay(p.close()) + Resource.make(acquire)(release) + } } diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala new file mode 100644 index 000000000..676a5259d --- /dev/null +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala @@ -0,0 +1,17 @@ +package com.snowplowanalytics.snowplow.collectors.scalastream.sinks + +import io.circe.Decoder +import 
io.circe.generic.semiauto._ + +import com.snowplowanalytics.snowplow.collector.core.Config + +final case class KafkaSinkConfig( + maxBytes: Int, + brokers: String, + retries: Int, + producerConf: Option[Map[String, String]] +) extends Config.Sink + +object KafkaSinkConfig { + implicit val configDecoder: Decoder[KafkaSinkConfig] = deriveDecoder[KafkaSinkConfig] +} diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 8ef97a9a5..24bc9a288 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -18,8 +18,122 @@ */ package com.snowplowanalytics.snowplow.collectors.scalastream -import com.snowplowanalytics.snowplow.collectors.scalastream.config.ConfigSpec +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{ExitCode, IO} +import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser} +import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSinkConfig +import org.http4s.SameSite +import org.specs2.mutable.Specification -class KafkaConfigSpec extends ConfigSpec { - makeConfigTest("kafka", "", "") +import java.nio.file.Paths +import scala.concurrent.duration.DurationInt + +class KafkaConfigSpec extends Specification with CatsEffect { + + "Config parser" should { + "be able to parse extended kafka config" in { + assert( + resource = "/config.kafka.extended.hocon", + expectedResult = Right(KafkaConfigSpec.expectedConfig) + ) + } + "be able to parse minimal kafka config" in { + assert( + resource = "/config.kafka.minimal.hocon", + expectedResult = Right(KafkaConfigSpec.expectedConfig) + ) + } + } + + private def assert(resource: String, expectedResult: Either[ExitCode, Config[KafkaSinkConfig]]) = { + val path = Paths.get(getClass.getResource(resource).toURI) + ConfigParser.fromPath[IO, KafkaSinkConfig](Some(path)).value.map { result => + result must beEqualTo(expectedResult) + } + } +} + +object KafkaConfigSpec { + + private val expectedConfig = Config[KafkaSinkConfig]( + interface = "0.0.0.0", + port = 8080, + paths = Map.empty[String, String], + p3p = Config.P3P( + policyRef = "/w3c/p3p.xml", + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + ), + crossDomain = Config.CrossDomain( + enabled = false, + domains = List("*"), + secure = true + ), + cookie = Config.Cookie( + enabled = true, + expiration = 365.days, + name = "sp", + domains = List.empty, + fallbackDomain = None, + secure = true, + httpOnly = true, + sameSite = Some(SameSite.None) + ), + doNotTrackCookie = Config.DoNotTrackCookie( + enabled = false, + name = "", + value = "" + ), + cookieBounce = Config.CookieBounce( + enabled = false, + name = "n3pc", + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000", + forwardedProtocolHeader = None + ), + redirectMacro = Config.RedirectMacro( + enabled = false, + placeholder = None + ), + rootResponse = Config.RootResponse( + enabled = false, + statusCode = 302, + headers = Map.empty[String, String], + body = "" + ), + cors = Config.CORS(1.hour), + monitoring = + Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + ssl = Config.SSL(enable = false, redirect = false, port = 443), + enableDefaultRedirect = false, + redirectDomains = Set.empty, + preTerminationPeriod = 
10.seconds, + streams = Config.Streams( + good = "good", + bad = "bad", + useIpAddressAsPartitionKey = false, + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + sink = KafkaSinkConfig( + maxBytes = 1000000, + brokers = "localhost:9092,another.host:9092", + retries = 10, + producerConf = None + ) + ), + telemetry = Config.Telemetry( + disable = false, + interval = 60.minutes, + method = "POST", + url = "telemetry-g.snowplowanalytics.com", + port = 443, + secure = true, + userProvidedId = None, + moduleName = None, + moduleVersion = None, + instanceId = None, + autoGeneratedId = None + ) + ) }
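
Note on running the new integration tests locally: `IntegrationTest / test` is wired in kafkaSettings to depend on `Docker / publishLocal`, so the collector image is rebuilt automatically before the tests start. The same invocation used in the workflow works locally, assuming Docker is available:

  sbt "project kafkaDistroless" IntegrationTest/test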
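As a usage sketch of the config-loading path exercised by KafkaConfigSpec (the file path below is hypothetical; the spec itself points at the bundled config.kafka.*.hocon resources):

  import java.nio.file.Paths
  import cats.effect.{ExitCode, IO}
  import com.snowplowanalytics.snowplow.collector.core.{Config, ConfigParser}
  import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSinkConfig

  // Parse a HOCON file into the typed collector config; a Left(ExitCode) signals a parse/validation failure.
  val loaded: IO[Either[ExitCode, Config[KafkaSinkConfig]]] =
    ConfigParser.fromPath[IO, KafkaSinkConfig](Some(Paths.get("/snowplow/config/collector.hocon"))).value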
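A minimal sketch of the derived KafkaSinkConfig decoder (assuming circe-parser is available on the test classpath); producerConf is an Option, so a missing key decodes to None:

  import io.circe.parser.decode
  import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSinkConfig

  val json = """{ "maxBytes": 1000000, "brokers": "localhost:9092,another.host:9092", "retries": 10 }"""
  // Right(KafkaSinkConfig(1000000, "localhost:9092,another.host:9092", 10, None))
  val decoded = decode[KafkaSinkConfig](json)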
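A sketch of how the new resource-based KafkaSink is used outside of App/mkSinks (the config values below are hypothetical, and a reachable broker is needed for the producer to actually deliver anything):

  import cats.effect.IO
  import com.snowplowanalytics.snowplow.collector.core.Config
  import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KafkaSink, KafkaSinkConfig}

  val sinkConfig = KafkaSinkConfig(maxBytes = 1000000, brokers = "localhost:9092", retries = 10, producerConf = None)
  val buffer     = Config.Buffer(byteLimit = 3145728, recordLimit = 500, timeLimit = 5000)

  // The producer is created on acquire and closed on release, mirroring the old shutdown() behaviour.
  val program: IO[Unit] =
    KafkaSink.create[IO](sinkConfig.maxBytes, "good", sinkConfig, buffer).use { sink =>
      sink.storeRawEvents(List("raw-thrift-payload".getBytes("UTF-8")), key = "partition-key")
    }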
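KafkaCollectorSpec currently sleeps 30 seconds and relies on a single 20-second poll in KafkaUtils.readOutput. A hypothetical helper like the one below (not part of this PR) could instead poll until the expected counts arrive, which tends to be less timing-sensitive on slow CI runners:

  import java.time.Duration
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer

  // Keep polling until `expected` records have been read from `topic` or the deadline passes.
  def pollUntil(
    consumer: KafkaConsumer[String, Array[Byte]],
    topic: String,
    expected: Int,
    deadlineMs: Long
  ): List[Array[Byte]] = {
    var acc = List.empty[Array[Byte]]
    while (acc.size < expected && System.currentTimeMillis() < deadlineMs) {
      val batch = consumer.poll(Duration.ofSeconds(1))
      acc = acc ++ batch.records(topic).asScala.toList.map(_.value())
    }
    acc
  }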