Http4s Kafka Sink #353

Merged 1 commit on Sep 25, 2023
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
@@ -20,3 +20,5 @@ jobs:
        run: sbt "project kinesisDistroless" IntegrationTest/test
      - name: Run integration tests PubSub
        run: sbt "project pubsubDistroless" IntegrationTest/test
      - name: Run integration tests Kafka
        run: sbt "project kafkaDistroless" IntegrationTest/test
21 changes: 17 additions & 4 deletions build.sbt
@@ -245,23 +245,36 @@ lazy val pubsubDistroless = project
  .configs(IntegrationTest)

lazy val kafkaSettings =
  allSettings ++ buildInfoSettings ++ Seq(
  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Defaults.itSettings ++ Seq(
    moduleName := "snowplow-stream-collector-kafka",
    buildInfoPackage := s"com.snowplowanalytics.snowplow.collectors.scalastream",
    Docker / packageName := "scala-stream-collector-kafka",
    libraryDependencies ++= Seq(Dependencies.Libraries.kafkaClients, Dependencies.Libraries.mskAuth)
    libraryDependencies ++= Seq(
      Dependencies.Libraries.kafkaClients,
      Dependencies.Libraries.mskAuth,
      // integration tests dependencies
      Dependencies.Libraries.IT.specs2,
      Dependencies.Libraries.IT.specs2CE
    ),
    IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
    IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
  )

lazy val kafka = project
  .settings(kafkaSettings)
  .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
  .dependsOn(core % "test->test;compile->compile")
  .dependsOn(http4s % "test->test;compile->compile;it->it")
  .configs(IntegrationTest)


lazy val kafkaDistroless = project
  .in(file("distroless/kafka"))
  .settings(sourceDirectory := (kafka / sourceDirectory).value)
  .settings(kafkaSettings)
  .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
  .dependsOn(core % "test->test;compile->compile")
  .dependsOn(http4s % "test->test;compile->compile;it->it")
  .configs(IntegrationTest)


lazy val nsqSettings =
  allSettings ++ buildInfoSettings ++ http4sBuildInfoSettings ++ Seq(
@@ -30,7 +30,7 @@ object Run {
  def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
    appInfo: AppInfo,
    mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
    telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
    telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo
  ): Opts[F[ExitCode]] = {
    val configPath = Opts.option[Path]("config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
    configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _))
@@ -39,7 +39,7 @@
  private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder](
    appInfo: AppInfo,
    mkSinks: Config.Streams[SinkConfig] => Resource[F, Sinks[F]],
    telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
    telemetryInfo: Config[SinkConfig] => Telemetry.TelemetryInfo,
    path: Option[Path]
  ): F[ExitCode] = {
    val eitherT = for {
14 changes: 14 additions & 0 deletions kafka/src/it/resources/collector.hocon
@@ -0,0 +1,14 @@
collector {
  interface = "0.0.0.0"
  port = ${PORT}

  streams {
    good = ${TOPIC_GOOD}
    bad = ${TOPIC_BAD}

    sink {
      brokers = ${BROKER}
      maxBytes = ${MAX_BYTES}
    }
  }
}
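The ${...} placeholders above are plain HOCON substitutions, so the collector container only needs matching environment variables at startup. As a minimal sketch (assuming Typesafe Config, on which the collector's --config HOCON handling is built; the object name and hard-coded path here are illustrative only), the file resolves like this:

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

object ResolveItConfig {
  // resolve() first looks up substitutions inside the file itself and then falls back to
  // environment variables, so PORT, TOPIC_GOOD, TOPIC_BAD, BROKER and MAX_BYTES must be
  // set (the test containers below pass them through the container env map).
  def load(path: String): Config =
    ConfigFactory.parseFile(new File(path)).resolve()

  def main(args: Array[String]): Unit =
    println(load("kafka/src/it/resources/collector.hocon").getString("collector.streams.sink.brokers"))
}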
@@ -0,0 +1,119 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import cats.effect._
import com.dimafeng.testcontainers.{FixedHostPortGenericContainer, GenericContainer}
import com.snowplowanalytics.snowplow.collectors.scalastream.BuildInfo
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorContainer
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.{BindMode, Network, GenericContainer => JGenericContainer}

object Containers {

  val zookeeperContainerName = "zookeeper"
  val zookeeperPort = 2181
  val brokerContainerName = "broker"
  val brokerExternalPort = 9092
  val brokerInternalPort = 29092

  def createContainers(
    goodTopic: String,
    badTopic: String,
    maxBytes: Int
  ): Resource[IO, CollectorContainer] =
    for {
      network <- network()
      _ <- zookeeper(network)
      _ <- kafka(network)
      c <- collectorKafka(network, goodTopic, badTopic, maxBytes)
    } yield c

  private def network(): Resource[IO, Network] =
    Resource.make(IO(Network.newNetwork()))(n => IO(n.close()))

  private def kafka(
    network: Network
  ): Resource[IO, JGenericContainer[_]] =
    Resource.make(
      IO {
        val container = FixedHostPortGenericContainer(
          imageName = "confluentinc/cp-kafka:7.0.1",
          env = Map(
            "KAFKA_BROKER_ID" -> "1",
            "KAFKA_ZOOKEEPER_CONNECT" -> s"$zookeeperContainerName:$zookeeperPort",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" -> "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
            "KAFKA_ADVERTISED_LISTENERS" -> s"PLAINTEXT://localhost:$brokerExternalPort,PLAINTEXT_INTERNAL://$brokerContainerName:$brokerInternalPort",
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" -> "1",
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR" -> "1",
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR" -> "1"
          ),
          exposedPorts = List(brokerExternalPort, brokerInternalPort),
          exposedHostPort = brokerExternalPort,
          exposedContainerPort = brokerExternalPort
        )
        container.container.withNetwork(network)
        container.container.withNetworkAliases(brokerContainerName)
        container.start()
        container.container
      }
    )(e => IO(e.stop()))

  private def zookeeper(
    network: Network,
  ): Resource[IO, JGenericContainer[_]] =
    Resource.make(
      IO {
        val container = GenericContainer(
          dockerImage = "confluentinc/cp-zookeeper:7.0.1",
          env = Map(
            "ZOOKEEPER_CLIENT_PORT" -> zookeeperPort.toString,
            "ZOOKEEPER_TICK_TIME" -> "2000"
          ),
          exposedPorts = List(zookeeperPort)
        )
        container.container.withNetwork(network)
        container.container.withNetworkAliases(zookeeperContainerName)
        container.start()
        container.container
      }
    )(e => IO(e.stop()))

  def collectorKafka(
    network: Network,
    goodTopic: String,
    badTopic: String,
    maxBytes: Int
  ): Resource[IO, CollectorContainer] = {
    Resource.make(
      IO {
        val collectorPort = 8080
        val container = GenericContainer(
          dockerImage = BuildInfo.dockerAlias,
          env = Map(
            "PORT" -> collectorPort.toString,
            "BROKER" -> s"$brokerContainerName:$brokerInternalPort",
            "TOPIC_GOOD" -> goodTopic,
            "TOPIC_BAD" -> badTopic,
            "MAX_BYTES" -> maxBytes.toString
          ),
          exposedPorts = Seq(collectorPort),
          fileSystemBind = Seq(
            GenericContainer.FileSystemBind(
              "kafka/src/it/resources/collector.hocon",
              "/snowplow/config/collector.hocon",
              BindMode.READ_ONLY
            )
          ),
          command = Seq(
            "--config",
            "/snowplow/config/collector.hocon"
          ),
          waitStrategy = Wait.forLogMessage(s".*Service bound to address.*", 1)
        )
        container.container.withNetwork(network)
        val c = startContainerWithLogs(container.container, "collector")
        CollectorContainer(c, c.getHost, c.getMappedPort(collectorPort))
      }
    )(c => IO(c.container.stop()))
  }
}
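Containers.createContainers composes the three containers into one Resource: ZooKeeper and the broker join a shared Docker network, the broker advertises an internal listener (broker:29092) for the collector container and an external one (localhost:9092) for the test process, and the collector receives the broker address and topic names through the env vars consumed by collector.hocon. A minimal usage sketch (topic names and maxBytes taken from the spec below; the val name is illustrative):

import cats.effect.IO

// Acquire the whole Kafka + collector stack, print where the collector is reachable,
// and tear everything down when the Resource is released.
val demo: IO[Unit] =
  Containers
    .createContainers(goodTopic = "test-raw", badTopic = "test-bad", maxBytes = 10000)
    .use(collector => IO(println(s"collector at ${collector.host}:${collector.port}")))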
@@ -0,0 +1,55 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import scala.concurrent.duration._

import cats.effect.IO
import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.snowplow.collectors.scalastream.it.EventGenerator
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._

import org.specs2.mutable.Specification

class KafkaCollectorSpec extends Specification with CatsEffect {

  override protected val Timeout = 5.minutes

  val maxBytes = 10000

  "emit the correct number of collector payloads and bad rows" in {
    val testName = "count"
    val nbGood = 1000
    val nbBad = 10
    val goodTopic = "test-raw"
    val badTopic = "test-bad"

    Containers.createContainers(
      goodTopic = goodTopic,
      badTopic = badTopic,
      maxBytes = maxBytes
    ).use { collector =>
      for {
        _ <- log(testName, "Sending data")
        _ <- EventGenerator.sendEvents(
          collector.host,
          collector.port,
          nbGood,
          nbBad,
          maxBytes
        )
        _ <- log(testName, "Data sent. Waiting for collector to work")
        _ <- IO.sleep(30.second)
        _ <- log(testName, "Consuming collector's output")
        collectorOutput <- KafkaUtils.readOutput(
          brokerAddr = s"localhost:${Containers.brokerExternalPort}",
          goodTopic = goodTopic,
          badTopic = badTopic
        )
      } yield {
        collectorOutput.good.size must beEqualTo(nbGood)
        collectorOutput.bad.size must beEqualTo(nbBad)
      }
    }
  }

}
@@ -0,0 +1,46 @@
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kafka

import cats.effect._
import org.apache.kafka.clients.consumer._
import java.util.Properties
import java.time.Duration
import scala.collection.JavaConverters._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput

object KafkaUtils {

  def readOutput(
    brokerAddr: String,
    goodTopic: String,
    badTopic: String
  ): IO[CollectorOutput] = {
    createConsumer(brokerAddr).use { kafkaConsumer =>
      IO {
        kafkaConsumer.subscribe(List(goodTopic, badTopic).asJava)
        val records = kafkaConsumer.poll(Duration.ofSeconds(20))
        val extract = (r: ConsumerRecords[String, Array[Byte]], topicName: String) =>
          r.records(topicName).asScala.toList.map(_.value())
        val goodCount = extract(records, goodTopic).map(parseCollectorPayload)
        val badCount = extract(records, badTopic).map(parseBadRow)
        CollectorOutput(goodCount, badCount)
      }
    }
  }

  private def createConsumer(brokerAddr: String): Resource[IO, KafkaConsumer[String, Array[Byte]]] = {
    val acquire = IO {
      val props = new Properties()
      props.setProperty("bootstrap.servers", brokerAddr)
      props.setProperty("group.id", "it-collector")
      props.setProperty("auto.offset.reset", "earliest")
      props.setProperty("max.poll.records", "2000")
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      new KafkaConsumer[String, Array[Byte]](props)
    }
    val release = (p: KafkaConsumer[String, Array[Byte]]) => IO(p.close())
    Resource.make(acquire)(release)
  }

}
23 changes: 0 additions & 23 deletions kafka/src/main/resources/application.conf
@@ -14,26 +14,3 @@ collector {
    }
  }
}


akka {
  loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  http.server {
    remote-address-header = on
    raw-request-uri-header = on

    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
      illegal-header-warnings = off
    }

    max-connections = 2048
  }

  coordinated-shutdown {
    run-by-jvm-shutdown-hook = off
  }
}
@@ -14,33 +14,29 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo
import cats.effect.{IO, Resource}
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._

object KafkaCollector extends Collector {
  def appName = BuildInfo.shortName
  def appVersion = BuildInfo.version
  def scalaVersion = BuildInfo.scalaVersion
object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {

  def main(args: Array[String]): Unit = {
    val (collectorConf, akkaConf) = parseConfig(args)
    val telemetry = TelemetryAkkaService.initWithCollector(collectorConf, BuildInfo.moduleName, appVersion)
    val sinks = {
      val goodStream = collectorConf.streams.good
      val badStream = collectorConf.streams.bad
      val bufferConf = collectorConf.streams.buffer
      val (good, bad) = collectorConf.streams.sink match {
        case kc: Kafka =>
          (
            new KafkaSink(kc.maxBytes, kc, bufferConf, goodStream),
            new KafkaSink(kc.maxBytes, kc, bufferConf, badStream)
          )
        case _ => throw new IllegalArgumentException("Configured sink is not Kafka")
      }
      CollectorSinks(good, bad)
    }
    run(collectorConf, akkaConf, sinks, telemetry)
  }
  override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] =
    for {
      good <- KafkaSink.create[IO](
        config.sink.maxBytes,
        config.good,
        config.sink,
        config.buffer
      )
      bad <- KafkaSink.create[IO](
        config.sink.maxBytes,
        config.bad,
        config.sink,
        config.buffer
      )
    } yield Sinks(good, bad)

  override def telemetryInfo(config: Config[KafkaSinkConfig]): Telemetry.TelemetryInfo =
    Telemetry.TelemetryInfo(None, None)
}
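The new App[KafkaSinkConfig](BuildInfo) wiring means the sink section of the HOCON is decoded into a KafkaSinkConfig. That type is defined elsewhere in this PR and is not shown in this excerpt; purely as a hypothetical illustration (field names taken from the sink block of the integration-test collector.hocon, while the class name and circe decoder below are assumptions, not the PR's code):

import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder

// Hypothetical stand-in for the real KafkaSinkConfig; the actual class may carry more
// fields (for example extra Kafka producer settings).
final case class KafkaSinkConfigSketch(
  brokers: String, // bootstrap servers, e.g. "broker:29092" in the integration test
  maxBytes: Int    // maximum payload size the sink accepts
)

object KafkaSinkConfigSketch {
  implicit val decoder: Decoder[KafkaSinkConfigSketch] = deriveDecoder
}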