Skip to content

Commit

Permalink
CDF-18449: add tracing support (#632)
Browse files Browse the repository at this point in the history
* CDF-18449: add tracing support

[CDF-18449]

* randomize

* weirdo warn

* Revert "weirdo warn"

This reverts commit c5ab829.

* Revert "randomize"

This reverts commit 1a62a2e.

* rename

* rename

* rename
  • Loading branch information
dmivankov authored Jun 22, 2023
1 parent 571e756 commit 73db1c9
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 43 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ val sttpVersion = "3.5.2"
val circeVersion = "0.14.5"
val catsEffectVersion = "3.3.14"
val fs2Version = "3.3.0"
val natchezVersion = "0.3.1"

lazy val gpgPass = Option(System.getenv("GPG_KEY_PASSWORD"))

Expand All @@ -25,7 +26,7 @@ lazy val commonSettings = Seq(
organization := "com.cognite",
organizationName := "Cognite",
organizationHomepage := Some(url("https://cognite.com")),
version := "2.6." + patchVersion,
version := "2.7." + patchVersion,
isSnapshot := patchVersion.endsWith("-SNAPSHOT"),
crossScalaVersions := supportedScalaVersions,
semanticdbEnabled := true,
Expand Down Expand Up @@ -106,7 +107,8 @@ lazy val core = (project in file("."))
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test,
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"com.google.protobuf" % "protobuf-java" % "3.21.4"
"com.google.protobuf" % "protobuf-java" % "3.21.4",
"org.tpolecat" %% "natchez-core" % natchezVersion
) ++ scalaTestDeps ++ sttpDeps ++ circeDeps(CrossVersion.partialVersion(scalaVersion.value)),
scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor)) if minor == 13 =>
Expand Down
72 changes: 55 additions & 17 deletions src/main/scala/com/cognite/sdk/scala/v1/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,43 @@ import io.circe.generic.semiauto.deriveDecoder
import sttp.capabilities.Effect
import sttp.client3._
import sttp.client3.circe.asJsonEither
import sttp.model.{StatusCode, Uri}
import sttp.model.{Header, StatusCode, Uri}
import sttp.monad.MonadError

import java.net.{InetAddress, UnknownHostException}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import natchez.Trace

class TraceSttpBackend[F[_]: Trace, +P](delegate: SttpBackend[F, P]) extends SttpBackend[F, P] {

def sendImpl[T, R >: P with Effect[F]](
request: Request[T, R]
)(implicit monad: MonadError[F]): F[Response[T]] =
Trace[F].span("sttp-client-request") {
import sttp.monad.syntax._
for {
knl <- Trace[F].kernel
_ <- Trace[F].put(
"client.http.uri" -> request.uri.toString(),
"client.http.method" -> request.method.toString
)
response <- delegate.send(
request.headers(
knl.toHeaders.map { case (k, v) => Header(k.toString, v) }.toSeq ++
request.headers: _*
)
) // prioritize request headers over kernel ones)
_ <- Trace[F].put("client.http.status_code" -> response.code.toString())
} yield response
}
override def send[T, R >: P with Effect[F]](request: Request[T, R]): F[Response[T]] =
sendImpl(request)(responseMonad)

override def close(): F[Unit] = delegate.close()

override def responseMonad: MonadError[F] = delegate.responseMonad
}

