Revert "Revert "CDF-18450: add tracing support" (#836)"
This reverts commit 20645e8.
dmivankov committed Nov 6, 2023
1 parent 913920d commit 49c7001
Showing 63 changed files with 1,692 additions and 1,060 deletions.
12 changes: 10 additions & 2 deletions build.sbt
@@ -10,6 +10,7 @@ val supportedScalaVersions = List(scala212, scala213)
val sparkVersion = "3.3.3"
val circeVersion = "0.14.6"
val sttpVersion = "3.5.2"
val natchezVersion = "0.3.1"
val Specs2Version = "4.20.2"
val cogniteSdkVersion = "2.12.771"

@@ -20,6 +21,8 @@ sonatypeProfileName := "com.cognite" // default is same as organization and lead

lazy val gpgPass = Option(System.getenv("GPG_KEY_PASSWORD"))

addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)

ThisBuild / scalafixDependencies += "org.typelevel" %% "typelevel-scalafix" % "0.1.4"

lazy val patchVersion = scala.io.Source.fromFile("patch_version.txt").mkString.trim
@@ -28,7 +31,7 @@ lazy val commonSettings = Seq(
organization := "com.cognite.spark.datasource",
organizationName := "Cognite",
organizationHomepage := Some(url("https://cognite.com")),
version := "3.4." + patchVersion,
version := "3.5." + patchVersion,
isSnapshot := patchVersion.endsWith("-SNAPSHOT"),
crossScalaVersions := supportedScalaVersions,
semanticdbEnabled := true,
@@ -155,7 +158,12 @@ lazy val library = (project in file("."))
exclude("org.glassfish.hk2.external", "javax.inject"),
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
exclude("org.glassfish.hk2.external", "javax.inject"),
"org.log4s" %% "log4s" % log4sVersion
"org.log4s" %% "log4s" % log4sVersion,
"org.tpolecat" %% "natchez-core" % natchezVersion,
"org.tpolecat" %% "natchez-noop" % natchezVersion,
"org.tpolecat" %% "natchez-opentelemetry" % natchezVersion,
"io.opentelemetry" % "opentelemetry-sdk" % "1.23.0",
"com.lightstep.opentelemetry" % "opentelemetry-launcher" % "1.22.0"
),
coverageExcludedPackages := "com.cognite.data.*",
buildInfoKeys := Seq[BuildInfoKey](organization, version, organizationName),
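
The natchez, OpenTelemetry, and kind-projector additions above underpin the rest of the diff, which swaps raw cats-effect IO for a TracedIO effect (kind-projector is presumably needed for the partially applied type constructors this involves). The alias itself is not part of this excerpt; a minimal sketch, assuming natchez's documented Kleisli-over-Span encoding, with all names below illustrative rather than taken from the PR:

import cats.data.Kleisli
import cats.effect.IO
import natchez.Span

object TracedIOSketch {
  // Thread the active natchez span through every step as Kleisli context.
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  object TracedIO {
    // Helpers mirroring the calls seen later in this diff (unit, pure, liftIO).
    val unit: TracedIO[Unit] = Kleisli.pure(())
    def pure[A](a: A): TracedIO[A] = Kleisli.pure(a)
    def liftIO[A](io: IO[A]): TracedIO[A] = Kleisli.liftF(io)
  }
}
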
44 changes: 20 additions & 24 deletions src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala
@@ -1,6 +1,5 @@
package cognite.spark.v1

import cats.effect.IO
import cats.implicits._
import cognite.spark.v1.PushdownUtilities.stringSeqToCogniteExternalIdSeq
import cognite.spark.compiletime.macros.SparkSchemaHelper.{fromRow, structType}
@@ -76,19 +75,17 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

import CdpConnector.ioRuntime

def delete(data: DataFrame): Unit = {
def delete(data: DataFrame): TracedIO[Unit] =
val partitionedData = if (config.enableSinglePartitionDeleteAssetHierarchy) {
data.repartition(numPartitions = 1)
} else {
data
}

partitionedData.foreachPartition((rows: Iterator[Row]) => {
SparkF(partitionedData).foreachPartition((rows: Iterator[Row]) => {
val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
Stream
.fromIterator[IO](deletes, chunkSize = batchSize)
.fromIterator[TracedIO](deletes, chunkSize = batchSize)
.chunks
.parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
client.assets
@@ -100,24 +97,22 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
}
.compile
.drain
.unsafeRunSync()
})
}

def buildFromDf(data: DataFrame): Unit =
def buildFromDf(data: DataFrame): TracedIO[Unit] =
// Do not use .collect to run the builder on one of the executors and not on the driver
data
.repartition(numPartitions = 1)
SparkF(data.repartition(numPartitions = 1))
.foreachPartition((rows: Iterator[Row]) => {
build(rows).unsafeRunSync()
build(rows)
})

private val batchSize = config.batchSize.getOrElse(Constants.DefaultBatchSize)

private val deleteMissingAssets = config.deleteMissingAssets
private val subtreeMode = config.subtrees

def build(data: Iterator[Row]): IO[Unit] = {
def build(data: Iterator[Row]): TracedIO[Unit] = {
val sourceTree = data.map(r => fromRow[AssetsIngestSchema](r)).toArray

val subtrees =
@@ -153,7 +148,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
} yield ()
}

def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): IO[Unit] =
def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): TracedIO[Unit] =
// check that all `parentExternalId`s exist
batchedOperation[String, Asset](
roots.map(_.parentExternalId).filter(_.nonEmpty).distinct,
@@ -173,7 +168,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
})
).void

def buildSubtrees(trees: Vector[AssetSubtree]): IO[Unit] =
def buildSubtrees(trees: Vector[AssetSubtree]): TracedIO[Unit] =
for {
// fetch existing roots and update or insert them first
cdfRoots <- fetchCdfAssets(trees.map(_.root.externalId))
@@ -191,7 +186,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
_ <- update(toUpdate)
} yield ()

def insert(toInsert: Seq[AssetsIngestSchema]): IO[Vector[Asset]] = {
def insert(toInsert: Seq[AssetsIngestSchema]): TracedIO[Vector[Asset]] = {
val assetCreatesToInsert = toInsert.map(toAssetCreate)
// Traverse batches in order to ensure writing parents first
assetCreatesToInsert
@@ -205,7 +200,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
.map(_.flatten)
}

def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): IO[Unit] =
def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): TracedIO[Unit] =
if (deleteMissingAssets) {
val ingestedNodeSet = trees.flatMap(_.allNodes).map(_.externalId).toSet
// list all subtrees of the tree root and filter those which are not in the ingested set
@@ -219,6 +214,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
.map(asset => CogniteInternalId(asset.id))
.compile
.toVector
.map(x => x)
)
_ <- batchedOperation[CogniteInternalId, Nothing](
idsToDelete,
@@ -230,10 +226,10 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
)
} yield ()
} else {
IO.unit
TracedIO.unit
}

def update(toUpdate: Vector[AssetsIngestSchema]): IO[Unit] =
def update(toUpdate: Vector[AssetsIngestSchema]): TracedIO[Unit] =
batchedOperation[AssetsIngestSchema, Asset](
toUpdate,
updateBatch => {
@@ -243,7 +239,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
}
).void

def fetchCdfAssets(sourceRootExternalIds: Vector[String]): IO[Map[String, Asset]] =
def fetchCdfAssets(sourceRootExternalIds: Vector[String]): TracedIO[Map[String, Asset]] =
batchedOperation[String, Asset](
sourceRootExternalIds,
batch =>
@@ -253,7 +249,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

def upsertRoots( // scalastyle:off
newRoots: Vector[AssetsIngestSchema],
sourceRoots: Map[String, Asset]): IO[Vector[Asset]] = {
sourceRoots: Map[String, Asset]): TracedIO[Vector[Asset]] = {

// Assets without corresponding source root will be created
val (toCreate, toUpdate, toIgnore) = nodesToInsertUpdate(newRoots, sourceRoots)
@@ -403,18 +399,18 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

private def batchedOperation[I, R](
list: Vector[I],
op: Vector[I] => IO[Seq[R]],
batchSize: Int = this.batchSize): IO[Vector[R]] =
op: Vector[I] => TracedIO[Seq[R]],
batchSize: Int = this.batchSize): TracedIO[Vector[R]] =
if (list.nonEmpty) {
Stream
.fromIterator[IO](list.iterator, chunkSize = batchSize)
.fromIterator[TracedIO](list.iterator, chunkSize = batchSize)
.chunks
.parEvalMapUnordered(config.parallelismPerPartition)(chunk => op(chunk.toVector).map(Chunk.seq))
.flatMap(Stream.chunk)
.compile
.toVector
} else {
IO.pure(Vector.empty)
TracedIO.pure(Vector.empty)
}
}

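
Across the hunks above, the fs2 pipelines and the former unsafeRunSync calls no longer pin the effect to IO: the methods now return TracedIO and the effect is run further up, with SparkF apparently wrapping DataFrame actions so that partition closures stay in the traced effect. The batching helper generalizes cleanly; a standalone sketch assuming cats-effect 3 and fs2 3, with parameter names illustrative:

import cats.effect.Async
import cats.syntax.functor._
import fs2.{Chunk, Stream}

object BatchingSketch {
  // Generic version of the batchedOperation pattern above: split the input into
  // batches, run each batch through op with bounded parallelism, collect results.
  def batchedOperation[F[_]: Async, I, R](
      items: Vector[I],
      op: Vector[I] => F[Seq[R]],
      batchSize: Int,
      parallelism: Int): F[Vector[R]] =
    if (items.nonEmpty) {
      Stream
        .fromIterator[F](items.iterator, chunkSize = batchSize)
        .chunks
        .parEvalMapUnordered(parallelism)(chunk => op(chunk.toVector).map(Chunk.seq))
        .flatMap(Stream.chunk)
        .compile
        .toVector
    } else {
      Async[F].pure(Vector.empty)
    }
}
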
22 changes: 11 additions & 11 deletions src/main/scala/cognite/spark/v1/AssetsRelation.scala
@@ -1,6 +1,6 @@
package cognite.spark.v1

import cats.effect.IO
import cats.implicits._
import cognite.spark.v1.PushdownUtilities._
import cognite.spark.compiletime.macros.SparkSchemaHelper._
import com.cognite.sdk.scala.common._
@@ -24,7 +24,7 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]

Array("name", "source", "dataSetId", "labels", "id", "externalId", "externalIdPrefix")
override def getStreams(sparkFilters: Array[Filter])(
client: GenericClient[IO]): Seq[Stream[IO, AssetsReadSchema]] = {
client: GenericClient[TracedIO]): Seq[Stream[TracedIO, AssetsReadSchema]] = {
val (ids, filters) =
pushdownToFilters(sparkFilters, assetsFilterFromMap, AssetsFilter(assetSubtreeIds = subtreeIds))
executeFilter(client.assets, filters, ids, config.partitions, config.limitPerPartition)
@@ -58,48 +58,48 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]
assetSubtreeIds = subtreeIds
)

override def insert(rows: Seq[Row]): IO[Unit] = {
override def insert(rows: Seq[Row]): TracedIO[Unit] = {
val assetsInsertions = rows.map(fromRow[AssetsInsertSchema](_))
val assets = assetsInsertions.map(
_.into[AssetCreate]
.withFieldComputed(_.labels, u => stringSeqToCogniteExternalIdSeq(u.labels))
.transform)
client.assets
.create(assets)
.flatTap(_ => incMetrics(itemsCreated, assets.size)) *> IO.unit
.flatTap(_ => incMetrics(itemsCreated, assets.size)) *> TracedIO.unit
}

private def isUpdateEmpty(u: AssetUpdate): Boolean = u == AssetUpdate()

override def update(rows: Seq[Row]): IO[Unit] = {
override def update(rows: Seq[Row]): TracedIO[Unit] = {
val assetUpdates = rows.map(r => fromRow[AssetsUpsertSchema](r))

updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[IO], Asset](
updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[TracedIO], Asset](
assetUpdates,
client.assets,
isUpdateEmpty
)
}

override def delete(rows: Seq[Row]): IO[Unit] = {
override def delete(rows: Seq[Row]): TracedIO[Unit] = {
val deletes = rows.map(fromRow[DeleteItemByCogniteId](_))
deleteWithIgnoreUnknownIds(client.assets, deletes.map(_.toCogniteId), config.ignoreUnknownIds)
}

override def upsert(rows: Seq[Row]): IO[Unit] = {
override def upsert(rows: Seq[Row]): TracedIO[Unit] = {
val assets = rows.map(fromRow[AssetsUpsertSchema](_))
genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[IO]](
genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[TracedIO]](
assets,
isUpdateEmpty,
client.assets,
mustBeUpdate = r => r.name.isEmpty && r.getExternalId.nonEmpty
)
}

override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): IO[Unit] = {
override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): TracedIO[Unit] = {
val assetsUpserts = rows.map(fromRow[AssetsUpsertSchema](_))
val assets = assetsUpserts.map(_.transformInto[AssetCreate])
createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[IO]](
createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[TracedIO]](
Set.empty,
assets,
client.assets,
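
With the client and the result streams now parameterized by TracedIO, individual relation operations can be wrapped in named child spans, since natchez ships a Trace instance for the Kleisli encoding. A hedged sketch; the helper and span name are illustrative, not taken from this PR:

import cats.data.Kleisli
import cats.effect.IO
import natchez.{Span, Trace}

object SpanSketch {
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  // Run a traced step inside a named child span; natchez's Trace instance for
  // Kleisli[F, Span[F], *] takes care of creating and propagating the span.
  def traced[A](name: String)(body: TracedIO[A]): TracedIO[A] =
    Trace[TracedIO].span(name)(body)

  // Usage (inside a relation, illustrative): traced("assets.insert")(client.assets.create(assets))
}
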
17 changes: 9 additions & 8 deletions src/main/scala/cognite/spark/v1/CdfRelation.scala
@@ -24,18 +24,19 @@ abstract class CdfRelation(config: RelationConfig, shortNameStr: String)
@transient lazy protected val itemsUpserted: Counter =
MetricsSource.getOrCreateCounter(config.metricsPrefix, s"$shortName.upserted")

@transient lazy val client: GenericClient[IO] =
@transient lazy val client: GenericClient[TracedIO] =
CdpConnector.clientFromConfig(config)

@transient lazy val alphaClient: GenericClient[IO] =
@transient lazy val alphaClient: GenericClient[TracedIO] =
CdpConnector.clientFromConfig(config, Some("alpha"))

def incMetrics(counter: Counter, count: Int): IO[Unit] =
IO(
if (config.collectMetrics) {
counter.inc(count.toLong)
}
)
def incMetrics(counter: Counter, count: Int): TracedIO[Unit] =
TracedIO.liftIO(
IO(
if (config.collectMetrics) {
counter.inc(count.toLong)
}
))

// Needed for labels property when transforming from UpsertSchema to Update
implicit def seqStrToCogIdSetter
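
The incMetrics change keeps the counter increment as a plain, delayed side effect and merely lifts it into the traced context. A generic sketch of the same idea, assuming the Counter behind MetricsSource is the usual Dropwizard one:

import cats.effect.Sync
import com.codahale.metrics.Counter

object MetricsSketch {
  // Delaying the increment in any Sync effect (IO, or a traced Kleisli over IO)
  // is equivalent to the TracedIO.liftIO(IO(...)) wrapping in the diff.
  def incMetrics[F[_]: Sync](collectMetrics: Boolean)(counter: Counter, count: Int): F[Unit] =
    Sync[F].delay {
      if (collectMetrics) {
        counter.inc(count.toLong)
      }
    }
}
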
32 changes: 19 additions & 13 deletions src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
@@ -5,35 +5,41 @@ import com.cognite.sdk.scala.common.{Auth, AuthProvider, OAuth2}
import sttp.client3.SttpBackend

sealed trait CdfSparkAuth extends Serializable {
def provider(implicit clock: Clock[IO], sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]]
def provider(
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]]
}

object CdfSparkAuth {
final case class Static(auth: Auth) extends CdfSparkAuth {
override def provider(
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
IO(AuthProvider(auth))
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
TracedIO.liftIO(IO(AuthProvider[TracedIO](auth)))
}

final case class OAuth2ClientCredentials(credentials: OAuth2.ClientCredentials) extends CdfSparkAuth {

override def provider(
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
OAuth2.ClientCredentialsProvider[IO](credentials)
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
OAuth2
.ClientCredentialsProvider[TracedIO](credentials)
.map(x => x)
}

final case class OAuth2Sessions(session: OAuth2.Session) extends CdfSparkAuth {

private val refreshSecondsBeforeExpiration = 300L

override def provider(
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
OAuth2.SessionProvider[IO](
session,
refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
)
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
OAuth2
.SessionProvider[TracedIO](
session,
refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
)
.map(x => x)
}
}
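
The repeated .map(x => x) above is a widening trick: the SDK constructor returns, say, a TracedIO[ClientCredentialsProvider[TracedIO]], and if TracedIO is a Kleisli (whose result type parameter is invariant) that is not automatically a TracedIO[AuthProvider[TracedIO]]; mapping with the identity function lets the compiler re-infer the element at the wider interface type. A tiny illustration with stand-in types:

import cats.data.Kleisli
import cats.effect.IO

object WideningSketch {
  trait Animal
  final case class Dog() extends Animal

  // Stand-in for a traced effect; Kleisli is invariant in its result type.
  type Traced[A] = Kleisli[IO, String, A]

  val dog: Traced[Dog] = Kleisli.liftF(IO(Dog()))
  val animal: Traced[Animal] = dog.map(x => x) // the identity map widens Dog to Animal
}
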
(Diffs for the remaining 58 changed files are not shown here.)
