Skip to content

Commit

Permalink
Experimental: Adds Ameria backend
Browse files Browse the repository at this point in the history
Previously we supported Blaze (default), Ember and Netty. This adds
Ameria backend, that is based upon Netty under the hood.
Ameria has some interesting features, especially GRPC support.
  • Loading branch information
peel committed May 28, 2024
1 parent 2f1a7d7 commit 1547adc
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 23 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lazy val core = project
Dependencies.Libraries.http4sBlaze,
Dependencies.Libraries.http4sEmber,
Dependencies.Libraries.http4sNetty,
Dependencies.Libraries.http4sArmeria,
Dependencies.Libraries.http4sClient,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.thrift,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ object Config {
case object Blaze extends Backend
case object Ember extends Backend
case object Netty extends Backend
case object Armeria extends Backend
}
}

Expand Down Expand Up @@ -223,10 +224,11 @@ object Config {
implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig])
implicit val streams = deriveDecoder[Streams[SinkConfig]]
implicit val backend: Decoder[Experimental.Backend] = Decoder[String].emap {
case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze)
case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember)
case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty)
case other => Left(s"Invalid backend $other")
case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze)
case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember)
case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty)
case s if s.toLowerCase() == "armeria" => Right(Experimental.Backend.Armeria)
case other => Left(s"Invalid backend $other")
}
implicit val experimental = deriveDecoder[Experimental]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.netty.server.NettyServerBuilder
import org.http4s.armeria.server.ArmeriaServerBuilder
import com.comcast.ip4s._
import fs2.io.net.Network
import fs2.io.net.tls.TLSContext
Expand All @@ -31,6 +32,13 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.net.InetSocketAddress
import javax.net.ssl.SSLContext
import io.netty.handler.ssl.{ClientAuth, JdkSslContext}
import io.netty.handler.ssl.IdentityCipherSuiteFilter
import io.netty.handler.ssl.ApplicationProtocolConfig
import java.util.Properties
import javax.net.ssl.KeyManagerFactory
import java.security.KeyStore
import java.io.FileInputStream

object HttpServer {

Expand Down Expand Up @@ -72,8 +80,10 @@ object HttpServer {
networking: Config.Networking,
debugHttp: Config.Debug.Http
) = backend match {
case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp)
case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp)
case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp)
case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp)
case Config.Experimental.Backend.Netty => buildNettyServer(routes, port, secure, hsts, networking, debugHttp)
case Config.Experimental.Backend.Armeria => buildArmeriaServer(routes, port, secure, hsts, networking, debugHttp)
}
private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = {
val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port)
Expand Down Expand Up @@ -145,27 +155,92 @@ object HttpServer {
.build
}

private def buildNettyServer[F[_]: Async: Network](
private def buildNettyServer[F[_]: Async](
routes: HttpRoutes[F],
port: Int,
secure: Boolean,
hsts: Config.HSTS,
networking: Config.Networking,
debugHttp: Config.Debug.Http
): Resource[F, Server] =
Resource.eval(TLSContext.Builder.forAsync[F].system).flatMap { tls =>
Resource.eval(Logger[F].info("Building netty server")) >>
NettyServerBuilder[F]
.bindHttp(port)
.withHttpApp(
loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp)
)
Resource.eval(Logger[F].info(s"Building netty server $secure $port")) >>
NettyServerBuilder[F]
.bindHttp(port)
.withHttpApp(
loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp)
)
.withIdleTimeout(networking.idleTimeout)
.withMaxInitialLineLength(networking.maxRequestLineLength)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.build
.cond(
secure,
_.withSslContext(
sslContext = new JdkSslContext(
SSLContext.getDefault,
false,
null,
IdentityCipherSuiteFilter.INSTANCE_DEFAULTING_TO_SUPPORTED_CIPHERS,
ApplicationProtocolConfig.DISABLED,
ClientAuth.OPTIONAL,
null,
false
)
)
)
.resource

