
Commit ec533dd

Merge branch 'master' into dmivankov-patch-3

dmivankov authored Oct 16, 2023
2 parents: 4b2d628 + ce5e507

Showing 63 changed files with 1,061 additions and 1,693 deletions.
14 changes: 3 additions & 11 deletions build.sbt
@@ -8,9 +8,8 @@ val supportedScalaVersions = List(scala212, scala213)
val sparkVersion = "3.3.3"
val circeVersion = "0.14.6"
val sttpVersion = "3.5.2"
val natchezVersion = "0.3.1"
val Specs2Version = "4.20.2"
val cogniteSdkVersion = "2.8.761"
val cogniteSdkVersion = "2.8.763"

val prometheusVersion = "0.16.0"
val log4sVersion = "1.10.0"
@@ -19,8 +18,6 @@ sonatypeProfileName := "com.cognite" // default is same as organization and lead

lazy val gpgPass = Option(System.getenv("GPG_KEY_PASSWORD"))

addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)

ThisBuild / scalafixDependencies += "org.typelevel" %% "typelevel-scalafix" % "0.1.4"

lazy val patchVersion = scala.io.Source.fromFile("patch_version.txt").mkString.trim
@@ -29,7 +26,7 @@ lazy val commonSettings = Seq(
organization := "com.cognite.spark.datasource",
organizationName := "Cognite",
organizationHomepage := Some(url("https://cognite.com")),
version := "3.1." + patchVersion,
version := "3.2." + patchVersion,
isSnapshot := patchVersion.endsWith("-SNAPSHOT"),
crossScalaVersions := supportedScalaVersions,
semanticdbEnabled := true,
@@ -156,12 +153,7 @@ lazy val library = (project in file("."))
exclude("org.glassfish.hk2.external", "javax.inject"),
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
exclude("org.glassfish.hk2.external", "javax.inject"),
"org.log4s" %% "log4s" % log4sVersion,
"org.tpolecat" %% "natchez-core" % natchezVersion,
"org.tpolecat" %% "natchez-noop" % natchezVersion,
"org.tpolecat" %% "natchez-opentelemetry" % natchezVersion,
"io.opentelemetry" % "opentelemetry-sdk" % "1.23.0",
"com.lightstep.opentelemetry" % "opentelemetry-launcher" % "1.22.0"
"org.log4s" %% "log4s" % log4sVersion
),
coverageExcludedPackages := "com.cognite.data.*",
buildInfoKeys := Seq[BuildInfoKey](organization, version, organizationName),
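
For reference, the net effect of this merge on the touched build.sbt settings is sketched below, assuming the hunks above are the only changes to these settings: the natchez and OpenTelemetry artifacts and the kind-projector compiler plugin are removed, the Cognite SDK is bumped from 2.8.761 to 2.8.763, and the artifact version moves from the 3.1.x to the 3.2.x series.

// Post-merge state of the touched settings (sketch; surrounding settings unchanged)
val sparkVersion = "3.3.3"
val circeVersion = "0.14.6"
val sttpVersion = "3.5.2"
val Specs2Version = "4.20.2"
val cogniteSdkVersion = "2.8.763" // natchezVersion is gone

// The kind-projector compiler plugin line is removed:
// addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)

version := "3.2." + patchVersion // was "3.1." + patchVersion

// In libraryDependencies, the natchez-core, natchez-noop, natchez-opentelemetry,
// opentelemetry-sdk and opentelemetry-launcher entries are dropped; log4s stays.
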
44 changes: 24 additions & 20 deletions src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala
@@ -1,5 +1,6 @@
package cognite.spark.v1

import cats.effect.IO
import cats.implicits._
import cognite.spark.v1.PushdownUtilities.stringSeqToCogniteExternalIdSeq
import cognite.spark.compiletime.macros.SparkSchemaHelper.{fromRow, structType}
@@ -75,11 +76,13 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

def delete(data: DataFrame): TracedIO[Unit] =
SparkF(data).foreachPartition((rows: Iterator[Row]) => {
import CdpConnector.ioRuntime

def delete(data: DataFrame): Unit =
data.foreachPartition((rows: Iterator[Row]) => {
val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
Stream
.fromIterator[TracedIO](deletes, chunkSize = batchSize)
.fromIterator[IO](deletes, chunkSize = batchSize)
.chunks
.parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
client.assets
@@ -91,21 +94,23 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
}
.compile
.drain
.unsafeRunSync()
})

def buildFromDf(data: DataFrame): TracedIO[Unit] =
def buildFromDf(data: DataFrame): Unit =
// Do not use .collect to run the builder on one of the executors and not on the driver
SparkF(data.repartition(numPartitions = 1))
data
.repartition(numPartitions = 1)
.foreachPartition((rows: Iterator[Row]) => {
build(rows)
build(rows).unsafeRunSync()
})

private val batchSize = config.batchSize.getOrElse(Constants.DefaultBatchSize)

private val deleteMissingAssets = config.deleteMissingAssets
private val subtreeMode = config.subtrees

def build(data: Iterator[Row]): TracedIO[Unit] = {
def build(data: Iterator[Row]): IO[Unit] = {
val sourceTree = data.map(r => fromRow[AssetsIngestSchema](r)).toArray

val subtrees =
@@ -141,7 +146,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
} yield ()
}

def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): TracedIO[Unit] =
def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): IO[Unit] =
// check that all `parentExternalId`s exist
batchedOperation[String, Asset](
roots.map(_.parentExternalId).filter(_.nonEmpty).distinct,
@@ -161,7 +166,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
})
).void

def buildSubtrees(trees: Vector[AssetSubtree]): TracedIO[Unit] =
def buildSubtrees(trees: Vector[AssetSubtree]): IO[Unit] =
for {
// fetch existing roots and update or insert them first
cdfRoots <- fetchCdfAssets(trees.map(_.root.externalId))
@@ -179,7 +184,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
_ <- update(toUpdate)
} yield ()

def insert(toInsert: Seq[AssetsIngestSchema]): TracedIO[Vector[Asset]] = {
def insert(toInsert: Seq[AssetsIngestSchema]): IO[Vector[Asset]] = {
val assetCreatesToInsert = toInsert.map(toAssetCreate)
// Traverse batches in order to ensure writing parents first
assetCreatesToInsert
@@ -193,7 +198,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
.map(_.flatten)
}

def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): TracedIO[Unit] =
def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): IO[Unit] =
if (deleteMissingAssets) {
val ingestedNodeSet = trees.flatMap(_.allNodes).map(_.externalId).toSet
// list all subtrees of the tree root and filter those which are not in the ingested set
@@ -207,7 +212,6 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
.map(asset => CogniteInternalId(asset.id))
.compile
.toVector
.map(x => x)
)
_ <- batchedOperation[CogniteInternalId, Nothing](
idsToDelete,
@@ -219,10 +223,10 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
)
} yield ()
} else {
TracedIO.unit
IO.unit
}

def update(toUpdate: Vector[AssetsIngestSchema]): TracedIO[Unit] =
def update(toUpdate: Vector[AssetsIngestSchema]): IO[Unit] =
batchedOperation[AssetsIngestSchema, Asset](
toUpdate,
updateBatch => {
@@ -232,7 +236,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
}
).void

def fetchCdfAssets(sourceRootExternalIds: Vector[String]): TracedIO[Map[String, Asset]] =
def fetchCdfAssets(sourceRootExternalIds: Vector[String]): IO[Map[String, Asset]] =
batchedOperation[String, Asset](
sourceRootExternalIds,
batch =>
@@ -242,7 +246,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

def upsertRoots( // scalastyle:off
newRoots: Vector[AssetsIngestSchema],
sourceRoots: Map[String, Asset]): TracedIO[Vector[Asset]] = {
sourceRoots: Map[String, Asset]): IO[Vector[Asset]] = {

// Assets without corresponding source root will be created
val (toCreate, toUpdate, toIgnore) = nodesToInsertUpdate(newRoots, sourceRoots)
@@ -392,18 +396,18 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

private def batchedOperation[I, R](
list: Vector[I],
op: Vector[I] => TracedIO[Seq[R]],
batchSize: Int = this.batchSize): TracedIO[Vector[R]] =
op: Vector[I] => IO[Seq[R]],
batchSize: Int = this.batchSize): IO[Vector[R]] =
if (list.nonEmpty) {
Stream
.fromIterator[TracedIO](list.iterator, chunkSize = batchSize)
.fromIterator[IO](list.iterator, chunkSize = batchSize)
.chunks
.parEvalMapUnordered(config.parallelismPerPartition)(chunk => op(chunk.toVector).map(Chunk.seq))
.flatMap(Stream.chunk)
.compile
.toVector
} else {
TracedIO.pure(Vector.empty)
IO.pure(Vector.empty)
}
}

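Most of this file's change is mechanical: every TracedIO becomes plain IO, the SparkF/TracedIO wrappers around foreachPartition are removed, and the partition-level effect is now run explicitly with unsafeRunSync() on CdpConnector.ioRuntime. The batching helper keeps the same shape; the following minimal, self-contained sketch of that pattern (assuming cats-effect 3 and fs2 3, with a hypothetical processBatch standing in for the CDF client calls) shows how chunks of input are evaluated with bounded parallelism and collected:

import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}

object BatchedOperationSketch extends IOApp.Simple {
  val batchSize = 1000
  val parallelismPerPartition = 4

  // Hypothetical stand-in for a CDF call such as client.assets.create(batch)
  def processBatch(batch: Vector[Int]): IO[Seq[String]] =
    IO.pure(batch.map(i => s"item-$i"))

  // Same shape as batchedOperation above: chunk the input, evaluate each chunk with
  // bounded parallelism, and flatten the per-chunk results into a single Vector.
  def batchedOperation[I, R](list: Vector[I], op: Vector[I] => IO[Seq[R]]): IO[Vector[R]] =
    if (list.nonEmpty) {
      Stream
        .fromIterator[IO](list.iterator, chunkSize = batchSize)
        .chunks
        .parEvalMapUnordered(parallelismPerPartition)(chunk => op(chunk.toVector).map(Chunk.seq))
        .flatMap(Stream.chunk)
        .compile
        .toVector
    } else {
      IO.pure(Vector.empty)
    }

  def run: IO[Unit] =
    batchedOperation((1 to 2500).toVector, processBatch)
      .flatMap(results => IO.println(s"processed ${results.size} items"))
}
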
22 changes: 11 additions & 11 deletions src/main/scala/cognite/spark/v1/AssetsRelation.scala
@@ -1,6 +1,6 @@
package cognite.spark.v1

import cats.implicits._
import cats.effect.IO
import cognite.spark.v1.PushdownUtilities._
import cognite.spark.compiletime.macros.SparkSchemaHelper._
import com.cognite.sdk.scala.common._
@@ -24,7 +24,7 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]

Array("name", "source", "dataSetId", "labels", "id", "externalId", "externalIdPrefix")
override def getStreams(sparkFilters: Array[Filter])(
client: GenericClient[TracedIO]): Seq[Stream[TracedIO, AssetsReadSchema]] = {
client: GenericClient[IO]): Seq[Stream[IO, AssetsReadSchema]] = {
val (ids, filters) =
pushdownToFilters(sparkFilters, assetsFilterFromMap, AssetsFilter(assetSubtreeIds = subtreeIds))
executeFilter(client.assets, filters, ids, config.partitions, config.limitPerPartition)
@@ -58,48 +58,48 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]
assetSubtreeIds = subtreeIds
)

override def insert(rows: Seq[Row]): TracedIO[Unit] = {
override def insert(rows: Seq[Row]): IO[Unit] = {
val assetsInsertions = rows.map(fromRow[AssetsInsertSchema](_))
val assets = assetsInsertions.map(
_.into[AssetCreate]
.withFieldComputed(_.labels, u => stringSeqToCogniteExternalIdSeq(u.labels))
.transform)
client.assets
.create(assets)
.flatTap(_ => incMetrics(itemsCreated, assets.size)) *> TracedIO.unit
.flatTap(_ => incMetrics(itemsCreated, assets.size)) *> IO.unit
}

private def isUpdateEmpty(u: AssetUpdate): Boolean = u == AssetUpdate()

override def update(rows: Seq[Row]): TracedIO[Unit] = {
override def update(rows: Seq[Row]): IO[Unit] = {
val assetUpdates = rows.map(r => fromRow[AssetsUpsertSchema](r))

updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[TracedIO], Asset](
updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[IO], Asset](
assetUpdates,
client.assets,
isUpdateEmpty
)
}

override def delete(rows: Seq[Row]): TracedIO[Unit] = {
override def delete(rows: Seq[Row]): IO[Unit] = {
val deletes = rows.map(fromRow[DeleteItemByCogniteId](_))
deleteWithIgnoreUnknownIds(client.assets, deletes.map(_.toCogniteId), config.ignoreUnknownIds)
}

override def upsert(rows: Seq[Row]): TracedIO[Unit] = {
override def upsert(rows: Seq[Row]): IO[Unit] = {
val assets = rows.map(fromRow[AssetsUpsertSchema](_))
genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[TracedIO]](
genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[IO]](
assets,
isUpdateEmpty,
client.assets,
mustBeUpdate = r => r.name.isEmpty && r.getExternalId.nonEmpty
)
}

override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): TracedIO[Unit] = {
override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): IO[Unit] = {
val assetsUpserts = rows.map(fromRow[AssetsUpsertSchema](_))
val assets = assetsUpserts.map(_.transformInto[AssetCreate])
createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[TracedIO]](
createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[IO]](
Set.empty,
assets,
client.assets,
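
Beyond the TracedIO-to-IO signature changes, the insert path above still maps Spark rows onto SDK create objects with chimney's into / withFieldComputed / transform. A small self-contained sketch of that transformation pattern, using hypothetical case classes and a hypothetical toIds helper instead of the real schemas and stringSeqToCogniteExternalIdSeq:

import io.scalaland.chimney.dsl._

// Hypothetical shapes; the real code maps AssetsInsertSchema to AssetCreate.
final case class RowSchema(name: String, source: Option[String], labels: Option[Seq[String]])
final case class CreateSchema(name: String, source: Option[String], labels: Option[Seq[Long]])

object TransformSketch extends App {
  // Hypothetical stand-in for stringSeqToCogniteExternalIdSeq
  def toIds(labels: Option[Seq[String]]): Option[Seq[Long]] =
    labels.map(_.map(_.length.toLong))

  val row = RowSchema("pump-01", Some("sap"), Some(Seq("rotating-equipment")))

  // Fields with matching names and types are copied automatically; labels differs in
  // type, so it is computed explicitly, mirroring the withFieldComputed call in insert().
  val create = row
    .into[CreateSchema]
    .withFieldComputed(_.labels, r => toIds(r.labels))
    .transform

  println(create) // CreateSchema(pump-01,Some(sap),Some(List(18)))
}
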
17 changes: 8 additions & 9 deletions src/main/scala/cognite/spark/v1/CdfRelation.scala
@@ -24,19 +24,18 @@ abstract class CdfRelation(config: RelationConfig, shortNameStr: String)
@transient lazy protected val itemsUpserted: Counter =
MetricsSource.getOrCreateCounter(config.metricsPrefix, s"$shortName.upserted")

@transient lazy val client: GenericClient[TracedIO] =
@transient lazy val client: GenericClient[IO] =
CdpConnector.clientFromConfig(config)

@transient lazy val alphaClient: GenericClient[TracedIO] =
@transient lazy val alphaClient: GenericClient[IO] =
CdpConnector.clientFromConfig(config, Some("alpha"))

def incMetrics(counter: Counter, count: Int): TracedIO[Unit] =
TracedIO.liftIO(
IO(
if (config.collectMetrics) {
counter.inc(count.toLong)
}
))
def incMetrics(counter: Counter, count: Int): IO[Unit] =
IO(
if (config.collectMetrics) {
counter.inc(count.toLong)
}
)

// Needed for labels property when transforming from UpsertSchema to Update
implicit def seqStrToCogIdSetter
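
The incMetrics change above is representative of the rest of this file: with TracedIO gone there is nothing to lift, so the Prometheus counter increment is simply suspended in IO. A minimal sketch of that idea, with a hypothetical counter and assuming the Prometheus simpleclient Counter that prometheusVersion in build.sbt suggests:

import cats.effect.IO
import io.prometheus.client.Counter

object MetricsSketch {
  // Stand-in for config.collectMetrics
  val collectMetrics: Boolean = true

  // Hypothetical counter registered in the default registry
  val itemsCreated: Counter =
    Counter.build().name("items_created").help("Items created").register()

  // Suspend the side effect: nothing is incremented until the IO is run, and nothing
  // happens at all when metrics collection is disabled.
  def incMetrics(counter: Counter, count: Int): IO[Unit] =
    IO(
      if (collectMetrics) {
        counter.inc(count.toLong)
      }
    )
}
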
32 changes: 13 additions & 19 deletions src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
@@ -5,41 +5,35 @@ import com.cognite.sdk.scala.common.{Auth, AuthProvider, OAuth2}
import sttp.client3.SttpBackend

sealed trait CdfSparkAuth extends Serializable {
def provider(
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]]
def provider(implicit clock: Clock[IO], sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]]
}

object CdfSparkAuth {
final case class Static(auth: Auth) extends CdfSparkAuth {
override def provider(
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
TracedIO.liftIO(IO(AuthProvider[TracedIO](auth)))
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
IO(AuthProvider(auth))
}

final case class OAuth2ClientCredentials(credentials: OAuth2.ClientCredentials) extends CdfSparkAuth {

override def provider(
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
OAuth2
.ClientCredentialsProvider[TracedIO](credentials)
.map(x => x)
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
OAuth2.ClientCredentialsProvider[IO](credentials)
}

final case class OAuth2Sessions(session: OAuth2.Session) extends CdfSparkAuth {

private val refreshSecondsBeforeExpiration = 300L

override def provider(
implicit clock: Clock[TracedIO],
sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
OAuth2
.SessionProvider[TracedIO](
session,
refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
)
.map(x => x)
implicit clock: Clock[IO],
sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
OAuth2.SessionProvider[IO](
session,
refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
)
}
}
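
The pattern here mirrors the other files: the implicit parameters move from TracedIO to IO, the TracedIO.liftIO(...) wrapper around the static provider disappears, and the trailing .map(x => x) identity adapters after ClientCredentialsProvider and SessionProvider are no longer needed because the provider already runs in IO. A schematic sketch of the resulting shape, using hypothetical stand-in traits rather than the real SDK and sttp types:

import cats.effect.{Clock, IO}

// Hypothetical stand-ins for com.cognite.sdk.scala.common.AuthProvider and
// sttp.client3.SttpBackend[F, Any]; they exist only to show the signature shape.
trait AuthProviderLike[F[_]]
trait BackendLike[F[_]]

sealed trait AuthConfigSketch extends Serializable {
  // One effect type end to end: callers get IO[AuthProviderLike[IO]] directly,
  // with no lifting into a traced wrapper and no identity .map at the end.
  def provider(implicit clock: Clock[IO], backend: BackendLike[IO]): IO[AuthProviderLike[IO]]
}

final case class StaticSketch(token: String) extends AuthConfigSketch {
  override def provider(
      implicit clock: Clock[IO],
      backend: BackendLike[IO]): IO[AuthProviderLike[IO]] =
    IO(new AuthProviderLike[IO] {}) // was TracedIO.liftIO(IO(AuthProvider[TracedIO](auth)))
}
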