class AuthSttpBackend[F[_], +P](delegate: SttpBackend[F, P], authProvider: AuthProvider[F])
extends SttpBackend[F, P] {
Expand All @@ -37,15 +68,16 @@ class AuthSttpBackend[F[_], +P](delegate: SttpBackend[F, P], authProvider: AuthP
override def responseMonad: MonadError[F] = delegate.responseMonad
}

final case class RequestSession[F[_]: Monad](
final case class RequestSession[F[_]: Monad: Trace](
applicationName: String,
baseUrl: Uri,
baseSttpBackend: SttpBackend[F, _],
auth: AuthProvider[F],
clientTag: Option[String] = None,
cdfVersion: Option[String] = None
) {
val sttpBackend: SttpBackend[F, _] = new AuthSttpBackend(baseSttpBackend, auth)
val sttpBackend: SttpBackend[F, _] =
new AuthSttpBackend(new TraceSttpBackend(baseSttpBackend), auth)

def send[R](
r: RequestT[Empty, Either[String, String], Any] => RequestT[Id, R, Any]
Expand Down Expand Up @@ -130,7 +162,7 @@ final case class RequestSession[F[_]: Monad](
}

// scalastyle:off parameter.number
class GenericClient[F[_]](
class GenericClient[F[_]: Trace](
applicationName: String,
val projectName: String,
baseUrl: String,
Expand Down Expand Up @@ -245,10 +277,13 @@ object GenericClient {
val defaultBaseUrl: String = Option(System.getenv("COGNITE_BASE_URL"))
.getOrElse("https://api.cognitedata.com")

def apply[F[_]: Monad](applicationName: String, projectName: String, baseUrl: String, auth: Auth)(
implicit sttpBackend: SttpBackend[F, Any]
): GenericClient[F] =
new GenericClient(applicationName, projectName, baseUrl, auth)(implicitly, sttpBackend)
def apply[F[_]: Monad: Trace](
applicationName: String,
projectName: String,
baseUrl: String,
auth: Auth
)(implicit sttpBackend: SttpBackend[F, Any]): GenericClient[F] =
new GenericClient(applicationName, projectName, baseUrl, auth)

def parseBaseUrlOrThrow(baseUrl: String): Uri =
try {
Expand Down Expand Up @@ -277,7 +312,7 @@ object GenericClient {
)
}

def forAuth[F[_]: Monad](
def forAuth[F[_]: Monad: Trace](
applicationName: String,
projectName: String,
auth: Auth,
Expand All @@ -296,7 +331,7 @@ object GenericClient {
cdfVersion
)

def forAuthProvider[F[_]: Monad](
def forAuthProvider[F[_]: Monad: Trace](
applicationName: String,
projectName: String,
authProvider: AuthProvider[F],
Expand All @@ -317,9 +352,6 @@ object GenericClient {
apiVersion,
clientTag,
cdfVersion
)(
implicitly,
sttpBackend
)
)
}
Expand Down Expand Up @@ -361,11 +393,17 @@ class Client(
baseUrl: String =
Option(System.getenv("COGNITE_BASE_URL")).getOrElse("https://api.cognitedata.com"),
auth: Auth
)(implicit sttpBackend: SttpBackend[Id, Any])
)(implicit trace: Trace[Id], sttpBackend: SttpBackend[Id, Any])
extends GenericClient[Id](applicationName, projectName, baseUrl, auth)

object Client {
def apply(applicationName: String, projectName: String, baseUrl: String, auth: Auth)(
implicit sttpBackend: SttpBackend[Id, Any]
): Client = new Client(applicationName, projectName, baseUrl, auth)(sttpBackend)
def apply(
applicationName: String,
projectName: String,
baseUrl: String,
auth: Auth
)(
implicit trace: Trace[Id],
sttpBackend: SttpBackend[Id, Any]
): Client = new Client(applicationName, projectName, baseUrl, auth)
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class FilterTest extends SdkTestSpec with OptionValues {
lazy val dummyClient = Client("foo",
projectName,
"https://api.cognitedata.com",
auth)(requestHijacker)
auth)(implicitly, requestHijacker)
val dummyRequestSession = dummyClient.requestSession

val _ = Filter.filterWithCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.concurrent.duration._

@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.NonUnitStatements"))
class OAuth2ClientCredentialsTest extends AnyFlatSpec with Matchers with OptionValues with RetryWhile {
import natchez.Trace.Implicits.noop
val tenant: String = sys.env("TEST_AAD_TENANT")
val clientId: String = sys.env("TEST_CLIENT_ID")
val clientSecret: String = sys.env("TEST_CLIENT_SECRET")
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/cognite/sdk/scala/common/ReadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ReadTest extends SdkTestSpec with OptionValues {
lazy val dummyClient = Client("foo",
projectName,
"https://api.cognitedata.com",
auth)(requestHijacker)
auth)(implicitly, requestHijacker)
val dummyRequestSession = dummyClient.requestSession

Readable.readWithCursor(
Expand Down
6 changes: 6 additions & 0 deletions src/test/scala/com/cognite/sdk/scala/common/SdkTestSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ import sttp.monad.MonadError

import scala.concurrent.duration._
import scala.util.control.NonFatal
import cats.Id
import cats.effect.IO
import cats.effect.unsafe.IORuntime

import natchez.Trace

class LoggingSttpBackend[F[_], +P](delegate: SttpBackend[F, P]) extends SttpBackend[F, P] {
override def send[T, R >: P with Effect[F]](request: Request[T, R]): F[Response[T]] =
responseMonad.map(try {
Expand Down Expand Up @@ -48,6 +51,8 @@ class LoggingSttpBackend[F[_], +P](delegate: SttpBackend[F, P]) extends SttpBack

abstract class SdkTestSpec extends AnyFlatSpec with Matchers with OptionValues {
implicit val ioRuntime: IORuntime = IORuntime.global
implicit val trace: Trace[IO] = natchez.Trace.Implicits.noop
implicit val traceId: Trace[Id] = natchez.Trace.Implicits.noop
implicit val authSttpBackend: SttpBackend[IO, Any] = AsyncHttpClientCatsBackend[IO]().unsafeRunSync()
// Use this if you need request logs for debugging: new LoggingSttpBackend[Id, Nothing](sttpBackend)
lazy val client: GenericClient[IO] = GenericClient[IO](
Expand All @@ -56,6 +61,7 @@ abstract class SdkTestSpec extends AnyFlatSpec with Matchers with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](AsyncHttpClientCatsBackend[IO]().unsafeRunSync())
)
Expand Down
23 changes: 18 additions & 5 deletions src/test/scala/com/cognite/sdk/scala/v1/ClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
headers = req.headers
Response.ok(tokenInspectResponse).copy(headers = req.headers)
}
new GenericClient[Id]("scala-sdk-test", projectName, auth = auth, clientTag = Some("client-test"))(implicitly, saveHeadersStub)
new GenericClient[Id]("scala-sdk-test", projectName, auth = auth, clientTag = Some("client-test"))(implicitly, implicitly, saveHeadersStub)
.token.inspect()
headers should contain (Header("x-cdp-clienttag", "client-test"))
headers should contain (Header("x-cdp-sdk", s"CogniteScalaSDK:${BuildInfo.version}"))
Expand All @@ -74,6 +74,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](AsyncHttpClientCatsBackend[IO]().unsafeRunSync())
).token.inspect().unsafeRunSync().projects should not be empty
Expand All @@ -86,6 +87,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
RateLimitingBackend[Any](AsyncHttpClientCatsBackend[IO]().unsafeRunSync(), 5)
).token.inspect().unsafeRunSync().projects should not be empty
Expand All @@ -103,6 +105,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
new BackpressureThrottleBackend[IO, Any](AsyncHttpClientCatsBackend[IO]().unsafeRunSync(), makeQueueOf1.unsafeRunSync(), 1.seconds)
).token.inspect().unsafeRunSync().projects should not be empty
Expand All @@ -113,6 +116,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
an[InvalidAuthentication] should be thrownBy GenericClient.forAuth[Id](
"scala-sdk-test", "", auth)(
implicitly,
implicitly,
sttpBackend
).assets.list(Some(1)).compile.toList
}
Expand All @@ -122,6 +126,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
noException should be thrownBy new GenericClient[Id](
"scala-sdk-test", projectName, auth = auth)(
implicitly,
implicitly,
sttpBackend
)
}
Expand All @@ -133,15 +138,15 @@ class ClientTest extends SdkTestSpec with OptionValues {
projectName,
"",
auth
)(new LoggingSttpBackend[Id, Any](sttpBackend)).token.inspect()
)(implicitly, new LoggingSttpBackend[Id, Any](sttpBackend)).token.inspect()
}
assertThrows[UnknownHostException] {
Client(
"url-test-3",
projectName,
"thisShouldThrowAnUnknownHostException:)",
auth
)(sttpBackend).token.inspect()
)(implicitly, sttpBackend).token.inspect()
}
}

Expand All @@ -152,7 +157,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
projectName,
"http://api.cognitedata.com",
auth
)(sttpBackend).token.inspect()
)(implicitly, sttpBackend).token.inspect()
}
}