private def buildArmeriaServer[F[_]: Async](
routes: HttpRoutes[F],
port: Int,
secure: Boolean,
hsts: Config.HSTS,
networking: Config.Networking,
debugHttp: Config.Debug.Http
): Resource[F, Server] = {
case class ArmeriaTlsConfig private (ksType: String, ksPath: String, ksPass: String)
object ArmeriaTlsConfig {
def from(
props: Properties
): F[ArmeriaTlsConfig] =
(for {
t <- Option(props.getProperty("javax.net.ssl.keyStoreType"))
cert <- Option(props.getProperty("javax.net.ssl.keyStore"))
pass <- Option(props.getProperty("javax.net.ssl.keyStorePassword"))
} yield Async[F].delay(ArmeriaTlsConfig(t, cert, pass))).getOrElse(
Async[F].raiseError(
new IllegalStateException(
"Invalid SSL configuration. Missing required JSSE options. See: https://docs.snowplow.io/docs/pipeline-components-and-applications/stream-collector/configure/#tls-port-binding-and-certificate-240"
)
)
)
}

def mkTls(secure: Boolean): Resource[F, KeyManagerFactory] =
if (secure) {
for {
tlsConfig <- Resource.eval(ArmeriaTlsConfig.from(System.getProperties())) //FIXME make conditional
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
ks = KeyStore.getInstance(tlsConfig.ksType)
_ <- Resource.eval(Async[F].delay(ks.load(new FileInputStream(tlsConfig.ksPath), tlsConfig.ksPass.toArray)))
_ <- Resource.eval(Async[F].delay(kmf.init(ks, tlsConfig.ksPass.toArray)))
} yield kmf
} else Resource.never

for {
_ <- Resource.eval(Logger[F].info(s"Building netty server"))
kmf <- mkTls(secure)
server <- ArmeriaServerBuilder[F]
.withHttp(port)
.withHttpApp(
"/",
loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp)
)
.cond(secure, _.withTls(kmf))
.withIdleTimeout(networking.idleTimeout)
.withRequestTimeout(networking.responseHeaderTimeout)
.resource
} yield server
}

implicit class ConditionalAction[A](item: A) {
def cond(cond: Boolean, action: A => A): A =
if (cond) action(item) else item
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object Run {
)
}

private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig](
private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig](
appInfo: AppInfo,
mkSinks: MkSinks[F, SinkConfig],
telemetryInfo: TelemetryInfo[F, SinkConfig],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ object TestUtils {
instanceId = None,
autoGeneratedId = None
),
license = License(accept = true)
license = License(accept = true),
experimental = Experimental(backend = Config.Experimental.Backend.Blaze)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ object KafkaConfigSpec {
license = Config.License(accept = true),
debug = Config
.Debug
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty))
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)),
experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ object KinesisConfigSpec {
license = Config.License(accept = true),
debug = Config
.Debug
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty))
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)),
experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ object NsqConfigSpec {
license = Config.License(accept = true),
debug = Config
.Debug
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty))
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)),
experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze)
)
}
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object Dependencies {
val fs2PubSub = "0.22.0"
val http4s = "0.23.23"
val http4sNetty = "0.5.16"
val http4sArmeria = "0.5.3"
val jackson = "2.12.7" // force this version to mitigate security vulnerabilities
val fs2Kafka = "2.6.1"
val log4cats = "2.6.0"
Expand Down Expand Up @@ -56,6 +57,7 @@ object Dependencies {
val decline = "com.monovore" %% "decline-effect" % V.decline
val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker
val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty
val http4sArmeria = "org.http4s" %% "http4s-armeria-server" % V.http4sArmeria
val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s
val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ object ConfigSpec {
license = Config.License(accept = true),
debug = Config
.Debug
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty))
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)),
experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ object SqsConfigSpec {
license = Config.License(accept = true),
debug = Config
.Debug
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty))
.Debug(Config.Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)),
experimental = Config.Experimental(backend = Config.Experimental.Backend.Blaze)
)

}

0 comments on commit 1547adc

Please sign in to comment.