diff --git a/build.sbt b/build.sbt index 7aad81ed9..d3843bb34 100644 --- a/build.sbt +++ b/build.sbt @@ -20,6 +20,7 @@ lazy val core = project libraryDependencies ++= Seq( Dependencies.Libraries.http4sDsl, Dependencies.Libraries.http4sBlaze, + Dependencies.Libraries.http4sEmber, Dependencies.Libraries.http4sClient, Dependencies.Libraries.log4cats, Dependencies.Libraries.thrift, @@ -123,4 +124,4 @@ lazy val stdoutDistroless = project .settings(sourceDirectory := (stdout / sourceDirectory).value) .settings(BuildSettings.stdoutSettings) .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin) - .dependsOn(core % "test->test;compile->compile") \ No newline at end of file + .dependsOn(core % "test->test;compile->compile") diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4ad566183..295f50e12 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -108,6 +108,11 @@ redactHeaders = [] } } + + experimental { + backend = blaze + } + enableDefaultRedirect = false preTerminationPeriod = 10 seconds diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index 5d2d335a3..a3d676be8 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -42,7 +42,8 @@ case class Config[+SinkConfig]( redirectDomains: Set[String], preTerminationPeriod: FiniteDuration, license: Config.License, - debug: Config.Debug.Debug + debug: Config.Debug.Debug, + experimental: Config.Experimental ) object Config { @@ -175,6 +176,15 @@ object Config { case class Debug(http: Http) } + case class Experimental(backend: Experimental.Backend) + object Experimental { + sealed trait Backend + object Backend { + case object Blaze extends Backend + case object Ember extends Backend + } + } + implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = { implicit val license: Decoder[License] = { val truthy = Set("true", "yes", "on", "1") @@ -211,6 +221,12 @@ object Config { implicit val debug = deriveDecoder[Debug.Debug] implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) implicit val streams = deriveDecoder[Streams[SinkConfig]] + implicit val backend: Decoder[Experimental.Backend] = Decoder[String].emap { + case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) + case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) + case other => Left(s"Invalid backend $other") + } + implicit val experimental = deriveDecoder[Experimental] deriveDecoder[Config[SinkConfig]] } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index bc59ba783..560a7d23d 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -17,6 +17,10 @@ import com.avast.datadog4s.extension.http4s.DatadogMetricsOps import com.avast.datadog4s.{StatsDMetricFactory, StatsDMetricFactoryConfig} import org.http4s.{HttpApp, HttpRoutes} import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.ember.server._ +import com.comcast.ip4s._ +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext import org.http4s.headers.`Strict-Transport-Security` import org.http4s.server.Server import org.http4s.server.middleware.{HSTS, Logger => LoggerMiddleware, Metrics, Timeout} @@ -31,9 +35,10 @@ object HttpServer { implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F] - def build[F[_]: Async]( + def build[F[_]: Async: Network]( routes: HttpRoutes[F], port: Int, + backend: Config.Experimental.Backend, secure: Boolean, hsts: Config.HSTS, networking: Config.Networking, @@ -42,7 +47,7 @@ object HttpServer { ): Resource[F, Server] = for { withMetricsMiddleware <- createMetricsMiddleware(routes, metricsConfig) - server <- buildBlazeServer[F](withMetricsMiddleware, port, secure, hsts, networking, debugHttp) + server <- buildServer(backend)(withMetricsMiddleware, port, secure, hsts, networking, debugHttp) } yield server private def createMetricsMiddleware[F[_]: Async]( @@ -58,6 +63,17 @@ object HttpServer { Resource.pure(routes) } + private def buildServer[F[_]: Async: Network](backend: Config.Experimental.Backend)( + routes: HttpRoutes[F], + port: Int, + secure: Boolean, + hsts: Config.HSTS, + networking: Config.Networking, + debugHttp: Config.Debug.Http + ) = backend match { + case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + } private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = { val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port) val tags = metricsConfig.statsd.tags.toVector.map { case (name, value) => Tag.of(name, value) } @@ -106,6 +122,28 @@ object HttpServer { .cond(secure, _.withSslContext(SSLContext.getDefault)) .resource + private def buildEmberServer[F[_]: Async: Network]( + routes: HttpRoutes[F], + port: Int, + secure: Boolean, + hsts: Config.HSTS, + networking: Config.Networking, + debugHttp: Config.Debug.Http + ): Resource[F, Server] = + Resource.eval(TLSContext.Builder.forAsync[F].system).flatMap { tls => + Resource.eval(Logger[F].info("Building blaze server")) >> + EmberServerBuilder + .default[F] + .withHost(ipv4"0.0.0.0") + .withPort(Port.fromInt(port).getOrElse(port"9090")) + .withHttpApp( + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) + .withIdleTimeout(networking.idleTimeout) + .cond(secure, _.withTLS(tls)) + .build + } + implicit class ConditionalAction[A](item: A) { def cond(cond: Boolean, action: A => A): A = if (cond) action(item) else item diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 4553955ff..4786fed74 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -23,6 +23,8 @@ import cats.data.EitherT import cats.effect.{Async, ExitCode, Sync} import cats.effect.kernel.Resource +import fs2.io.net.Network + import org.http4s.blaze.client.BlazeClientBuilder import com.monovore.decline.Opts @@ -41,7 +43,7 @@ object Run { implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder]( + def fromCli[F[_]: Async: Network: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, mkSinks: MkSinks[F, SinkConfig], telemetryInfo: TelemetryInfo[F, SinkConfig] @@ -50,7 +52,7 @@ object Run { configPath.map(fromPath[F, SinkConfig](appInfo, mkSinks, telemetryInfo, _)) } - private def fromPath[F[_]: Async: Tracking, SinkConfig: Decoder]( + private def fromPath[F[_]: Async: Network: Tracking, SinkConfig: Decoder]( appInfo: AppInfo, mkSinks: MkSinks[F, SinkConfig], telemetryInfo: TelemetryInfo[F, SinkConfig], @@ -80,7 +82,7 @@ object Run { ) } - private def fromConfig[F[_]: Async: Tracking, SinkConfig]( + private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig]( appInfo: AppInfo, mkSinks: MkSinks[F, SinkConfig], telemetryInfo: TelemetryInfo[F, SinkConfig], @@ -102,6 +104,7 @@ object Run { collectorService ).value, if (config.ssl.enable) config.ssl.port else config.port, + config.experimental.backend, config.ssl.enable, config.hsts, config.networking, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 31ae80d44..7d3c3ffb7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -54,6 +54,7 @@ object Dependencies { val collectorPayload = "com.snowplowanalytics" % "collector-payload-1" % V.collectorPayload val decline = "com.monovore" %% "decline-effect" % V.decline val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker + val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s