Skip to content

Commit

Permalink
Limit payload sizes
Browse files Browse the repository at this point in the history
Previously it was possible to send arbitrarily large payloads at the collector.
Now, we add two mechanisms of handling large requests.
First, `networking.maxPayloadSize` (defaulting to 1048576 = 1MB) will stream the
input and generate SizeViolation bad event.
Second, `networking.dropPayloadSize` (defaulting to 2097152 = 2MB) will return
HTTP 413 and will not attempt to process the request.

The two mechanisms come hand in hand allowing for setting a safe but tolerant
configuration.
If requests exceed `sink.maxBytes` the collector will attempt to split them as
separate events. However, if payload is bigger than `maxPayloadSize`, parsing the
body is risky and in case of an attack may result in failures. The collector
will not attempt to parse the request and split it up but will generate a
SizeViolation bad event. But if payload exceeds `dropPayloadSize` the request
will not be handled and no bad event is going to be emitted. Instead HTTP 413 is generated.
  • Loading branch information
peel committed Aug 27, 2024
1 parent d301673 commit ff75a0a
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 30 deletions.
3 changes: 2 additions & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@
bodyReadTimeout = 28 seconds
maxRequestLineLength = 20480
maxHeadersLength = 40960
maxPayloadSize = 1048576 # 1MB
maxPayloadSize = 1048576 # 1MB
dropPayloadSize = 2097152 # 2MB
}

debug {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ object Config {
bodyReadTimeout: FiniteDuration,
maxRequestLineLength: Int,
maxHeadersLength: Int,
maxPayloadSize: Long
maxPayloadSize: Long,
dropPayloadSize: Long
)

case class License(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.http4s.{HttpApp, HttpRoutes}
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.headers.`Strict-Transport-Security`
import org.http4s.server.Server
import org.http4s.server.middleware.{HSTS, Logger => LoggerMiddleware, Metrics, Timeout}
import org.http4s.server.middleware.{EntityLimiter, HSTS, Logger => LoggerMiddleware, Metrics, Timeout}
import org.typelevel.ci.CIString
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand Down Expand Up @@ -47,8 +47,8 @@ object HttpServer {
): Resource[F, Server] =
for {
withMetricsMiddleware <- createMetricsMiddleware(routes, metricsConfig)
httpApp <- Resource.pure(httpApp(withMetricsMiddleware, healthRoutes, hsts, networking, debugHttp))
server <- mkServer(httpApp, port, secure, networking)
app <- Resource.pure(httpApp(withMetricsMiddleware, healthRoutes, hsts, networking, debugHttp))
server <- mkServer(app, port, secure, networking)
} yield server

def buildBlazeServer[F[_]: Async](
Expand Down Expand Up @@ -77,10 +77,17 @@ object HttpServer {
hsts: Config.HSTS,
networking: Config.Networking,
debugHttp: Config.Debug.Http
): HttpApp[F] = hstsApp(
hsts,
loggerMiddleware(timeoutMiddleware(routes, networking) <+> healthRoutes, debugHttp)
)
): HttpApp[F] =
EntityLimiter(
hstsApp(
hsts,
loggerMiddleware(
timeoutMiddleware(entityLimiter(routes, networking.dropPayloadSize), networking) <+> healthRoutes,
debugHttp
)
),
networking.dropPayloadSize
)

private def createMetricsMiddleware[F[_]: Async](
routes: HttpRoutes[F],
Expand All @@ -106,6 +113,12 @@ object HttpServer {
HSTS(routes.orNotFound, `Strict-Transport-Security`.unsafeFromDuration(hsts.maxAge))
else routes.orNotFound

private def entityLimiter[F[_]: Async](routes: HttpRoutes[F], dropPayloadSize: Long): HttpRoutes[F] =
EntityLimiter.httpRoutes[F](routes, dropPayloadSize).recover {
case _: EntityLimiter.EntityTooLarge =>
Response[F](Status.PayloadTooLarge)
}

private def loggerMiddleware[F[_]: Async](routes: HttpRoutes[F], config: Config.Debug.Http): HttpRoutes[F] =
if (config.enable) {
LoggerMiddleware.httpRoutes[F](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import cats.effect.IO
import org.http4s.client.Client
import org.http4s._
import org.http4s.dsl.io._
import cats.implicits._
import org.http4s.implicits._
import scala.concurrent.duration._
import cats.effect.testing.specs2._

class HttpServerSpec extends Specification with CatsEffect {
val routes = HttpRoutes.of[IO] {
case r if r.pathInfo == path"/large" =>
r.decode[String](Response[IO](Ok).withEntity(_).pure[IO])
case _ -> Root / "fast" =>
Ok("Fast")
case _ -> Root / "never" =>
Expand All @@ -29,24 +32,59 @@ class HttpServerSpec extends Specification with CatsEffect {
TestUtils
.testConfig
.copy(networking = TestUtils.testConfig.networking.copy(responseHeaderTimeout = 100.millis))
val httpApp = HttpServer.httpApp(
routes,
healthRoutes,
config.hsts,
config.networking,
config.debug.http
)
val client: Client[IO] = Client.fromHttpApp(httpApp)

val request: Request[IO] = Request(method = Method.GET, uri = uri"/never")
val res: IO[String] = client.expect[String](request)

res
.attempt
.map(_ must beLeft[Throwable].which {
check(config, request)(
_ must beLeft[Throwable].which {
case org.http4s.client.UnexpectedStatus(Status.RequestTimeout, _, _) => true
case _ => false
})
}
)
}
}
"manage request size" should {
"drop requests larger than `networking.dropPayloadSize`" in {
val config =
TestUtils
.testConfig
.copy(networking = TestUtils.testConfig.networking.copy(maxPayloadSize = 5L, dropPayloadSize = 10L))
val request: Request[IO] = Request(
Method.POST,
uri"/large"
).withEntity("s" * 1000)

check(config, request)(
_ must beLeft[Throwable].which {
case org.http4s.client.UnexpectedStatus(Status.PayloadTooLarge, _, _) => true
case _ => false
}
)
}
"allow request that's smaller than `networking.dropPayloadSize`" in {
val config =
TestUtils.testConfig.copy(networking = TestUtils.testConfig.networking.copy(dropPayloadSize = 1002L))
val body = "s" * 1000
val request: Request[IO] = Request(
Method.POST,
uri"/large"
).withEntity(body)

check(config, request)(_ must beRight(body))
}
}
}

private[this] def check(config: Config[Any], request: Request[IO])(assert: Either[Throwable, _] => Boolean) = {
val httpApp = HttpServer.httpApp(
routes,
healthRoutes,
config.hsts,
config.networking,
config.debug.http
)

Client.fromHttpApp(httpApp).expect[String](request).attempt.map(assert)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ class SplitBatchSpec extends Specification {
sizeViolation
.failure
.expectation must_== "oversized collector payload: Event exceeds max payload size of 1000. Actual length: 1029"
sizeViolation.payload.event must_== "CollectorPayload(schema:null, ipAddress:null, timestamp:0, encoding:null, collector:null, body:sssss"
sizeViolation
.payload
.event must_== "CollectorPayload(schema:null, ipAddress:null, timestamp:0, encoding:null, collector:null, body:sssss"
sizeViolation.processor shouldEqual Processor(TestUtils.appName, TestUtils.appVersion)
}

Expand Down Expand Up @@ -124,7 +126,7 @@ class SplitBatchSpec extends Specification {
)
payload.setBody(data.noSpaces)
payload.setPath("p" * 1000)
val actual = splitBatch.splitAndSerializePayload(payload, 1000, 1092L)
val actual = splitBatch.splitAndSerializePayload(payload, 1000, 2000L)
actual.bad.size must_== 1
val res = parse(new String(actual.bad.head)).toOption.get
val selfDesc = SelfDescribingData.parse(res).toOption.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ object TestUtils {
28.second,
20480,
40960,
100000
1048576,
2097152
),
debug = Debug.Debug(
http = Debug.Http(enable = false, logHeaders = true, logBody = false, redactHeaders = List.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ object KafkaConfigSpec {
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960,
maxPayloadSize = 1048576
maxPayloadSize = 1048576,
dropPayloadSize = 2097152
),
license = Config.License(accept = true),
debug = Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ object KinesisConfigSpec {
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960,
maxPayloadSize = 1048576
maxPayloadSize = 1048576,
dropPayloadSize = 2097152
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ object NsqConfigSpec {
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960,
maxPayloadSize = 1048576
maxPayloadSize = 1048576,
dropPayloadSize = 2097152
),
license = Config.License(accept = true),
debug = Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ object ConfigSpec {
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960,
maxPayloadSize = 1048576
maxPayloadSize = 1048576,
dropPayloadSize = 2097152
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ object SqsConfigSpec {
bodyReadTimeout = 28.seconds,
maxRequestLineLength = 20480,
maxHeadersLength = 40960,
maxPayloadSize = 1048576
maxPayloadSize = 1048576,
dropPayloadSize = 2097152
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down

0 comments on commit ff75a0a

Please sign in to comment.