From 6cfe3fb17503e6113c79b895ad893cf6e3d58ffe Mon Sep 17 00:00:00 2001 From: Andreas Korneliussen Date: Tue, 26 Mar 2024 16:17:49 +0100 Subject: [PATCH 1/3] Enable syncing/cursors with is_new when resolving view through data model --- .../cognite/spark/v1/DefaultSource.scala | 42 ++++++++--- ...bleDataModelCorePropertySyncRelation.scala | 12 +--- .../v1/FlexibleDataModelRelationFactory.scala | 69 ++++++++++++------- 3 files changed, 82 insertions(+), 41 deletions(-) diff --git a/src/main/scala/cognite/spark/v1/DefaultSource.scala b/src/main/scala/cognite/spark/v1/DefaultSource.scala index 2d9925813..8fe954102 100644 --- a/src/main/scala/cognite/spark/v1/DefaultSource.scala +++ b/src/main/scala/cognite/spark/v1/DefaultSource.scala @@ -7,8 +7,7 @@ import cognite.spark.v1.FlexibleDataModelRelationFactory.{ ConnectionConfig, DataModelConnectionConfig, DataModelViewConfig, - ViewCorePropertyConfig, - ViewSyncCorePropertyConfig + ViewCorePropertyConfig } import com.cognite.sdk.scala.common.{BearerTokenAuth, OAuth2, TicketAuth} import com.cognite.sdk.scala.v1.fdm.common.Usage @@ -63,6 +62,7 @@ class DefaultSource sqlContext: SQLContext): FlexibleDataModelBaseRelation = { val corePropertySyncRelation = extractCorePropertySyncRelation(parameters, config, sqlContext) val corePropertyRelation = extractCorePropertyRelation(parameters, config, sqlContext) + val datamodelBasedSync = extractDataModelBasedConnectionRelationSync(parameters, config, sqlContext) val dataModelBasedConnectionRelation = extractDataModelBasedConnectionRelation(parameters, config, sqlContext) val dataModelBasedCorePropertyRelation = @@ -71,12 +71,12 @@ class DefaultSource corePropertySyncRelation .orElse(corePropertyRelation) + .orElse(datamodelBasedSync) .orElse(dataModelBasedConnectionRelation) .orElse(dataModelBasedCorePropertyRelation) .orElse(connectionRelation) - .getOrElse( - throw new CdfSparkException( - s""" + .getOrElse(throw new CdfSparkException( + s""" |Invalid combination of arguments! | | Expecting 'instanceType' and 'cursor' with optional arguments ('viewSpace', 'viewExternalId', 'viewVersion', @@ -85,11 +85,11 @@ class DefaultSource | 'instanceSpace') for CorePropertyRelation, | or expecting ('edgeTypeSpace', 'edgeTypeExternalId') with optional 'instanceSpace' for ConnectionRelation, | or expecting ('modelSpace', 'modelExternalId', 'modelVersion', 'viewExternalId') with optional - | 'instanceSpace' for data model based CorePropertyRelation, + | 'instanceSpace' and optional 'cursor' for data model based CorePropertyRelation, | or expecting ('modelSpace', 'modelExternalId', 'modelVersion', viewExternalId', 'connectionPropertyName') | with optional 'instanceSpace' for data model based ConnectionRelation, |""".stripMargin - )) + )) } /** @@ -525,6 +525,31 @@ object DefaultSource { .map(FlexibleDataModelRelationFactory.dataModelRelation(config, sqlContext, _)) } + private def extractDataModelBasedConnectionRelationSync( + parameters: Map[String, String], + config: RelationConfig, + sqlContext: SQLContext): Option[FlexibleDataModelBaseRelation] = { + val instanceSpace = parameters.get("instanceSpace") + Apply[Option] + .map5( + parameters.get("modelSpace"), + parameters.get("modelExternalId"), + parameters.get("modelVersion"), + parameters.get("viewExternalId"), + parameters.get("cursor") + )(Tuple5(_, _, _, _, _)) + .map(t => + FlexibleDataModelRelationFactory.dataModelRelationSync( + t._5, + parameters.get("cursorName"), + parameters.get("jobId"), + parameters.get("syncCursorSaveCallbackUrl"), + config, + sqlContext, + DataModelViewConfig(t._1, t._2, t._3, t._4, instanceSpace) + )) + } + private def extractConnectionRelation( parameters: Map[String, String], config: RelationConfig, @@ -570,10 +595,9 @@ object DefaultSource { cursorName = cursorName, jobId = jobId, syncCursorSaveCallbackUrl = syncCursorSaveCallbackUrl, - viewCorePropConfig = ViewSyncCorePropertyConfig( + viewCorePropConfig = ViewCorePropertyConfig( intendedUsage = usage, viewReference = viewReference, - cursor = usageAndCursor._2, instanceSpace = parameters.get("instanceSpace")) ) } diff --git a/src/main/scala/cognite/spark/v1/FlexibleDataModelCorePropertySyncRelation.scala b/src/main/scala/cognite/spark/v1/FlexibleDataModelCorePropertySyncRelation.scala index 2865e7130..5231ad4c7 100644 --- a/src/main/scala/cognite/spark/v1/FlexibleDataModelCorePropertySyncRelation.scala +++ b/src/main/scala/cognite/spark/v1/FlexibleDataModelCorePropertySyncRelation.scala @@ -3,10 +3,7 @@ package cognite.spark.v1 import cats.effect.IO import cognite.spark.v1.CdpConnector.ioRuntime import cognite.spark.v1.FlexibleDataModelBaseRelation.ProjectedFlexibleDataModelInstance -import cognite.spark.v1.FlexibleDataModelRelationFactory.{ - ViewCorePropertyConfig, - ViewSyncCorePropertyConfig -} +import cognite.spark.v1.FlexibleDataModelRelationFactory.ViewCorePropertyConfig import com.cognite.sdk.scala.common.{CdpApiException, ItemsWithCursor} import com.cognite.sdk.scala.v1.GenericClient import com.cognite.sdk.scala.v1.fdm.common.Usage @@ -41,13 +38,10 @@ private[spark] class FlexibleDataModelCorePropertySyncRelation( cursorName: Option[String], jobId: Option[String], syncCursorSaveCallbackUrl: Option[String], - corePropConfig: ViewSyncCorePropertyConfig)(sqlContext: SQLContext) + corePropConfig: ViewCorePropertyConfig)(sqlContext: SQLContext) extends FlexibleDataModelCorePropertyRelation( config, - ViewCorePropertyConfig( - corePropConfig.intendedUsage, - corePropConfig.viewReference, - corePropConfig.instanceSpace) + corePropConfig )(sqlContext) { private val logger = getLogger diff --git a/src/main/scala/cognite/spark/v1/FlexibleDataModelRelationFactory.scala b/src/main/scala/cognite/spark/v1/FlexibleDataModelRelationFactory.scala index 5beb7b279..da0343480 100644 --- a/src/main/scala/cognite/spark/v1/FlexibleDataModelRelationFactory.scala +++ b/src/main/scala/cognite/spark/v1/FlexibleDataModelRelationFactory.scala @@ -21,13 +21,6 @@ object FlexibleDataModelRelationFactory { instanceSpace: Option[String]) extends FlexibleDataModelRelationFactory - final case class ViewSyncCorePropertyConfig( - intendedUsage: Usage, - viewReference: Option[ViewReference], - cursor: String, - instanceSpace: Option[String]) - extends FlexibleDataModelRelationFactory - final case class ConnectionConfig( edgeTypeSpace: String, edgeTypeExternalId: String, @@ -66,7 +59,7 @@ object FlexibleDataModelRelationFactory { cursorName: Option[String], jobId: Option[String], syncCursorSaveCallbackUrl: Option[String], - viewCorePropConfig: ViewSyncCorePropertyConfig): FlexibleDataModelCorePropertySyncRelation = + viewCorePropConfig: ViewCorePropertyConfig): FlexibleDataModelCorePropertySyncRelation = new FlexibleDataModelCorePropertySyncRelation( cursor, config, @@ -81,6 +74,28 @@ object FlexibleDataModelRelationFactory { connectionConfig: ConnectionConfig): FlexibleDataModelConnectionRelation = new FlexibleDataModelConnectionRelation(config, connectionConfig)(sqlContext) + def dataModelRelationSync( + cursor: String, + cursorName: Option[String], + jobId: Option[String], + syncCursorSaveCallbackUrl: Option[String], + config: RelationConfig, + sqlContext: SQLContext, + dataModelConfig: DataModelConfig): FlexibleDataModelBaseRelation = { + val viewCorePropertyConfig = (dataModelConfig match { + case vc: DataModelViewConfig => resolveViewCorePropertyConfig(config, vc).unsafeRunSync() + case cc: DataModelConnectionConfig => ViewCorePropertyConfig(Usage.Edge, None, cc.instanceSpace) + }) + new FlexibleDataModelCorePropertySyncRelation( + cursor, + config, + cursorName, + jobId, + syncCursorSaveCallbackUrl, + viewCorePropertyConfig + )(sqlContext) + } + def dataModelRelation( config: RelationConfig, sqlContext: SQLContext, @@ -91,10 +106,9 @@ object FlexibleDataModelRelationFactory { case cc: DataModelConnectionConfig => createConnectionRelationForDataModel(config, sqlContext, cc) }).unsafeRunSync() - private def createCorePropertyRelationForDataModel( + private def resolveViewCorePropertyConfig( config: RelationConfig, - sqlContext: SQLContext, - modelViewConfig: DataModelViewConfig): IO[FlexibleDataModelCorePropertyRelation] = { + modelViewConfig: DataModelViewConfig): IO[ViewCorePropertyConfig] = { val client = CdpConnector.clientFromConfig(config) fetchInlinedDataModel(client, modelViewConfig) .map { models => @@ -112,26 +126,35 @@ object FlexibleDataModelRelationFactory { .flatMap { case Some(vc: DataModelViewReference) => IO.delay( - new FlexibleDataModelCorePropertyRelation( - config, - corePropConfig = ViewCorePropertyConfig( - intendedUsage = vc.usedFor, - viewReference = Some(vc.toSourceReference), - instanceSpace = modelViewConfig.instanceSpace - ) - )(sqlContext) + ViewCorePropertyConfig( + intendedUsage = vc.usedFor, + viewReference = Some(vc.toSourceReference), + instanceSpace = modelViewConfig.instanceSpace + ) ) case None => IO.raiseError( new CdfSparkIllegalArgumentException(s""" - |Could not find a view with externalId: '${modelViewConfig.viewExternalId}' in the specified data model - | with (space: '${modelViewConfig.modelSpace}', externalId: '${modelViewConfig.modelExternalId}', - | version: '${modelViewConfig.modelVersion}') - |""".stripMargin) + |Could not find a view with externalId: '${modelViewConfig.viewExternalId}' in the specified data model + | with (space: '${modelViewConfig.modelSpace}', externalId: '${modelViewConfig.modelExternalId}', + | version: '${modelViewConfig.modelVersion}') + |""".stripMargin) ) } } + private def createCorePropertyRelationForDataModel( + config: RelationConfig, + sqlContext: SQLContext, + modelViewConfig: DataModelViewConfig): IO[FlexibleDataModelCorePropertyRelation] = + resolveViewCorePropertyConfig(config, modelViewConfig) + .map( + viewCorePropertyConfig => + new FlexibleDataModelCorePropertyRelation( + config, + corePropConfig = viewCorePropertyConfig + )(sqlContext)) + private def createConnectionRelationForDataModel( config: RelationConfig, sqlContext: SQLContext, From 00177fab8a183ed1e5612037d71414b65d4c0eb5 Mon Sep 17 00:00:00 2001 From: Andreas Korneliussen Date: Wed, 27 Mar 2024 13:49:33 +0100 Subject: [PATCH 2/3] Resolve connection before sync --- src/main/scala/cognite/spark/v1/DefaultSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/cognite/spark/v1/DefaultSource.scala b/src/main/scala/cognite/spark/v1/DefaultSource.scala index 8fe954102..07c4eecdf 100644 --- a/src/main/scala/cognite/spark/v1/DefaultSource.scala +++ b/src/main/scala/cognite/spark/v1/DefaultSource.scala @@ -71,8 +71,8 @@ class DefaultSource corePropertySyncRelation .orElse(corePropertyRelation) - .orElse(datamodelBasedSync) .orElse(dataModelBasedConnectionRelation) + .orElse(datamodelBasedSync) .orElse(dataModelBasedCorePropertyRelation) .orElse(connectionRelation) .getOrElse(throw new CdfSparkException( From c27ef4624f84c7f43eb812ca43f7403981d91fb0 Mon Sep 17 00:00:00 2001 From: Andreas Korneliussen Date: Wed, 27 Mar 2024 13:51:08 +0100 Subject: [PATCH 3/3] Resolve connection before sync --- src/main/scala/cognite/spark/v1/DefaultSource.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/cognite/spark/v1/DefaultSource.scala b/src/main/scala/cognite/spark/v1/DefaultSource.scala index 3946102a5..07c4eecdf 100644 --- a/src/main/scala/cognite/spark/v1/DefaultSource.scala +++ b/src/main/scala/cognite/spark/v1/DefaultSource.scala @@ -71,7 +71,6 @@ class DefaultSource corePropertySyncRelation .orElse(corePropertyRelation) - .orElse(datamodelBasedSync) .orElse(dataModelBasedConnectionRelation) .orElse(datamodelBasedSync) .orElse(dataModelBasedCorePropertyRelation)