Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http4s hostname telemetry #355

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central Test",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics.oss" ],
"connection": {
"http": {
"uri": "https://raw.githubusercontent.com/snowplow/iglu-central/oss_context_hostname"
}
}
}
]
}
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ lazy val http4s = project
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.ceTestkit,

//Integration tests
Dependencies.Libraries.IT.testcontainers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ object Run {
config: Config[SinkConfig]
): F[ExitCode] = {
val resources = for {
sinks <- mkSinks(config.streams)
sinks <- mkSinks(config.streams)
hostnameSet <- Resource.eval(Telemetry.HostnameSet.init(config.telemetry))
collectorService = new Service[F](
config,
Sinks(sinks.good, sinks.bad),
appInfo
appInfo,
hostnameSet
)
httpServer = HttpServer.build[F](
new Routes[F](config.enableDefaultRedirect, collectorService).value,
Expand All @@ -78,20 +80,24 @@ object Run {
)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
} yield httpClient

resources.use { httpClient =>
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
telemetryInfo(config).region,
telemetryInfo(config).cloud
)
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
} yield (httpClient, hostnameSet)

resources.use {
case (httpClient, hostnameSet) =>
val appId = java.util.UUID.randomUUID.toString
Telemetry
.run(
config.telemetry,
httpClient,
appInfo,
appId,
telemetryInfo(config).region,
telemetryInfo(config).cloud,
hostnameSet
)
.compile
.drain
.flatMap(_ => Async[F].never[ExitCode])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import java.util.UUID

import org.apache.commons.codec.binary.Base64

import com.comcast.ip4s.Dns

import scala.concurrent.duration._
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -47,9 +49,12 @@ object Service {
class Service[F[_]: Sync](
config: Config[Any],
sinks: Sinks[F],
appInfo: AppInfo
appInfo: AppInfo,
hostnameSet: Telemetry.HostnameSet[F]
) extends IService[F] {

implicit val dns: Dns[F] = Dns.forSync[F]

val pixelStream = Stream.iterable[F, Byte](Service.pixel)

private val collector = s"${appInfo.name}:${appInfo.version}"
Expand All @@ -67,7 +72,6 @@ class Service[F[_]: Sync](
for {
body <- body
redirect = path.startsWith("/r/")
hostname = extractHostname(request)
userAgent = extractHeader(request, "User-Agent")
refererUri = extractHeader(request, "Referer")
spAnonymous = extractHeader(request, "SP-Anonymous")
Expand All @@ -77,6 +81,7 @@ class Service[F[_]: Sync](
nuidOpt = networkUserId(request, cookie, spAnonymous)
nuid = nuidOpt.getOrElse(UUID.randomUUID().toString)
(ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey)
hostname <- extractHostname(request)
event = buildEvent(
queryString,
body,
Expand Down Expand Up @@ -105,6 +110,7 @@ class Service[F[_]: Sync](
`Access-Control-Allow-Credentials`().toRaw1.some
).flatten
responseHeaders = Headers(headerList)
_ <- hostname.map(hostnameSet.add).getOrElse(Sync[F].unit)
_ <- sinkEvent(event, partitionKey)
resp = buildHttpResponse(
queryParams = request.uri.query.params,
Expand Down Expand Up @@ -138,8 +144,8 @@ class Service[F[_]: Sync](
def extractCookie(req: Request[F]): Option[RequestCookie] =
req.cookies.find(_.name == config.cookie.name)

def extractHostname(req: Request[F]): Option[String] =
req.uri.authority.map(_.host.renderString) // Hostname is extracted like this in Akka-Http as well
def extractHostname(req: Request[F]): F[Option[String]] =
req.remoteHost.map(_.map(_.toString))

def extractIp(req: Request[F], spAnonymous: Option[String]): Option[String] =
spAnonymous match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package com.snowplowanalytics.snowplow.collector.core
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import org.apache.commons.codec.digest.DigestUtils

import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Random
import cats.effect.kernel.Ref

import fs2.Stream

Expand All @@ -32,22 +35,30 @@ object Telemetry {
telemetryConfig: Config.Telemetry,
httpClient: HttpClient[F],
appInfo: AppInfo,
appGeneratedId: String,
region: Option[String],
cloud: Option[String]
cloud: Option[String],
hostnameSet: HostnameSet[F]
): Stream[F, Unit] =
if (telemetryConfig.disable)
Stream.empty.covary[F]
else {
val sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version
)
Stream.resource(initTracker(telemetryConfig, appInfo.moduleName, httpClient)).flatMap { tracker =>
Stream.fixedDelay[F](telemetryConfig.interval).evalMap { _ =>
tracker.trackSelfDescribingEvent(unstructEvent = sdj) >> tracker.flushEmitters()
for {
hostnames <- hostnameSet.getHashed
sdj = makeHeartbeatEvent(
telemetryConfig,
region,
cloud,
appInfo.moduleName,
appInfo.version,
appGeneratedId,
hostnames
)
_ <- tracker.trackSelfDescribingEvent(unstructEvent = sdj)
_ <- tracker.flushEmitters()
} yield ()
}
}
}
Expand Down Expand Up @@ -93,26 +104,60 @@ object Telemetry {
region: Option[String],
cloud: Option[String],
appName: String,
appVersion: String
appVersion: String,
appGeneratedId: String,
hashedHostnames: Set[String]
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 2)),
Json.obj(
"userProvidedId" -> teleCfg.userProvidedId.asJson,
"autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
"moduleName" -> teleCfg.moduleName.asJson,
"moduleVersion" -> teleCfg.moduleVersion.asJson,
"instanceId" -> teleCfg.instanceId.asJson,
"appGeneratedId" -> java.util.UUID.randomUUID.toString.asJson,
"appGeneratedId" -> appGeneratedId.asJson,
"cloud" -> cloud.asJson,
"region" -> region.asJson,
"applicationName" -> appName.asJson,
"applicationVersion" -> appVersion.asJson
"applicationVersion" -> appVersion.asJson,
"hashedHostnames" -> hashedHostnames.asJson
)
)

case class TelemetryInfo(
region: Option[String],
cloud: Option[String]
)

trait HostnameSet[F[_]] {
def add(hostname: String): F[Unit]
def getHashed: F[Set[String]]
}

object HostnameSet {
private def create[F[_]: Sync]: F[HostnameSet[F]] =
Ref
.of[F, Set[String]](Set.empty)
.map(ref =>
new HostnameSet[F] {
override def add(hostname: String): F[Unit] =
ref.update(_ + hostname)

override def getHashed: F[Set[String]] =
ref.get.map(s => s.map(DigestUtils.sha256Hex))
}
)

private def createNoop[F[_]: Sync]: F[HostnameSet[F]] =
Sync[F].pure {
new HostnameSet[F] {
override def add(hostname: String): F[Unit] = Sync[F].unit
override def getHashed: F[Set[String]] = Sync[F].pure(Set.empty)
}
}

def init[F[_]: Sync](telemetryConfig: Config.Telemetry): F[HostnameSet[F]] =
if (telemetryConfig.disable) createNoop[F] else create[F]
}
}
Loading
Loading