Skip to content

Commit

Permalink
CollectorServiceSpec should test number of events written to sink (close
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 3, 2021
1 parent c40ba1d commit 8079b55
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait CollectorRoute {
post {
extractContentType { ct =>
entity(as[String]) { body =>
val (r, _) = collectorService.cookie(
val r = collectorService.cookie(
qs,
Some(body),
path,
Expand All @@ -82,7 +82,7 @@ trait CollectorRoute {
}
} ~
(get | head) {
val (r, _) = collectorService.cookie(
val r = collectorService.cookie(
qs,
None,
path,
Expand All @@ -103,7 +103,7 @@ trait CollectorRoute {
} ~
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
val (r, _) = collectorService.cookie(
val r = collectorService.cookie(
qs,
None,
"/" + path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait Service {
doNotTrack: Boolean,
contentType: Option[ContentType] = None,
spAnonymous: Option[String] = None
): (HttpResponse, List[Array[Byte]])
): HttpResponse
def cookieName: Option[String]
def doNotTrackCookie: Option[DntCookieMatcher]
def determinePath(vendor: String, version: String): String
Expand Down Expand Up @@ -111,7 +111,7 @@ class CollectorService(
doNotTrack: Boolean,
contentType: Option[ContentType] = None,
spAnonymous: Option[String]
): (HttpResponse, List[Array[Byte]]) = {
): HttpResponse = {
val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey)

extractQueryParams(queryString) match {
Expand Down Expand Up @@ -144,7 +144,7 @@ class CollectorService(
spAnonymous
)
// we don't store events in case we're bouncing
val sinkResponses = if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) else Nil
if (!bounce && !doNotTrack) sinkEvent(event, partitionKey)

val headers = bounceLocationHeader(params, request, config.cookieBounce, bounce) ++
cookieHeader(request, config.cookieConfig, nuid, doNotTrack, spAnonymous) ++
Expand All @@ -155,10 +155,7 @@ class CollectorService(
`Access-Control-Allow-Credentials`(true)
)

val (httpResponse, badRedirectResponses) =
buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)

(httpResponse, badRedirectResponses ++ sinkResponses)
buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)

case Left(error) =>
val badRow = BadRow.GenericError(
Expand All @@ -167,8 +164,10 @@ class CollectorService(
Payload.RawPayload(queryString.getOrElse(""))
)

if (sinks.bad.isHealthy) (HttpResponse(StatusCodes.OK), sinkBad(badRow, partitionKey))
else (HttpResponse(StatusCodes.OK), Nil) // if bad sink is unhealthy, we don't want to try storing the bad rows
if (sinks.bad.isHealthy) {
sinkBad(badRow, partitionKey)
HttpResponse(StatusCodes.OK)
} else HttpResponse(StatusCodes.OK) // if bad sink is unhealthy, we don't want to try storing the bad rows
}
}

Expand Down Expand Up @@ -261,18 +260,16 @@ class CollectorService(
def sinkEvent(
event: CollectorPayload,
partitionKey: String
): List[Array[Byte]] = {
): Unit = {
// Split events into Good and Bad
val eventSplit = splitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
sinks.good.storeRawEvents(eventSplit.good, partitionKey)
sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
}

/** Sinks a bad row generated by an illegal querystring. */
def sinkBad(badRow: BadRow, partitionKey: String): List[Array[Byte]] = {
def sinkBad(badRow: BadRow, partitionKey: String): Unit = {
val toSink = List(badRow.compact.getBytes(UTF_8))
sinks.bad.storeRawEvents(toSink, partitionKey)
}
Expand All @@ -286,12 +283,12 @@ class CollectorService(
pixelExpected: Boolean,
bounce: Boolean,
redirectMacroConfig: RedirectMacroConfig
): (HttpResponse, List[Array[Byte]]) =
): HttpResponse =
if (redirect) {
val (r, l) = buildRedirectHttpResponse(event, queryParams, redirectMacroConfig)
(r.withHeaders(r.headers ++ headers), l)
val r = buildRedirectHttpResponse(event, queryParams, redirectMacroConfig)
r.withHeaders(r.headers ++ headers)
} else {
(buildUsualHttpResponse(pixelExpected, bounce).withHeaders(headers), Nil)
buildUsualHttpResponse(pixelExpected, bounce).withHeaders(headers)
}

/** Builds the appropriate http response when not dealing with click redirects. */
Expand All @@ -311,16 +308,16 @@ class CollectorService(
event: CollectorPayload,
queryParams: Map[String, String],
redirectMacroConfig: RedirectMacroConfig
): (HttpResponse, List[Array[Byte]]) =
): HttpResponse =
queryParams.get("u") match {
case Some(target) =>
val canReplace = redirectMacroConfig.enabled && event.isSetNetworkUserId
val token = redirectMacroConfig.placeholder.getOrElse(s"$${SP_NUID}")
val replacedTarget =
if (canReplace) target.replaceAllLiterally(token, event.networkUserId)
else target
(HttpResponse(StatusCodes.Found).withHeaders(`RawHeader`("Location", replacedTarget)), Nil)
case None => (HttpResponse(StatusCodes.BadRequest), Nil)
HttpResponse(StatusCodes.Found).withHeaders(`RawHeader`("Location", replacedTarget))
case None => HttpResponse(StatusCodes.BadRequest)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ trait Sink {
lazy val log = LoggerFactory.getLogger(getClass())

def isHealthy: Boolean = true
def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]]
def storeRawEvents(events: List[Array[Byte]], key: String): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CollectorRouteSpec extends Specification with Specs2RouteTest {
doNotTrack: Boolean,
contentType: Option[ContentType] = None,
spAnonymous: Option[String] = spAnonymous
): (HttpResponse, List[Array[Byte]]) = (HttpResponse(200, entity = s"cookie"), List.empty)
): HttpResponse = HttpResponse(200, entity = s"cookie")
def cookieName: Option[String] = Some("name")
def doNotTrackCookie: Option[DntCookieMatcher] = None
def determinePath(vendor: String, version: String): String = "/p1/p2"
Expand Down
Loading

0 comments on commit 8079b55

Please sign in to comment.