Commit

Just 1 partition code for now

BenChand committed Oct 30, 2023
1 parent d22ee7b commit ecb2e8b
Showing 1 changed file with 23 additions and 33 deletions.
56 changes: 23 additions & 33 deletions src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala
@@ -78,42 +78,32 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
 
   import CdpConnector.ioRuntime
 
-  def delete(data: DataFrame): Unit =
-    if (config.enableSinglePartitionDeleteAssetHierarchy) {
-      data
-        .repartition(numPartitions = 1)
-        .foreachPartition((rows: Iterator[Row]) => {
-          batchedOperation[DeleteItemByCogniteId, Unit](
-            rows.map(r => fromRow[DeleteItemByCogniteId](r)).toVector,
-            // The API calls throw exception when any of the ids do not exist
-            ids =>
-              client.assets
-                .deleteRecursive(ids.map(_.toCogniteId), recursive = true, ignoreUnknownIds = true)
-                .flatTap(_ => incMetrics(itemsDeleted, ids.size))
-                .map(Seq(_)),
-            batchSize = batchSize
-          ).void.unsafeRunSync()
-        })
+  def delete(data: DataFrame): Unit = {
+    val partitionedData = if (config.enableSinglePartitionDeleteAssetHierarchy) {
+      data.repartition(numPartitions = 1)
     } else {
-      data.foreachPartition((rows: Iterator[Row]) => {
-        val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
-        Stream
-          .fromIterator[IO](deletes, chunkSize = batchSize)
-          .chunks
-          .parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
-            client.assets
-              .deleteRecursive(
-                chunk.toVector.map(_.toCogniteId),
-                recursive = true,
-                ignoreUnknownIds = true)
-              .flatTap(_ => incMetrics(itemsDeleted, chunk.size))
-          }
-          .compile
-          .drain
-          .unsafeRunSync()
-      })
+      data
+    }
+
+    partitionedData.foreachPartition((rows: Iterator[Row]) => {
+      val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
+      Stream
+        .fromIterator[IO](deletes, chunkSize = batchSize)
+        .chunks
+        .parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
+          client.assets
+            .deleteRecursive(
+              chunk.toVector.map(_.toCogniteId),
+              recursive = true,
+              ignoreUnknownIds = true)
+            .flatTap(_ => incMetrics(itemsDeleted, chunk.size))
+        }
+        .compile
+        .drain
+        .unsafeRunSync()
+    })
   }
 
   def buildFromDf(data: DataFrame): Unit =
     // Do not use .collect to run the builder on one of the executors and not on the driver
     data
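For reference, the pattern the retained code path relies on: within each Spark partition, rows are converted to delete items and funnelled through an fs2 Stream that pulls ids lazily in batch-sized chunks and issues one recursive-delete call per chunk, with at most parallelismPerPartition calls in flight at once. Below is a minimal, self-contained sketch of that pattern, assuming fs2 3.x and cats-effect 3; deleteBatch, batchSize and parallelism are hypothetical stand-ins for client.assets.deleteRecursive, batchSize and config.parallelismPerPartition in the real code.

import cats.effect.{IO, IOApp}
import fs2.Stream

object ChunkedDeleteSketch extends IOApp.Simple {
  val batchSize: Int = 1000 // stand-in for the connector's batchSize
  val parallelism: Int = 4  // stand-in for config.parallelismPerPartition

  // Hypothetical stand-in for client.assets.deleteRecursive:
  // one API call per batch of ids.
  def deleteBatch(ids: Vector[Long]): IO[Unit] =
    IO.println(s"deleting ${ids.size} ids")

  def run: IO[Unit] = {
    val ids = (1L to 5000L).iterator
    Stream
      .fromIterator[IO](ids, chunkSize = batchSize) // pull ids lazily, batchSize at a time
      .chunks                                       // one stream element per batch (Chunk[Long])
      .parEvalMapUnordered(parallelism)(chunk => deleteBatch(chunk.toVector))
      .compile
      .drain                                        // run for effects only, discard results
  }
}

The .chunks step is what turns per-element processing into per-batch API calls, and parEvalMapUnordered is acceptable here because deletes of distinct ids are independent, so completion order does not matter.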
