
Commit 9ee2fa5: Remove flag

BenChand committed Nov 2, 2023 (1 parent: 913920d)
Showing 4 changed files with 23 additions and 34 deletions.
src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala (20 additions, 25 deletions)

@@ -78,31 +78,26 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)

   import CdpConnector.ioRuntime

-  def delete(data: DataFrame): Unit = {
-    val partitionedData = if (config.enableSinglePartitionDeleteAssetHierarchy) {
-      data.repartition(numPartitions = 1)
-    } else {
-      data
-    }
-
-    partitionedData.foreachPartition((rows: Iterator[Row]) => {
-      val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
-      Stream
-        .fromIterator[IO](deletes, chunkSize = batchSize)
-        .chunks
-        .parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
-          client.assets
-            .deleteRecursive(
-              chunk.toVector.map(_.toCogniteId),
-              recursive = true,
-              ignoreUnknownIds = true)
-            .flatTap(_ => incMetrics(itemsDeleted, chunk.size))
-        }
-        .compile
-        .drain
-        .unsafeRunSync()
-    })
-  }
+  def delete(data: DataFrame): Unit =
+    data
+      .repartition(numPartitions = 1)
+      .foreachPartition((rows: Iterator[Row]) => {
+        val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
+        Stream
+          .fromIterator[IO](deletes, chunkSize = batchSize)
+          .chunks
+          .parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
+            client.assets
+              .deleteRecursive(
+                chunk.toVector.map(_.toCogniteId),
+                recursive = true,
+                ignoreUnknownIds = true)
+              .flatTap(_ => incMetrics(itemsDeleted, chunk.size))
+          }
+          .compile
+          .drain
+          .unsafeRunSync()
+      })

   def buildFromDf(data: DataFrame): Unit =
     // Do not use .collect to run the builder on one of the executors and not on the driver
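The rewritten delete above always repartitions to a single partition before streaming deletions. For readers unfamiliar with the fs2 pipeline it drives, here is a minimal, self-contained sketch of the same chunk-and-delete-in-parallel pattern, with IO.println standing in for client.assets.deleteRecursive; the object name, id range, chunk size of 25, and parallelism of 4 are illustrative assumptions, not the connector's real values:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object ChunkedDeleteSketch extends App {
  // Stand-in for the ids that delete() would read from a partition's rows.
  val ids: Iterator[Long] = (1L to 100L).iterator

  Stream
    .fromIterator[IO](ids, chunkSize = 25) // buffer the iterator into chunks of up to 25 ids
    .chunks                                // emit each buffered chunk as a single element
    .parEvalMapUnordered(4) { chunk =>     // run at most 4 "delete" calls concurrently
      IO.println(s"deleting ${chunk.size} ids: ${chunk.toVector.mkString(", ")}")
    }
    .compile
    .drain
    .unsafeRunSync() // block the calling (foreachPartition) thread until the stream finishes
}

parEvalMapUnordered is what bounds the concurrency here: at most four chunks are in flight at once, and they may complete in any order, which is fine because the connector only counts deleted items.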
src/main/scala/cognite/spark/v1/DefaultSource.scala (1 addition, 5 deletions)

@@ -406,9 +406,6 @@ object DefaultSource
     val collectMetrics = toBoolean(parameters, "collectMetrics")
     val collectTestMetrics = toBoolean(parameters, "collectTestMetrics")

-    val enableSinglePartitionDeleteAssetHierarchy =
-      toBoolean(parameters, "enableSinglePartitionDeleteHierarchy", defaultValue = false)
-
     val saveMode = parseSaveMode(parameters)
     val parallelismPerPartition = {
       toPositiveInt(parameters, "parallelismPerPartition").getOrElse(
@@ -458,8 +455,7 @@
       deleteMissingAssets = toBoolean(parameters, "deleteMissingAssets"),
       subtrees = subtreesOption,
       ignoreNullFields = toBoolean(parameters, "ignoreNullFields", defaultValue = true),
-      rawEnsureParent = toBoolean(parameters, "rawEnsureParent", defaultValue = true),
-      enableSinglePartitionDeleteAssetHierarchy = enableSinglePartitionDeleteAssetHierarchy
+      rawEnsureParent = toBoolean(parameters, "rawEnsureParent", defaultValue = true)
     )
   }
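The removed lines above parsed the flag with DefaultSource's toBoolean helper, whose implementation is not part of this diff. As a rough sketch of what such an option parser typically looks like (the signature is inferred from the call sites above; the body is a hypothetical reimplementation and may differ from the connector's):

def toBoolean(
    parameters: Map[String, String],
    parameterName: String,
    defaultValue: Boolean = false): Boolean =
  // Hypothetical reimplementation; the real helper in DefaultSource may behave differently.
  parameters.get(parameterName).map(_.trim.toLowerCase) match {
    case Some("true") => true
    case Some("false") => false
    case Some(other) =>
      throw new IllegalArgumentException(
        s"Option '$parameterName' must be 'true' or 'false', got '$other'")
    case None => defaultValue
  }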
src/main/scala/cognite/spark/v1/RelationConfig.scala (1 addition, 2 deletions)

@@ -21,8 +21,7 @@ final case class RelationConfig(
     deleteMissingAssets: Boolean,
     subtrees: AssetSubtreeOption,
     ignoreNullFields: Boolean,
-    rawEnsureParent: Boolean,
-    enableSinglePartitionDeleteAssetHierarchy: Boolean // flag to test whether single partition helps avoid NPE in asset hierarchy builder
+    rawEnsureParent: Boolean
 ) {

   /** Desired number of Spark partitions ~= partitions / parallelismPerPartition */
src/test/scala/cognite/spark/v1/SparkTest.scala (1 addition, 2 deletions)

@@ -251,8 +251,7 @@ trait SparkTest
     deleteMissingAssets = false,
     subtrees = AssetSubtreeOption.Ingest,
     ignoreNullFields = true,
-    rawEnsureParent = false,
-    enableSinglePartitionDeleteAssetHierarchy = false
+    rawEnsureParent = false
   )

   private def getCounterSafe(metricName: String): Option[Long] =
