Skip to content

Commit

Permalink
Wrap kafka sink with effects
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 25, 2023
1 parent 45f3f67 commit c6982c8
Show file tree
Hide file tree
Showing 12 changed files with 465 additions and 92 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ jobs:
run: sbt "project kinesisDistroless" IntegrationTest/test
- name: Run integration tests PubSub
run: sbt "project pubsubDistroless" IntegrationTest/test
- name: Run integration tests Kafka
run: sbt "project kafkaDistroless" IntegrationTest/test
21 changes: 17 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -245,23 +245,36 @@ lazy val pubsubDistroless = project
.configs(IntegrationTest)

lazy val kafkaSettings =
allSettings ++ buildInfoSettings ++ Seq(
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ Seq(
moduleName := "snowplow-stream-collector-kafka",
buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
Docker / packageName := "scala-stream-collector-kafka",
libraryDependencies ++= Seq(Dependencies.Libraries.kafkaClients, Dependencies.Libraries.mskAuth)
libraryDependencies ++= Seq(
Dependencies.Libraries.kafkaClients,
Dependencies.Libraries.mskAuth,
// integration tests dependencies
Dependencies.Libraries.IT.specs2,
Dependencies.Libraries.IT.specs2CE
),
IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
)

lazy val kafka = project
.settings(kafkaSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile;it->it")
.configs(IntegrationTest)


lazy val kafkaDistroless = project
.in(file("distroless/kafka"))
.settings(sourceDirectory := (kafka / sourceDirectory).value)
.settings(kafkaSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(http4s % "test->test;compile->compile;it->it")
.configs(IntegrationTest)


lazy val nsqSettings =
allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object Run {
def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
): Opts[F[ExitCode]] = {
val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
Expand All @@ -39,7 +39,7 @@ object Run {
private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
path: Option[Path]
): F[ExitCode] = {
val eitherT = for {
Expand Down
14 changes: 14 additions & 0 deletions kafka/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
collector {
interface = "0.0.0.0"
port = ${PORT}

streams {
good = ${TOPIC_GOOD}
bad = ${TOPIC_BAD}

sink {
brokers = ${BROKER}
maxBytes = ${MAX_BYTES}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import cats.effect._
import com.dimafeng.testcontainers.{FixedHostPortGenericContainer, GenericContainer}
import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.{BindMode, Network, GenericContainer => JGenericContainer}

object Containers {

val zookeeperContainerName = "zookeeper"
val zookeeperPort = 2181
val brokerContainerName = "broker"
val brokerExternalPort = 9092
val brokerInternalPort = 29092

def createContainers(
goodTopic: String,
badTopic: String,
maxBytes: Int
): Resource[IO, CollectorContainer] =
for {
network <- network()
_ <- zookeeper(network)
_ <- kafka(network)
c <- collectorKafka(network, goodTopic, badTopic, maxBytes)
} yield c

private def network(): Resource[IO, Network] =
Resource.make(IO(Network.newNetwork()))(n => IO(n.close()))

private def kafka(
network: Network
): Resource[IO, JGenericContainer[_]] =
Resource.make(
IO {
val container = FixedHostPortGenericContainer(
imageName = "confluentinc/cp-kafka:7.0.1",
env = Map(
"KAFKA_BROKER_ID" -> "1",
"KAFKA_ZOOKEEPER_CONNECT" -> s"$zookeeperContainerName:$zookeeperPort",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" -> "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
"KAFKA_ADVERTISED_LISTENERS" -> s"PLAINTEXT://localhost:$brokerExternalPort,PLAINTEXT_INTERNAL://$brokerContainerName:$brokerInternalPort",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" -> "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR" -> "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR" -> "1"
),
exposedPorts = List(brokerExternalPort, brokerInternalPort),
exposedHostPort = brokerExternalPort,
exposedContainerPort = brokerExternalPort
)
container.container.withNetwork(network)
container.container.withNetworkAliases(brokerContainerName)
container.start()
container.container
}
)(e => IO(e.stop()))

private def zookeeper(
network: Network,
): Resource[IO, JGenericContainer[_]] =
Resource.make(
IO {
val container = GenericContainer(
dockerImage = "confluentinc/cp-zookeeper:7.0.1",
env = Map(
"ZOOKEEPER_CLIENT_PORT" -> zookeeperPort.toString,
"ZOOKEEPER_TICK_TIME" -> "2000"
),
exposedPorts = List(zookeeperPort)
)
container.container.withNetwork(network)
container.container.withNetworkAliases(zookeeperContainerName)
container.start()
container.container
}
)(e => IO(e.stop()))

def collectorKafka(
network: Network,
goodTopic: String,
badTopic: String,
maxBytes: Int
): Resource[IO, CollectorContainer] = {
Resource.make(
IO {
val collectorPort = 8080
val container = GenericContainer(
dockerImage = BuildInfo.dockerAlias,
env = Map(
"PORT" -> collectorPort.toString,
"BROKER" -> s"$brokerContainerName:$brokerInternalPort",
"TOPIC_GOOD" -> goodTopic,
"TOPIC_BAD" -> badTopic,
"MAX_BYTES" -> maxBytes.toString
),
exposedPorts = Seq(collectorPort),
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
"kafka/src/it/resources/collector.hocon",
"/snowplow/config/collector.hocon",
BindMode.READ_ONLY
)
),
command = Seq(
"--config",
"/snowplow/config/collector.hocon"
),
waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1)
)
container.container.withNetwork(network)
val c = startContainerWithLogs(container.container, "collector")
CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort))
}
)(c => IO(c.container.stop()))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import scala.concurrent.duration._

import cats.effect.IO
import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._

import org.specs2.mutable.Specification

class KafkaCollectorSpec extends Specification with CatsEffect {

override protected val Timeout = 5.minutes

val maxBytes = 10000

"emit the correct number of collector payloads and bad rows" in {
val testName = "count"
val nbGood = 1000
val nbBad = 10
val goodTopic = "test-raw"
val badTopic = "test-bad"

Containers.createContainers(
goodTopic = goodTopic,
badTopic = badTopic,
maxBytes = maxBytes
).use { collector =>
for {
_ <- log(testName, "Sending data")
_ <- EventGenerator.sendEvents(
collector.host,
collector.port,
nbGood,
nbBad,
maxBytes
)
_ <- log(testName, "Data sent. Waiting for collector to work")
_ <- IO.sleep(30.second)
_ <- log(testName, "Consuming collector's output")
collectorOutput <- KafkaUtils.readOutput(
brokerAddr = s"localhost:${Containers.brokerExternalPort}",
goodTopic = goodTopic,
badTopic = badTopic
)
} yield {
collectorOutput.good.size must beEqualTo(nbGood)
collectorOutput.bad.size must beEqualTo(nbBad)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import cats.effect._
import org.apache.kafka.clients.consumer._
import java.util.Properties
import java.time.Duration
import scala.collection.JavaConverters._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput

object KafkaUtils {

def readOutput(
brokerAddr: String,
goodTopic: String,
badTopic: String
): IO[CollectorOutput] = {
createConsumer(brokerAddr).use { kafkaConsumer =>
IO {
kafkaConsumer.subscribe(List(goodTopic, badTopic).asJava)
val records = kafkaConsumer.poll(Duration.ofSeconds(20))
val extract = (r: ConsumerRecords[String, Array[Byte]], topicName: String) =>
r.records(topicName).asScala.toList.map(_.value())
val goodCount = extract(records, goodTopic).map(parseCollectorPayload)
val badCount = extract(records, badTopic).map(parseBadRow)
CollectorOutput(goodCount, badCount)
}
}
}

private def createConsumer(brokerAddr: String): Resource[IO, KafkaConsumer[String, Array[Byte]]] = {
val acquire = IO {
val props = new Properties()
props.setProperty("bootstrap.servers", brokerAddr)
props.setProperty("group.id", "it-collector")
props.setProperty("auto.offset.reset", "earliest")
props.setProperty("max.poll.records", "2000")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
new KafkaConsumer[String, Array[Byte]](props)
}
val release = (p: KafkaConsumer[String, Array[Byte]]) => IO(p.close())
Resource.make(acquire)(release)
}

}
23 changes: 0 additions & 23 deletions kafka/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,3 @@ collector {
}
}
}


akka {
loglevel = WARNING
loggers = ["akka.event.slf4j.Slf4jLogger"]

http.server {
remote-address-header = on
raw-request-uri-header = on

parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed
illegal-header-warnings = off
}

max-connections = 2048
}

coordinated-shutdown {
run-by-jvm-shutdown-hook = off
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,29 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import cats.effect.{IO, Resource}
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._

object KafkaCollector extends Collector {
def appName = BuildInfo.shortName
def appVersion = BuildInfo.version
def scalaVersion = BuildInfo.scalaVersion
object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {

def main(args: Array[String]): Unit = {
val (collectorConf, akkaConf) = parseConfig(args)
val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
val sinks = {
val goodStream = collectorConf.streams.good
val badStream = collectorConf.streams.bad
val bufferConf = collectorConf.streams.buffer
val (good, bad) = collectorConf.streams.sink match {
case kc: Kafka =>
(
new KafkaSink(kc.maxBytes, kc, bufferConf, goodStream),
new KafkaSink(kc.maxBytes, kc, bufferConf, badStream)
)
case _ => throw new IllegalArgumentException("Configured sink is not Kafka")
}
CollectorSinks(good, bad)
}
run(collectorConf, akkaConf, sinks, telemetry)
}
override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] =
for {
good <- KafkaSink.create[IO](
config.sink.maxBytes,
config.good,
config.sink,
config.buffer
)
bad <- KafkaSink.create[IO](
config.sink.maxBytes,
config.bad,
config.sink,
config.buffer
)
} yield Sinks(good, bad)

override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
Telemetry.TelemetryInfo(None, None)
}
Loading

0 comments on commit c6982c8

Please sign in to comment.