diff --git a/build.sbt b/build.sbt index dc020e98c..c80872d35 100644 --- a/build.sbt +++ b/build.sbt @@ -22,6 +22,7 @@ lazy val core = project Dependencies.Libraries.http4sBlaze, Dependencies.Libraries.http4sEmber, Dependencies.Libraries.http4sNetty, + Dependencies.Libraries.http4sArmeria, Dependencies.Libraries.http4sClient, Dependencies.Libraries.log4cats, Dependencies.Libraries.thrift, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index ce6d01652..bdce3703d 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -183,6 +183,7 @@ object Config { case object Blaze extends Backend case object Ember extends Backend case object Netty extends Backend + case object Armeria extends Backend } } @@ -223,10 +224,11 @@ object Config { implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) implicit val streams = deriveDecoder[Streams[SinkConfig]] implicit val backend: Decoder[Experimental.Backend] = Decoder[String].emap { - case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) - case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) - case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) - case other => Left(s"Invalid backend $other") + case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) + case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) + case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) + case s if s.toLowerCase() == "armeria" => Right(Experimental.Backend.Armeria) + case other => Left(s"Invalid backend $other") } implicit val experimental = deriveDecoder[Experimental] diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 62d6891b8..509bf1bc8 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -19,6 +19,7 @@ import org.http4s.{HttpApp, HttpRoutes} import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.ember.server.EmberServerBuilder import org.http4s.netty.server.NettyServerBuilder +import org.http4s.armeria.server.ArmeriaServerBuilder import com.comcast.ip4s._ import fs2.io.net.Network import fs2.io.net.tls.TLSContext @@ -31,6 +32,13 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress import javax.net.ssl.SSLContext +import io.netty.handler.ssl.{ClientAuth, JdkSslContext} +import io.netty.handler.ssl.IdentityCipherSuiteFilter +import io.netty.handler.ssl.ApplicationProtocolConfig +import java.util.Properties +import javax.net.ssl.KeyManagerFactory +import java.security.KeyStore +import java.io.FileInputStream object HttpServer { @@ -72,8 +80,10 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ) = backend match { - case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) - case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Netty => buildNettyServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Armeria => buildArmeriaServer(routes, port, secure, hsts, networking, debugHttp) } private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = { val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port) @@ -145,7 +155,7 @@ object HttpServer { .build } - private def buildNettyServer[F[_]: Async: Network]( + private def buildNettyServer[F[_]: Async]( routes: HttpRoutes[F], port: Int, secure: Boolean, @@ -153,19 +163,84 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ): Resource[F, Server] = - Resource.eval(TLSContext.Builder.forAsync[F].system).flatMap { tls => - Resource.eval(Logger[F].info("Building netty server")) >> - NettyServerBuilder[F] - .bindHttp(port) - .withHttpApp( - loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) - ) + Resource.eval(Logger[F].info(s"Building netty server $secure $port")) >> + NettyServerBuilder[F] + .bindHttp(port) + .withHttpApp( + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) .withIdleTimeout(networking.idleTimeout) .withMaxInitialLineLength(networking.maxRequestLineLength) - .cond(secure, _.withSslContext(SSLContext.getDefault)) - .build + .cond( + secure, + _.withSslContext( + sslContext = new JdkSslContext( + SSLContext.getDefault, + false, + null, + IdentityCipherSuiteFilter.INSTANCE_DEFAULTING_TO_SUPPORTED_CIPHERS, + ApplicationProtocolConfig.DISABLED, + ClientAuth.OPTIONAL, + null, + false + ) + ) + ) + .resource + + private def buildArmeriaServer[F[_]: Async]( + routes: HttpRoutes[F], + port: Int, + secure: Boolean, + hsts: Config.HSTS, + networking: Config.Networking, + debugHttp: Config.Debug.Http + ): Resource[F, Server] = { + case class ArmeriaTlsConfig private (ksType: String, ksPath: String, ksPass: String) + object ArmeriaTlsConfig { + def from( + props: Properties + ): F[ArmeriaTlsConfig] = + (for { + t <- Option(props.getProperty("javax.net.ssl.keyStoreType")) + cert <- Option(props.getProperty("javax.net.ssl.keyStore")) + pass <- Option(props.getProperty("javax.net.ssl.keyStorePassword")) + } yield Async[F].delay(ArmeriaTlsConfig(t, cert, pass))).getOrElse( + Async[F].raiseError( + new IllegalStateException( + "Invalid SSL configuration. Missing required JSSE options. See: https://docs.snowplow.io/docs/pipeline-components-and-applications/stream-collector/configure/#tls-port-binding-and-certificate-240" + ) + ) + ) } + def mkTls(secure: Boolean): Resource[F, KeyManagerFactory] = + if (secure) { + for { + tlsConfig <- Resource.eval(ArmeriaTlsConfig.from(System.getProperties())) //FIXME make conditional + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + ks = KeyStore.getInstance(tlsConfig.ksType) + _ <- Resource.eval(Async[F].delay(ks.load(new FileInputStream(tlsConfig.ksPath), tlsConfig.ksPass.toArray))) + _ <- Resource.eval(Async[F].delay(kmf.init(ks, tlsConfig.ksPass.toArray))) + } yield kmf + } else Resource.never + + for { + _ <- Resource.eval(Logger[F].info(s"Building netty server")) + kmf <- mkTls(secure) + server <- ArmeriaServerBuilder[F] + .withHttp(port) + .withHttpApp( + "/", + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) + .cond(secure, _.withTls(kmf)) + .withIdleTimeout(networking.idleTimeout) + .withRequestTimeout(networking.responseHeaderTimeout) + .resource + } yield server + } + implicit class ConditionalAction[A](item: A) { def cond(cond: Boolean, action: A => A): A = if (cond) action(item) else item diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 4786fed74..f61e6f248 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -82,7 +82,7 @@ object Run { ) } - private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig]( + private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig]( appInfo: AppInfo, mkSinks: MkSinks[F, SinkConfig], telemetryInfo: TelemetryInfo[F, SinkConfig], diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 1c0a5da72..dba6589c6 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -144,6 +144,7 @@ object TestUtils { instanceId = None, autoGeneratedId = None ), - license = License(accept = true) + license = License(accept = true), + experimental = Experimental(backend = Config.Experimental.Backend.Blaze) ) } diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 3b402dafc..d13b2f0d8 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -179,6 +179,7 @@ object KafkaConfigSpec { license = Config.License(accept = true), debug = Config .Debug - .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)) + .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)), + experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze) ) } diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index 1574ee9c5..aef16e5ed 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -195,7 +195,8 @@ object KinesisConfigSpec { license = Config.License(accept = true), debug = Config .Debug - .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)) + .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)), + experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze) ) } diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index 8cc536d12..33c71d1ae 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -166,6 +166,7 @@ object NsqConfigSpec { license = Config.License(accept = true), debug = Config .Debug - .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)) + .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)), + experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze) ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 22b76d411..89b2c3183 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,6 +25,7 @@ object Dependencies { val fs2PubSub = "0.22.0" val http4s = "0.23.23" val http4sNetty = "0.5.16" + val http4sArmeria = "0.5.3" val jackson = "2.12.7" // force this version to mitigate security vulnerabilities val fs2Kafka = "2.6.1" val log4cats = "2.6.0" @@ -56,6 +57,7 @@ object Dependencies { val decline = "com.monovore" %% "decline-effect" % V.decline val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val http4sArmeria = "org.http4s" %% "http4s-armeria-server" % V.http4sArmeria val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index 1392133de..0a001e4f0 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -186,7 +186,8 @@ object ConfigSpec { license = Config.License(accept = true), debug = Config .Debug - .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)) + .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)), + experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze) ) } diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index 86a8a8d76..7a43f919a 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -175,7 +175,8 @@ object SqsConfigSpec { license = Config.License(accept = true), debug = Config .Debug - .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)) + .Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)), + experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze) ) }