From e5f82bd95b7da3794d0e93ee8dd1b3ee5ed92bce Mon Sep 17 00:00:00 2001 From: Piotr Limanowski Date: Mon, 27 May 2024 19:10:01 +0200 Subject: [PATCH] Adds Ameria backend Previously we supported Blaze (default), Ember and Netty. This adds Ameria backend, that is based upon Netty under the hood. Ameria has some interesting features, especially GRPC support. --- build.sbt | 1 + .../Config.scala | 10 +- .../HttpServer.scala | 99 ++++++++++++++++--- project/Dependencies.scala | 2 + 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/build.sbt b/build.sbt index dc020e98c..c80872d35 100644 --- a/build.sbt +++ b/build.sbt @@ -22,6 +22,7 @@ lazy val core = project Dependencies.Libraries.http4sBlaze, Dependencies.Libraries.http4sEmber, Dependencies.Libraries.http4sNetty, + Dependencies.Libraries.http4sArmeria, Dependencies.Libraries.http4sClient, Dependencies.Libraries.log4cats, Dependencies.Libraries.thrift, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index ce6d01652..bdce3703d 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -183,6 +183,7 @@ object Config { case object Blaze extends Backend case object Ember extends Backend case object Netty extends Backend + case object Armeria extends Backend } } @@ -223,10 +224,11 @@ object Config { implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) implicit val streams = deriveDecoder[Streams[SinkConfig]] implicit val backend: Decoder[Experimental.Backend] = Decoder[String].emap { - case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) - case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) - case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) - case other => Left(s"Invalid backend $other") + case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) + case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) + case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) + case s if s.toLowerCase() == "armeria" => Right(Experimental.Backend.Armeria) + case other => Left(s"Invalid backend $other") } implicit val experimental = deriveDecoder[Experimental] diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 62d6891b8..509bf1bc8 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -19,6 +19,7 @@ import org.http4s.{HttpApp, HttpRoutes} import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.ember.server.EmberServerBuilder import org.http4s.netty.server.NettyServerBuilder +import org.http4s.armeria.server.ArmeriaServerBuilder import com.comcast.ip4s._ import fs2.io.net.Network import fs2.io.net.tls.TLSContext @@ -31,6 +32,13 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress import javax.net.ssl.SSLContext +import io.netty.handler.ssl.{ClientAuth, JdkSslContext} +import io.netty.handler.ssl.IdentityCipherSuiteFilter +import io.netty.handler.ssl.ApplicationProtocolConfig +import java.util.Properties +import javax.net.ssl.KeyManagerFactory +import java.security.KeyStore +import java.io.FileInputStream object HttpServer { @@ -72,8 +80,10 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ) = backend match { - case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) - case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Netty => buildNettyServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Armeria => buildArmeriaServer(routes, port, secure, hsts, networking, debugHttp) } private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = { val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port) @@ -145,7 +155,7 @@ object HttpServer { .build } - private def buildNettyServer[F[_]: Async: Network]( + private def buildNettyServer[F[_]: Async]( routes: HttpRoutes[F], port: Int, secure: Boolean, @@ -153,19 +163,84 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ): Resource[F, Server] = - Resource.eval(TLSContext.Builder.forAsync[F].system).flatMap { tls => - Resource.eval(Logger[F].info("Building netty server")) >> - NettyServerBuilder[F] - .bindHttp(port) - .withHttpApp( - loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) - ) + Resource.eval(Logger[F].info(s"Building netty server $secure $port")) >> + NettyServerBuilder[F] + .bindHttp(port) + .withHttpApp( + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) .withIdleTimeout(networking.idleTimeout) .withMaxInitialLineLength(networking.maxRequestLineLength) - .cond(secure, _.withSslContext(SSLContext.getDefault)) - .build + .cond( + secure, + _.withSslContext( + sslContext = new JdkSslContext( + SSLContext.getDefault, + false, + null, + IdentityCipherSuiteFilter.INSTANCE_DEFAULTING_TO_SUPPORTED_CIPHERS, + ApplicationProtocolConfig.DISABLED, + ClientAuth.OPTIONAL, + null, + false + ) + ) + ) + .resource + + private def buildArmeriaServer[F[_]: Async]( + routes: HttpRoutes[F], + port: Int, + secure: Boolean, + hsts: Config.HSTS, + networking: Config.Networking, + debugHttp: Config.Debug.Http + ): Resource[F, Server] = { + case class ArmeriaTlsConfig private (ksType: String, ksPath: String, ksPass: String) + object ArmeriaTlsConfig { + def from( + props: Properties + ): F[ArmeriaTlsConfig] = + (for { + t <- Option(props.getProperty("javax.net.ssl.keyStoreType")) + cert <- Option(props.getProperty("javax.net.ssl.keyStore")) + pass <- Option(props.getProperty("javax.net.ssl.keyStorePassword")) + } yield Async[F].delay(ArmeriaTlsConfig(t, cert, pass))).getOrElse( + Async[F].raiseError( + new IllegalStateException( + "Invalid SSL configuration. Missing required JSSE options. See: https://docs.snowplow.io/docs/pipeline-components-and-applications/stream-collector/configure/#tls-port-binding-and-certificate-240" + ) + ) + ) } + def mkTls(secure: Boolean): Resource[F, KeyManagerFactory] = + if (secure) { + for { + tlsConfig <- Resource.eval(ArmeriaTlsConfig.from(System.getProperties())) //FIXME make conditional + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + ks = KeyStore.getInstance(tlsConfig.ksType) + _ <- Resource.eval(Async[F].delay(ks.load(new FileInputStream(tlsConfig.ksPath), tlsConfig.ksPass.toArray))) + _ <- Resource.eval(Async[F].delay(kmf.init(ks, tlsConfig.ksPass.toArray))) + } yield kmf + } else Resource.never + + for { + _ <- Resource.eval(Logger[F].info(s"Building netty server")) + kmf <- mkTls(secure) + server <- ArmeriaServerBuilder[F] + .withHttp(port) + .withHttpApp( + "/", + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) + .cond(secure, _.withTls(kmf)) + .withIdleTimeout(networking.idleTimeout) + .withRequestTimeout(networking.responseHeaderTimeout) + .resource + } yield server + } + implicit class ConditionalAction[A](item: A) { def cond(cond: Boolean, action: A => A): A = if (cond) action(item) else item diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 22b76d411..89b2c3183 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,6 +25,7 @@ object Dependencies { val fs2PubSub = "0.22.0" val http4s = "0.23.23" val http4sNetty = "0.5.16" + val http4sArmeria = "0.5.3" val jackson = "2.12.7" // force this version to mitigate security vulnerabilities val fs2Kafka = "2.6.1" val log4cats = "2.6.0" @@ -56,6 +57,7 @@ object Dependencies { val decline = "com.monovore" %% "decline-effect" % V.decline val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val http4sArmeria = "org.http4s" %% "http4s-armeria-server" % V.http4sArmeria val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze