Skip to content

Commit

Permalink
Ditch duplicate limiter and adjust message in splitbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
peel committed Sep 4, 2024
1 parent ff75a0a commit afa73de
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,12 @@ object HttpServer {
networking: Config.Networking,
debugHttp: Config.Debug.Http
): HttpApp[F] =
EntityLimiter(
hstsApp(
hsts,
loggerMiddleware(
timeoutMiddleware(entityLimiter(routes, networking.dropPayloadSize), networking) <+> healthRoutes,
debugHttp
)
),
networking.dropPayloadSize
hstsApp(
hsts,
loggerMiddleware(
timeoutMiddleware(entityLimiter(routes, networking.dropPayloadSize), networking) <+> healthRoutes,
debugHttp
)
)

private def createMetricsMiddleware[F[_]: Async](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,45 +77,45 @@ case class SplitBatch(appInfo: AppInfo) {
* @param event Incoming CollectorPayload
* @return a List of Good and Bad events
*/
def splitAndSerializePayload(event: CollectorPayload, maxBytes: Int, maxPayloadSize: Long): EventSerializeResult = {
def splitAndSerializePayload(payload: CollectorPayload, maxBytes: Int, maxPayloadSize: Long): EventSerializeResult = {
val serializer = ThriftSerializer.get()
val everythingSerialized = serializer.serialize(event)
val wholeEventBytes = getSize(everythingSerialized)
val everythingSerialized = serializer.serialize(payload)
val wholePayloadBytes = getSize(everythingSerialized)

// If the event is below the size limit, no splitting is necessary
if (wholeEventBytes < maxBytes) {
if (wholePayloadBytes < maxBytes) {
EventSerializeResult(List(everythingSerialized), Nil)
// If the event is above max payload size it is turned into SizeViolation
} else if (wholeEventBytes > maxPayloadSize) {
} else if (wholePayloadBytes > maxPayloadSize) {
EventSerializeResult(
Nil,
List(
oversizedPayload(
event,
wholeEventBytes,
payload,
wholePayloadBytes,
maxPayloadSize.toInt,
s"Event exceeds max payload size of $maxPayloadSize. Actual length: $wholeEventBytes"
s"Payload exceeds max size of $maxPayloadSize. Actual length: $wholePayloadBytes"
)
)
)
} else {
(for {
body <- Option(event.getBody).toRight("GET requests cannot be split")
body <- Option(payload.getBody).toRight("GET requests cannot be split")
children <- splitBody(body)
initialBodyDataBytes = getSize(Json.arr(children._2: _*).noSpaces)
_ <- Either.cond[String, Unit](
wholeEventBytes - initialBodyDataBytes < maxBytes,
wholePayloadBytes - initialBodyDataBytes < maxBytes,
(),
"cannot split this POST request because event without \"data\" field is still too big"
)
splitted = split(children._2, maxBytes - wholeEventBytes + initialBodyDataBytes)
goodSerialized = serializeBatch(serializer, event, splitted.goodBatches, children._1)
splitted = split(children._2, maxBytes - wholePayloadBytes + initialBodyDataBytes)
goodSerialized = serializeBatch(serializer, payload, splitted.goodBatches, children._1)
badList = splitted.failedBigEvents.map { e =>
val msg = "this POST request split is still too large"
oversizedPayload(event, getSize(e), maxBytes, msg)
oversizedPayload(payload, getSize(e), maxBytes, msg)
}
} yield EventSerializeResult(goodSerialized, badList)).fold({ msg =>
val tooBigPayload = oversizedPayload(event, wholeEventBytes, maxBytes, msg)
val tooBigPayload = oversizedPayload(payload, wholePayloadBytes, maxBytes, msg)
EventSerializeResult(Nil, List(tooBigPayload))
}, identity)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SplitBatchSpec extends Specification {
sizeViolation.failure.actualSizeBytes must_== 1029
sizeViolation
.failure
.expectation must_== "oversized collector payload: Event exceeds max payload size of 1000. Actual length: 1029"
.expectation must_== "oversized collector payload: Payload exceeds max size of 1000. Actual length: 1029"
sizeViolation
.payload
.event must_== "CollectorPayload(schema:null, ipAddress:null, timestamp:0, encoding:null, collector:null, body:sssss"
Expand Down

0 comments on commit afa73de

Please sign in to comment.