Expand All @@ -164,6 +169,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
makeTestingBackend()
).threeDModels.list().compile.toList.unsafeRunSync()
Expand All @@ -176,6 +182,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
auth
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](
makeTestingBackend(),
initialRetryDelay = 1.millis,
Expand All @@ -189,6 +196,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](
makeTestingBackend(),
Expand All @@ -204,7 +212,8 @@ class ClientTest extends SdkTestSpec with OptionValues {
projectName,
"https://www.cognite.com/nowhereatall",
BearerTokenAuth("irrelevant", Some("randomproject"))
)(implicitly,
)(natchez.Trace.Implicits.noop,
implicitly,
new RetryingBackend[F, Any](backend,
maxRetries = maxRetries,
initialRetryDelay = 1.millis,
Expand Down Expand Up @@ -234,6 +243,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
BearerTokenAuth("irrelevant", Some("randomproject"))

)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](backendStub,
initialRetryDelay = 1.millis,
Expand Down Expand Up @@ -312,6 +322,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
"https://www.cognite.com/nowhere-at-all",
BearerTokenAuth("irrelevant", Some("randomproject"))
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](
badRequestBackendStub,
Expand Down Expand Up @@ -350,6 +361,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
"https://www.cognite.com/nowhere-at-all",
BearerTokenAuth("irrelevant", Some("randomproject"))
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](
badRequestBackendStub1,
Expand Down Expand Up @@ -395,6 +407,7 @@ class ClientTest extends SdkTestSpec with OptionValues {
baseUrl,
auth
)(
implicitly,
implicitly,
makeTestingBackend()
).threeDModels.list().compile.toList.unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend

import scala.concurrent.duration.DurationInt

import natchez.Trace

@SuppressWarnings(
Array(
"org.wartremover.warts.OptionPartial",
"org.wartremover.warts.PublicInference"
)
)
trait CommonDataModelTestHelper extends AnyFlatSpec with Matchers {
implicit val trace: Trace[IO] = natchez.Trace.Implicits.noop
val tenant: String = sys.env("TEST_AAD_TENANT")
val clientId: String = sys.env("TEST_CLIENT_ID")
val clientSecret: String = sys.env("TEST_CLIENT_SECRET")
Expand Down Expand Up @@ -47,6 +50,7 @@ trait CommonDataModelTestHelper extends AnyFlatSpec with Matchers {
None,
Some("alpha")
)(
implicitly,
implicitly,
new RetryingBackend[IO, Any](implicitly)
)
Expand Down
Loading

0 comments on commit 73db1c9

Please sign in to comment.