From 7b3eb28624f2ccd88cc10fa891438f25c458eb34 Mon Sep 17 00:00:00 2001 From: Dmitry Ivankov Date: Fri, 3 Nov 2023 14:58:24 +0100 Subject: [PATCH] CDF-20182: fix fdm deduplication logic: use (space, externalId) instead of just (space) During scan we have deduplication of items, it was done by just `externalId`. But `externalId` can be reused between different spaces in dms case. So let's pass `space` around and use `space`+`externalId` for deduplication. We do have `space` value available, just a matter of using it. [CDF-20182] --- .../spark/v1/FlexibleDataModelBaseRelation.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/scala/cognite/spark/v1/FlexibleDataModelBaseRelation.scala b/src/main/scala/cognite/spark/v1/FlexibleDataModelBaseRelation.scala index ba9a2087f..28055c0ce 100644 --- a/src/main/scala/cognite/spark/v1/FlexibleDataModelBaseRelation.scala +++ b/src/main/scala/cognite/spark/v1/FlexibleDataModelBaseRelation.scala @@ -34,11 +34,11 @@ abstract class FlexibleDataModelBaseRelation(config: RelationConfig, sqlContext: with WritableRelation { override def buildScan(selectedColumns: Array[String], filters: Array[Filter]): RDD[Row] = - SdkV1Rdd[ProjectedFlexibleDataModelInstance, String]( + SdkV1Rdd[ProjectedFlexibleDataModelInstance, (String, String)]( sqlContext.sparkContext, config, (item: ProjectedFlexibleDataModelInstance, _) => toRow(item), - _.externalId, + instance => (instance.space, instance.externalId), getStreams(filters, selectedColumns) ) @@ -201,7 +201,8 @@ abstract class FlexibleDataModelBaseRelation(config: RelationConfig, sqlContext: case s if s.equalsIgnoreCase("spaceExternalId") => n.space case s if s.equalsIgnoreCase("externalId") => n.externalId case p => allAvailablePropValues.get(p).map(extractInstancePropertyValue).orNull - } + }, + space = n.space ) case e: InstanceDefinition.EdgeDefinition => ProjectedFlexibleDataModelInstance( @@ -214,7 +215,8 @@ abstract class FlexibleDataModelBaseRelation(config: RelationConfig, sqlContext: case s if s.equalsIgnoreCase("endNode") => Array(e.endNode.space, e.endNode.externalId) case s if s.equalsIgnoreCase("type") => Array(e.`type`.space, e.`type`.externalId) case p => allAvailablePropValues.get(p).map(extractInstancePropertyValue).orNull - } + }, + space = e.space ) } } @@ -485,5 +487,8 @@ abstract class FlexibleDataModelBaseRelation(config: RelationConfig, sqlContext: } object FlexibleDataModelBaseRelation { - final case class ProjectedFlexibleDataModelInstance(externalId: String, properties: Array[Any]) + final case class ProjectedFlexibleDataModelInstance( + space: String, + externalId: String, + properties: Array[Any]) }