diff --git a/README.md b/README.md index a9606f563..7cf57bd33 100644 --- a/README.md +++ b/README.md @@ -1474,6 +1474,8 @@ Optionally, you can have Spark infer the DataFrame schema with the following opt - `rawEnsureParent`: When set to true, the parent database and table will be creates if it does not exists already. +- `filterNullFieldsOnNonSchemaRawQueries`: Set this to `"true"` to enable experimental support for filtering out null fields server-side in the Raw API, without impacting the inferred schema. This option is expected to become enabled by default once it has been fully tested. + ```scala val df = spark.read.format("cognite.spark.v1") .option("type", "raw") diff --git a/build.sbt b/build.sbt index c1d207088..022e2ab7a 100644 --- a/build.sbt +++ b/build.sbt @@ -11,7 +11,7 @@ val circeVersion = "0.14.9" val sttpVersion = "3.5.2" val natchezVersion = "0.3.1" val Specs2Version = "4.20.3" -val cogniteSdkVersion = "2.28.844" +val cogniteSdkVersion = "2.31.861" val prometheusVersion = "0.16.0" val log4sVersion = "1.10.0" @@ -41,7 +41,7 @@ lazy val commonSettings = Seq( organization := "com.cognite.spark.datasource", organizationName := "Cognite", organizationHomepage := Some(url("https://cognite.com")), - version := "3.20." + patchVersion, + version := "3.21." + patchVersion, isSnapshot := patchVersion.endsWith("-SNAPSHOT"), crossScalaVersions := supportedScalaVersions, semanticdbEnabled := true, @@ -160,6 +160,7 @@ lazy val library = (project in file(".")) exclude("org.scala-lang.modules", "scala-collection-compat_2.12") exclude("org.scala-lang.modules", "scala-collection-compat_2.13"), "org.specs2" %% "specs2-core" % Specs2Version % Test, + "com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % sttpVersion, "com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % sttpVersion // Netty is included in Spark as jars/netty-all-4...Final.jar exclude("io.netty", "netty-buffer") diff --git a/build.scala-2.12.sbt.lock b/build.scala-2.12.sbt.lock index 24c4efbde..c98f891d7 100644 --- a/build.scala-2.12.sbt.lock +++ b/build.scala-2.12.sbt.lock @@ -1,6 +1,6 @@ { "lockVersion" : 1, - "timestamp" : "2024-08-20T11:20:27.551661Z", + "timestamp" : "2024-11-13T11:30:40.138510863Z", "configurations" : [ "compile", "optional", @@ -13,11 +13,11 @@ { "org" : "co.fs2", "name" : "fs2-core_2.12", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-core_2.12.jar", - "hash" : "sha1:413c7d123f943eea68f6c77907318e58e0410ad4" + "hash" : "sha1:3651a652ea778dea12d9ec8083896f05ba489439" } ], "configurations" : [ @@ -30,11 +30,11 @@ { "org" : "co.fs2", "name" : "fs2-io_2.12", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-io_2.12.jar", - "hash" : "sha1:89cefd0c05480d603e26d4b92a3d8284b8ec7b4a" + "hash" : "sha1:eb83f8a4387e935926d00d81999adbae49756a30" } ], "configurations" : [ @@ -44,6 +44,22 @@ "test" ] }, + { + "org" : "co.fs2", + "name" : "fs2-reactive-streams_2.12", + "version" : "3.2.7", + "artifacts" : [ + { + "name" : "fs2-reactive-streams_2.12.jar", + "hash" : "sha1:4b959071b733d9759dcb324dd15216003e511f41" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.chuusai", "name" : "shapeless_2.12", @@ -78,11 +94,11 @@ { "org" : "com.cognite", "name" : "cognite-sdk-scala_2.12", - "version" : "2.28.844", + "version" : "2.31.861", "artifacts" : [ { "name" : "cognite-sdk-scala_2.12.jar", - "hash" : "sha1:a020427c9c31f27e6ec0028f0f3e12f4b8e6fc8f" + "hash" :
"sha1:ae52606e613a31e1979830a6036f265328033908" } ], "configurations" : [ @@ -95,11 +111,11 @@ { "org" : "com.comcast", "name" : "ip4s-core_2.12", - "version" : "3.5.0", + "version" : "3.6.0", "artifacts" : [ { "name" : "ip4s-core_2.12.jar", - "hash" : "sha1:19d582e20c3cb27c1481b7e43baea0fa90aa7cd6" + "hash" : "sha1:d5b15a847dbd0ca35a281d140ef4d26b00d33b60" } ], "configurations" : [ @@ -324,6 +340,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.client3", + "name" : "async-http-client-backend-fs2_2.12", + "version" : "3.5.2", + "artifacts" : [ + { + "name" : "async-http-client-backend-fs2_2.12.jar", + "hash" : "sha1:fadae6ca5d30cd1d2eb90ad18c679969f027a94e" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.client3", "name" : "async-http-client-backend_2.12", @@ -390,6 +422,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.client3", + "name" : "fs2_2.12", + "version" : "3.5.2", + "artifacts" : [ + { + "name" : "fs2_2.12.jar", + "hash" : "sha1:8dde4d364d013efd245e5d4b849ff3b1e202d5f2" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.client3", "name" : "json-common_2.12", @@ -441,6 +489,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.shared", + "name" : "fs2_2.12", + "version" : "1.3.4", + "artifacts" : [ + { + "name" : "fs2_2.12.jar", + "hash" : "sha1:a53a923369b494da9283bd0bb9c34639f9d63f8a" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.shared", "name" : "ws_2.12", @@ -577,11 +641,11 @@ { "org" : "commons-io", "name" : "commons-io", - "version" : "2.16.1", + "version" : "2.17.0", "artifacts" : [ { "name" : "commons-io.jar", - "hash" : "sha1:377d592e740dc77124e0901291dbfaa6810a200e" + "hash" : "sha1:ddcc8433eb019fb48fe25207c0278143f3e1d7e2" } ], "configurations" : [ @@ -818,6 +882,22 @@ "provided" ] }, + { + "org" : "io.netty", + "name" : "netty-buffer", + "version" : "4.1.60.Final", + "artifacts" : [ + { + "name" : "netty-buffer.jar", + "hash" : "sha1:9d213d090deeca2541ad6827eb3345bcd6e1e701" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "io.netty", "name" : "netty-buffer", @@ -924,6 +1004,22 @@ "provided" ] }, + { + "org" : "io.netty", + "name" : "netty-handler", + "version" : "4.1.60.Final", + "artifacts" : [ + { + "name" : "netty-handler.jar", + "hash" : "sha1:14e28bab0173be10c9631a85069636a0d0221dfe" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "io.netty", "name" : "netty-handler", @@ -2817,11 +2913,11 @@ { "org" : "org.typelevel", "name" : "cats-core_2.12", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-core_2.12.jar", - "hash" : "sha1:100bc0d103c5bd9ddcd4e2b5c24e88483e2a77d5" + "hash" : "sha1:44f89a95aed47e43f85896b1243b7dc6a7e1feae" } ], "configurations" : [ @@ -2885,11 +2981,11 @@ { "org" : "org.typelevel", "name" : "cats-kernel_2.12", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-kernel_2.12.jar", - "hash" : "sha1:94b372f873e763186c3cccc9e76492553d01350e" + "hash" : "sha1:dd67f32ab72c3aa2045432992cdf3f0a42484645" } ], "configurations" : [ diff --git a/build.scala-2.13.sbt.lock b/build.scala-2.13.sbt.lock index 06a5b79c5..edd1964c8 100644 --- a/build.scala-2.13.sbt.lock +++ b/build.scala-2.13.sbt.lock @@ -1,6 +1,6 @@ { "lockVersion" : 1, - "timestamp" : "2024-08-20T11:20:32.040120Z", + "timestamp" : "2024-11-13T11:30:56.507065520Z", 
"configurations" : [ "compile", "optional", @@ -13,11 +13,11 @@ { "org" : "co.fs2", "name" : "fs2-core_2.13", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-core_2.13.jar", - "hash" : "sha1:ee6589d3ee01c487af8a9d60cd35274b7010ae95" + "hash" : "sha1:c826e98190d0f45cd1ab52fb4675f2890dd21ce9" } ], "configurations" : [ @@ -30,11 +30,11 @@ { "org" : "co.fs2", "name" : "fs2-io_2.13", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-io_2.13.jar", - "hash" : "sha1:e1c3ac4217d017f26efb0cf2af104d62208090c9" + "hash" : "sha1:d0e7805a44b3a88ef75bc6eb22a7c237ed82887e" } ], "configurations" : [ @@ -44,6 +44,22 @@ "test" ] }, + { + "org" : "co.fs2", + "name" : "fs2-reactive-streams_2.13", + "version" : "3.2.7", + "artifacts" : [ + { + "name" : "fs2-reactive-streams_2.13.jar", + "hash" : "sha1:3a05da2c76346ef71bf296d1c75ca585859f3d57" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.chuusai", "name" : "shapeless_2.13", @@ -78,11 +94,11 @@ { "org" : "com.cognite", "name" : "cognite-sdk-scala_2.13", - "version" : "2.28.844", + "version" : "2.31.861", "artifacts" : [ { "name" : "cognite-sdk-scala_2.13.jar", - "hash" : "sha1:4e7bc7e592ee1e7d822e7f8e0565d3c7496ed21d" + "hash" : "sha1:245bd96891942232ba8d5a8e2c7ff1714cf2c230" } ], "configurations" : [ @@ -95,11 +111,11 @@ { "org" : "com.comcast", "name" : "ip4s-core_2.13", - "version" : "3.5.0", + "version" : "3.6.0", "artifacts" : [ { "name" : "ip4s-core_2.13.jar", - "hash" : "sha1:a35384c0eea504e57656f0a859487624e1ff8c63" + "hash" : "sha1:4b2e8c831697c325853ea8eeea1f28029805714a" } ], "configurations" : [ @@ -324,6 +340,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.client3", + "name" : "async-http-client-backend-fs2_2.13", + "version" : "3.5.2", + "artifacts" : [ + { + "name" : "async-http-client-backend-fs2_2.13.jar", + "hash" : "sha1:c33cbc7d6041eaf794cc5a5c7deaa480902571da" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.client3", "name" : "async-http-client-backend_2.13", @@ -390,6 +422,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.client3", + "name" : "fs2_2.13", + "version" : "3.5.2", + "artifacts" : [ + { + "name" : "fs2_2.13.jar", + "hash" : "sha1:f169cced1665841097e00fe43e34f206fc854d8f" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.client3", "name" : "json-common_2.13", @@ -441,6 +489,22 @@ "test" ] }, + { + "org" : "com.softwaremill.sttp.shared", + "name" : "fs2_2.13", + "version" : "1.3.4", + "artifacts" : [ + { + "name" : "fs2_2.13.jar", + "hash" : "sha1:242af688949c9e2469df3e2ac5628dec24e761c0" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "com.softwaremill.sttp.shared", "name" : "ws_2.13", @@ -577,11 +641,11 @@ { "org" : "commons-io", "name" : "commons-io", - "version" : "2.16.1", + "version" : "2.17.0", "artifacts" : [ { "name" : "commons-io.jar", - "hash" : "sha1:377d592e740dc77124e0901291dbfaa6810a200e" + "hash" : "sha1:ddcc8433eb019fb48fe25207c0278143f3e1d7e2" } ], "configurations" : [ @@ -818,6 +882,22 @@ "provided" ] }, + { + "org" : "io.netty", + "name" : "netty-buffer", + "version" : "4.1.60.Final", + "artifacts" : [ + { + "name" : "netty-buffer.jar", + "hash" : "sha1:9d213d090deeca2541ad6827eb3345bcd6e1e701" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "io.netty", "name" : "netty-buffer", @@ -924,6 +1004,22 @@ 
"provided" ] }, + { + "org" : "io.netty", + "name" : "netty-handler", + "version" : "4.1.60.Final", + "artifacts" : [ + { + "name" : "netty-handler.jar", + "hash" : "sha1:14e28bab0173be10c9631a85069636a0d0221dfe" + } + ], + "configurations" : [ + "compile", + "runtime", + "test" + ] + }, { "org" : "io.netty", "name" : "netty-handler", @@ -2831,11 +2927,11 @@ { "org" : "org.typelevel", "name" : "cats-core_2.13", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-core_2.13.jar", - "hash" : "sha1:f333233487bff425196019aed0443acdfbc80dae" + "hash" : "sha1:bf17f0927b59f990241657fd9ed7e37a40395f53" } ], "configurations" : [ @@ -2899,11 +2995,11 @@ { "org" : "org.typelevel", "name" : "cats-kernel_2.13", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-kernel_2.13.jar", - "hash" : "sha1:6e9fbc99fa5a88a00ac55afc2540247f7f4ce04d" + "hash" : "sha1:9f46f77ebad492c1abeb6ca27df78ba1fa05eb65" } ], "configurations" : [ diff --git a/macro/build.scala-2.12.sbt.lock b/macro/build.scala-2.12.sbt.lock index bb41b95a7..12efdf4df 100644 --- a/macro/build.scala-2.12.sbt.lock +++ b/macro/build.scala-2.12.sbt.lock @@ -1,6 +1,6 @@ { "lockVersion" : 1, - "timestamp" : "2024-08-20T11:20:33.759356Z", + "timestamp" : "2024-11-13T11:30:59.215321253Z", "configurations" : [ "compile", "optional", @@ -13,11 +13,11 @@ { "org" : "co.fs2", "name" : "fs2-core_2.12", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-core_2.12.jar", - "hash" : "sha1:413c7d123f943eea68f6c77907318e58e0410ad4" + "hash" : "sha1:3651a652ea778dea12d9ec8083896f05ba489439" } ], "configurations" : [ @@ -29,11 +29,11 @@ { "org" : "co.fs2", "name" : "fs2-io_2.12", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-io_2.12.jar", - "hash" : "sha1:89cefd0c05480d603e26d4b92a3d8284b8ec7b4a" + "hash" : "sha1:eb83f8a4387e935926d00d81999adbae49756a30" } ], "configurations" : [ @@ -75,11 +75,11 @@ { "org" : "com.cognite", "name" : "cognite-sdk-scala_2.12", - "version" : "2.28.844", + "version" : "2.31.861", "artifacts" : [ { "name" : "cognite-sdk-scala_2.12.jar", - "hash" : "sha1:a020427c9c31f27e6ec0028f0f3e12f4b8e6fc8f" + "hash" : "sha1:ae52606e613a31e1979830a6036f265328033908" } ], "configurations" : [ @@ -91,11 +91,11 @@ { "org" : "com.comcast", "name" : "ip4s-core_2.12", - "version" : "3.5.0", + "version" : "3.6.0", "artifacts" : [ { "name" : "ip4s-core_2.12.jar", - "hash" : "sha1:19d582e20c3cb27c1481b7e43baea0fa90aa7cd6" + "hash" : "sha1:d5b15a847dbd0ca35a281d140ef4d26b00d33b60" } ], "configurations" : [ @@ -513,11 +513,11 @@ { "org" : "commons-io", "name" : "commons-io", - "version" : "2.16.1", + "version" : "2.17.0", "artifacts" : [ { "name" : "commons-io.jar", - "hash" : "sha1:377d592e740dc77124e0901291dbfaa6810a200e" + "hash" : "sha1:ddcc8433eb019fb48fe25207c0278143f3e1d7e2" } ], "configurations" : [ @@ -2290,11 +2290,11 @@ { "org" : "org.typelevel", "name" : "cats-core_2.12", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-core_2.12.jar", - "hash" : "sha1:100bc0d103c5bd9ddcd4e2b5c24e88483e2a77d5" + "hash" : "sha1:44f89a95aed47e43f85896b1243b7dc6a7e1feae" } ], "configurations" : [ @@ -2354,11 +2354,11 @@ { "org" : "org.typelevel", "name" : "cats-kernel_2.12", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-kernel_2.12.jar", - "hash" : "sha1:94b372f873e763186c3cccc9e76492553d01350e" + "hash" : "sha1:dd67f32ab72c3aa2045432992cdf3f0a42484645" } ], "configurations" : [ diff 
--git a/macro/build.scala-2.13.sbt.lock b/macro/build.scala-2.13.sbt.lock index e08f44976..fd770fcf8 100644 --- a/macro/build.scala-2.13.sbt.lock +++ b/macro/build.scala-2.13.sbt.lock @@ -1,6 +1,6 @@ { "lockVersion" : 1, - "timestamp" : "2024-08-20T11:20:35.412705Z", + "timestamp" : "2024-11-13T11:31:01.766754723Z", "configurations" : [ "compile", "optional", @@ -13,11 +13,11 @@ { "org" : "co.fs2", "name" : "fs2-core_2.13", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-core_2.13.jar", - "hash" : "sha1:ee6589d3ee01c487af8a9d60cd35274b7010ae95" + "hash" : "sha1:c826e98190d0f45cd1ab52fb4675f2890dd21ce9" } ], "configurations" : [ @@ -29,11 +29,11 @@ { "org" : "co.fs2", "name" : "fs2-io_2.13", - "version" : "3.10.2", + "version" : "3.11.0", "artifacts" : [ { "name" : "fs2-io_2.13.jar", - "hash" : "sha1:e1c3ac4217d017f26efb0cf2af104d62208090c9" + "hash" : "sha1:d0e7805a44b3a88ef75bc6eb22a7c237ed82887e" } ], "configurations" : [ @@ -75,11 +75,11 @@ { "org" : "com.cognite", "name" : "cognite-sdk-scala_2.13", - "version" : "2.28.844", + "version" : "2.31.861", "artifacts" : [ { "name" : "cognite-sdk-scala_2.13.jar", - "hash" : "sha1:4e7bc7e592ee1e7d822e7f8e0565d3c7496ed21d" + "hash" : "sha1:245bd96891942232ba8d5a8e2c7ff1714cf2c230" } ], "configurations" : [ @@ -91,11 +91,11 @@ { "org" : "com.comcast", "name" : "ip4s-core_2.13", - "version" : "3.5.0", + "version" : "3.6.0", "artifacts" : [ { "name" : "ip4s-core_2.13.jar", - "hash" : "sha1:a35384c0eea504e57656f0a859487624e1ff8c63" + "hash" : "sha1:4b2e8c831697c325853ea8eeea1f28029805714a" } ], "configurations" : [ @@ -513,11 +513,11 @@ { "org" : "commons-io", "name" : "commons-io", - "version" : "2.16.1", + "version" : "2.17.0", "artifacts" : [ { "name" : "commons-io.jar", - "hash" : "sha1:377d592e740dc77124e0901291dbfaa6810a200e" + "hash" : "sha1:ddcc8433eb019fb48fe25207c0278143f3e1d7e2" } ], "configurations" : [ @@ -2304,11 +2304,11 @@ { "org" : "org.typelevel", "name" : "cats-core_2.13", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-core_2.13.jar", - "hash" : "sha1:f333233487bff425196019aed0443acdfbc80dae" + "hash" : "sha1:bf17f0927b59f990241657fd9ed7e37a40395f53" } ], "configurations" : [ @@ -2368,11 +2368,11 @@ { "org" : "org.typelevel", "name" : "cats-kernel_2.13", - "version" : "2.10.0", + "version" : "2.11.0", "artifacts" : [ { "name" : "cats-kernel_2.13.jar", - "hash" : "sha1:6e9fbc99fa5a88a00ac55afc2540247f7f4ce04d" + "hash" : "sha1:9f46f77ebad492c1abeb6ca27df78ba1fa05eb65" } ], "configurations" : [ diff --git a/project/plugins.sbt b/project/plugins.sbt index e4b330b86..af6bbb406 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ addSbtPlugin("org.lyranthe.sbt" % "partial-unification" % "1.1.2") -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.2") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.21") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1") diff --git a/src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala b/src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala index aa8e96ee6..bafee6818 100644 --- a/src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala +++ b/src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala @@ -2,8 +2,8 @@ package cognite.spark.v1 import cats.effect.IO import cats.implicits._ -import cognite.spark.v1.PushdownUtilities.stringSeqToCogniteExternalIdSeq import 
cognite.spark.compiletime.macros.SparkSchemaHelper.{fromRow, structType} +import cognite.spark.v1.PushdownUtilities.stringSeqToCogniteExternalIdSeq import com.cognite.sdk.scala.common.{CdpApiException, SetValue} import com.cognite.sdk.scala.v1.{ Asset, @@ -72,7 +72,7 @@ final case class InvalidRootChangeException(assetIds: Seq[String], subtreeId: St ) class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext) - extends CdfRelation(config, "assethierarchy") { + extends CdfRelation(config, AssetHierarchyBuilder.name) { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ @@ -418,8 +418,10 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext) } } -object AssetHierarchyBuilder { +object AssetHierarchyBuilder extends NamedRelation with UpsertSchema with DeleteWithIdSchema { + override val name = "assethierarchy" + import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[AssetsIngestSchema]() + override val upsertSchema: StructType = structType[AssetsIngestSchema]() } diff --git a/src/main/scala/cognite/spark/v1/AssetsRelation.scala b/src/main/scala/cognite/spark/v1/AssetsRelation.scala index ad4e18605..701556d95 100644 --- a/src/main/scala/cognite/spark/v1/AssetsRelation.scala +++ b/src/main/scala/cognite/spark/v1/AssetsRelation.scala @@ -3,6 +3,7 @@ package cognite.spark.v1 import cats.effect.IO import cognite.spark.v1.PushdownUtilities._ import cognite.spark.compiletime.macros.SparkSchemaHelper._ +import cognite.spark.v1.AssetsRelation.name import com.cognite.sdk.scala.common._ import com.cognite.sdk.scala.v1._ import com.cognite.sdk.scala.v1.resources.Assets @@ -18,7 +19,7 @@ import scala.annotation.unused class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]] = None)( val sqlContext: SQLContext) - extends SdkV1InsertableRelation[AssetsReadSchema, Long](config, "assets") + extends SdkV1InsertableRelation[AssetsReadSchema, Long](config, name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ @@ -115,12 +116,21 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]] override def uniqueId(a: AssetsReadSchema): Long = a.id } -object AssetsRelation extends UpsertSchema { +object AssetsRelation + extends UpsertSchema + with ReadSchema + with NamedRelation + with InsertSchema + with DeleteWithIdSchema + with UpdateSchema { + override val name = "assets" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[AssetsUpsertSchema]() - val insertSchema: StructType = structType[AssetsInsertSchema]() - val readSchema: StructType = structType[AssetsReadSchema]() + override val upsertSchema: StructType = structType[AssetsUpsertSchema]() + override val insertSchema: StructType = structType[AssetsInsertSchema]() + override val readSchema: StructType = structType[AssetsReadSchema]() + override val updateSchema: StructType = upsertSchema + } final case class AssetsUpsertSchema( @@ -136,7 +146,7 @@ final case class AssetsUpsertSchema( parentExternalId: Option[String] = None, dataSetId: OptionalField[Long] = FieldNotSpecified, labels: Option[Seq[String]] = None -) extends WithNullableExtenalId +) extends WithNullableExternalId with WithId[Option[Long]] object AssetsUpsertSchema { diff --git a/src/main/scala/cognite/spark/v1/CdfRelation.scala b/src/main/scala/cognite/spark/v1/CdfRelation.scala index 0858f39ee..12311b693 100644 --- 
a/src/main/scala/cognite/spark/v1/CdfRelation.scala +++ b/src/main/scala/cognite/spark/v1/CdfRelation.scala @@ -9,16 +9,21 @@ import org.apache.spark.TaskContext import org.apache.spark.datasource.MetricsSource import org.apache.spark.sql.sources.BaseRelation +trait NamedRelation { + val name: String +} + abstract class CdfRelation(config: RelationConfig, shortNameStr: String) extends BaseRelation - with Serializable { - protected val shortName: String = shortNameStr + with Serializable + with NamedRelation { + override val name: String = shortNameStr private def getOrCreateCounter(action: String): Counter = MetricsSource.getOrCreateAttemptTrackingCounter( config.metricsTrackAttempts, config.metricsPrefix, - s"$shortName.$action", + s"$name.$action", Option(TaskContext.get())) @transient lazy protected val itemsRead: Counter = getOrCreateCounter("read") diff --git a/src/main/scala/cognite/spark/v1/DataSetsRelation.scala b/src/main/scala/cognite/spark/v1/DataSetsRelation.scala index 215e38caa..43329b06a 100644 --- a/src/main/scala/cognite/spark/v1/DataSetsRelation.scala +++ b/src/main/scala/cognite/spark/v1/DataSetsRelation.scala @@ -14,7 +14,7 @@ import org.apache.spark.sql.{Row, SQLContext} import java.time.Instant class DataSetsRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1Relation[DataSet, String](config, "datasets") + extends SdkV1Relation[DataSet, String](config, DataSetsRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ @@ -69,12 +69,20 @@ class DataSetsRelation(config: RelationConfig)(val sqlContext: SQLContext) override def delete(rows: Seq[Row]): IO[Unit] = throw new CdfSparkException("Delete is not supported for data sets.") } -object DataSetsRelation extends UpsertSchema { +object DataSetsRelation + extends UpsertSchema + with ReadSchema + with InsertSchema + with UpdateSchema + with NamedRelation { + override val name = "datasets" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[DataSetsUpsertSchema]() - val insertSchema: StructType = structType[DataSetsInsertSchema]() - val readSchema: StructType = structType[DataSetsReadSchema]() + override val upsertSchema: StructType = structType[DataSetsUpsertSchema]() + override val insertSchema: StructType = structType[DataSetsInsertSchema]() + override val readSchema: StructType = structType[DataSetsReadSchema]() + override val updateSchema: StructType = upsertSchema + } case class DataSetsUpsertSchema( @@ -84,7 +92,7 @@ case class DataSetsUpsertSchema( description: OptionalField[String] = FieldNotSpecified, metadata: Option[Map[String, String]] = None, writeProtected: Option[Boolean] = None) - extends WithNullableExtenalId + extends WithNullableExternalId with WithId[Option[Long]] object DataSetsUpsertSchema { implicit val toCreate: Transformer[DataSetsUpsertSchema, DataSetCreate] = diff --git a/src/main/scala/cognite/spark/v1/DefaultSource.scala b/src/main/scala/cognite/spark/v1/DefaultSource.scala index f8ec21da7..f95830397 100644 --- a/src/main/scala/cognite/spark/v1/DefaultSource.scala +++ b/src/main/scala/cognite/spark/v1/DefaultSource.scala @@ -105,13 +105,13 @@ class DefaultSource val config = parseRelationConfig(parameters, sqlContext) resourceType match { - case "datapoints" => + case NumericDataPointsRelation.name => new NumericDataPointsRelationV1(config)(sqlContext) - case "stringdatapoints" => + case StringDataPointsRelation.name => new StringDataPointsRelationV1(config)(sqlContext) 
- case "timeseries" => + case TimeSeriesRelation.name => new TimeSeriesRelation(config)(sqlContext) - case "raw" => + case RawTableRelation.name => val database = parameters.getOrElse("database", sys.error("Database must be specified")) val tableName = parameters.getOrElse("table", sys.error("Table must be specified")) @@ -132,42 +132,47 @@ class DefaultSource inferSchema, inferSchemaLimit, collectSchemaInferenceMetrics)(sqlContext) - case "sequencerows" => + case SequenceRowsRelation.name => createSequenceRows(parameters, config, sqlContext) - case "assets" => + case AssetsRelation.name => val subtreeIds = parameters.get("assetSubtreeIds").map(parseCogniteIds) new AssetsRelation(config, subtreeIds)(sqlContext) - case "events" => + case EventsRelation.name => new EventsRelation(config)(sqlContext) - case "files" => + case FilesRelation.name => new FilesRelation(config)(sqlContext) - case "3dmodels" => + case FileContentRelation.name => + val inferSchema = toBoolean(parameters, "inferSchema") + val fileExternalId = + parameters.getOrElse("externalId", sys.error("File's external id must be specified")) + new FileContentRelation(config, fileExternalId, inferSchema)(sqlContext) + case ThreeDModelsRelation.name => new ThreeDModelsRelation(config)(sqlContext) - case "3dmodelrevisions" => + case ThreeDModelRevisionsRelation.name => val modelId = parameters.getOrElse("modelId", sys.error("Model id must be specified")).toLong new ThreeDModelRevisionsRelation(config, modelId)(sqlContext) - case "3dmodelrevisionmappings" => + case ThreeDModelRevisionMappingsRelation.name => val modelId = parameters.getOrElse("modelId", sys.error("Model id must be specified")).toLong val revisionId = parameters.getOrElse("revisionId", sys.error("Revision id must be specified")).toLong new ThreeDModelRevisionMappingsRelation(config, modelId, revisionId)(sqlContext) - case "3dmodelrevisionnodes" => + case ThreeDModelRevisionNodesRelation.name => val modelId = parameters.getOrElse("modelId", sys.error("Model id must be specified")).toLong val revisionId = parameters.getOrElse("revisionId", sys.error("Revision id must be specified")).toLong new ThreeDModelRevisionNodesRelation(config, modelId, revisionId)(sqlContext) - case "sequences" => + case SequenceRelation.name => new SequencesRelation(config)(sqlContext) - case "labels" => + case LabelsRelation.name => new LabelsRelation(config)(sqlContext) - case "relationships" => + case RelationshipsRelation.name => new RelationshipsRelation(config)(sqlContext) - case "datasets" => + case DataSetsRelation.name => new DataSetsRelation(config)(sqlContext) - case FlexibleDataModelRelationFactory.ResourceType => + case FlexibleDataModelBaseRelation.name => createFlexibleDataModelRelation(parameters, config, sqlContext) case _ => sys.error("Unknown resource type: " + resourceType) } @@ -183,104 +188,108 @@ class DefaultSource data: DataFrame): BaseRelation = { val config = parseRelationConfig(parameters, sqlContext) val resourceType = parameters.getOrElse("type", sys.error("Resource type must be specified")) - if (resourceType == "assethierarchy") { - val relation = new AssetHierarchyBuilder(config)(sqlContext) - config.onConflict match { - case OnConflictOption.Delete => - relation.delete(data) - case _ => - relation.buildFromDf(data) - } - relation - } else if (resourceType == "datapoints" || resourceType == "stringdatapoints") { - val relation = resourceType match { - case "datapoints" => - new NumericDataPointsRelationV1(config)(sqlContext) - case "stringdatapoints" => - new 
StringDataPointsRelationV1(config)(sqlContext) - } - if (config.onConflict == OnConflictOption.Delete) { - // Datapoints support 100_000 per request when inserting, but only 10_000 when deleting - val batchSize = config.batchSize.getOrElse(Constants.DefaultDataPointsLimit) - data.foreachPartition((rows: Iterator[Row]) => { - import CdpConnector.ioRuntime - val batches = rows.grouped(batchSize).toVector - batches.parTraverse_(relation.delete).unsafeRunSync() - }) - } else { - // datapoints need special handling of dataframes and batches - relation.insert(data, overwrite = true) - } - relation - } else { - val relation = resourceType match { - case "events" => - new EventsRelation(config)(sqlContext) - case "timeseries" => - new TimeSeriesRelation(config)(sqlContext) - case "assets" => - new AssetsRelation(config)(sqlContext) - case "files" => - new FilesRelation(config)(sqlContext) - case "sequences" => - new SequencesRelation(config)(sqlContext) - case "labels" => - new LabelsRelation(config)(sqlContext) - case "sequencerows" => - createSequenceRows(parameters, config, sqlContext) - case "relationships" => - new RelationshipsRelation(config)(sqlContext) - case "datasets" => - new DataSetsRelation(config)(sqlContext) - case FlexibleDataModelRelationFactory.ResourceType => - createFlexibleDataModelRelation(parameters, config, sqlContext) - case _ => sys.error(s"Resource type $resourceType does not support save()") - } - val batchSizeDefault = relation match { - case _: SequenceRowsRelation => Constants.DefaultSequenceRowsBatchSize - case _ => Constants.DefaultBatchSize - } - val batchSize = config.batchSize.getOrElse(batchSizeDefault) - val originalNumberOfPartitions = data.rdd.getNumPartitions - val idealNumberOfPartitions = config.sparkPartitions - - // If we have very many partitions, it's quite likely that they are significantly uneven. - // And we will have to limit parallelism on each partition to low number, so the operation could - // take unnecessarily long time. Rather than risking this, we'll just repartition data in such case. 
- // If the number of partitions is reasonable, we avoid the data shuffling - val dataRepartitioned = - if (originalNumberOfPartitions > 50 && originalNumberOfPartitions > idealNumberOfPartitions) { - data.repartition(idealNumberOfPartitions) - } else { - data - } - dataRepartitioned.foreachPartition((rows: Iterator[Row]) => { - import CdpConnector.ioRuntime - - val maxParallelism = config.parallelismPerPartition - val batches = Stream.fromIterator[IO](rows, chunkSize = batchSize).chunks + val relation: CdfRelation = resourceType match { + case AssetHierarchyBuilder.name => + new AssetHierarchyBuilder(config)(sqlContext) + case NumericDataPointsRelation.name => + new NumericDataPointsRelationV1(config)(sqlContext) + case StringDataPointsRelation.name => + new StringDataPointsRelationV1(config)(sqlContext) + case EventsRelation.name => + new EventsRelation(config)(sqlContext) + case TimeSeriesRelation.name => + new TimeSeriesRelation(config)(sqlContext) + case AssetsRelation.name => + new AssetsRelation(config)(sqlContext) + case FilesRelation.name => + new FilesRelation(config)(sqlContext) + case SequenceRelation.name => + new SequencesRelation(config)(sqlContext) + case LabelsRelation.name => + new LabelsRelation(config)(sqlContext) + case SequenceRowsRelation.name => + createSequenceRows(parameters, config, sqlContext) + case RelationshipsRelation.name => + new RelationshipsRelation(config)(sqlContext) + case DataSetsRelation.name => + new DataSetsRelation(config)(sqlContext) + case FlexibleDataModelBaseRelation.name => + createFlexibleDataModelRelation(parameters, config, sqlContext) + case _ => sys.error(s"Resource type $resourceType does not support save()") + } - val operation: Seq[Row] => IO[Unit] = config.onConflict match { - case OnConflictOption.Abort => - relation.insert - case OnConflictOption.Upsert => - relation.upsert - case OnConflictOption.Update => - relation.update + relation match { + case relation: DataPointsRelationV1[_] => + if (config.onConflict == OnConflictOption.Delete) { + // Datapoints support 100_000 per request when inserting, but only 10_000 when deleting + val batchSize = config.batchSize.getOrElse(Constants.DefaultDataPointsLimit) + data.foreachPartition((rows: Iterator[Row]) => { + import CdpConnector.ioRuntime + val batches = rows.grouped(batchSize).toVector + batches.parTraverse_(relation.delete).unsafeRunSync() + }) + } else { + // datapoints need special handling of dataframes and batches + relation.insert(data, overwrite = true) + } + relation + case relation: AssetHierarchyBuilder => + config.onConflict match { case OnConflictOption.Delete => - relation.delete + relation.delete(data) + case _ => + relation.buildFromDf(data) + } + relation + case relation: CdfRelation with WritableRelation => + val batchSizeDefault = relation match { + case _: SequenceRowsRelation => Constants.DefaultSequenceRowsBatchSize + case _ => Constants.DefaultBatchSize } + val batchSize = config.batchSize.getOrElse(batchSizeDefault) + val originalNumberOfPartitions = data.rdd.getNumPartitions + val idealNumberOfPartitions = config.sparkPartitions + + // If we have very many partitions, it's quite likely that they are significantly uneven. + // And we will have to limit parallelism on each partition to low number, so the operation could + // take unnecessarily long time. Rather than risking this, we'll just repartition data in such case. 
+ // If the number of partitions is reasonable, we avoid the data shuffling + val dataRepartitioned = + if (originalNumberOfPartitions > 50 && originalNumberOfPartitions > idealNumberOfPartitions) { + data.repartition(idealNumberOfPartitions) + } else { + data + } - batches - .parEvalMapUnordered(maxParallelism) { chunk => - operation(chunk.toVector) + dataRepartitioned.foreachPartition((rows: Iterator[Row]) => { + import CdpConnector.ioRuntime + + val maxParallelism = config.parallelismPerPartition + val batches = Stream.fromIterator[IO](rows, chunkSize = batchSize).chunks + + val operation: Seq[Row] => IO[Unit] = config.onConflict match { + case OnConflictOption.Abort => + relation.insert + case OnConflictOption.Upsert => + relation.upsert + case OnConflictOption.Update => + relation.update + case OnConflictOption.Delete => + relation.delete } - .compile - .drain - .unsafeRunSync() - }) - relation + + batches + .parEvalMapUnordered(maxParallelism) { chunk => + operation(chunk.toVector) + } + .compile + .drain + .unsafeRunSync() + }) + relation + case _ => + sys.error(s"Resource type $resourceType does not support save()") } } } @@ -472,7 +481,9 @@ object DefaultSource { rawEnsureParent = toBoolean(parameters, "rawEnsureParent", defaultValue = true), enableSinglePartitionDeleteAssetHierarchy = enableSinglePartitionDeleteAssetHierarchy, tracingParent = extractTracingHeadersKernel(parameters), - useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false) + useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false), + serverSideFilterNullValuesOnNonSchemaRawQueries = + toBoolean(parameters, "filterNullFieldsOnNonSchemaRawQueries", defaultValue = false) ) } diff --git a/src/main/scala/cognite/spark/v1/EventsRelation.scala b/src/main/scala/cognite/spark/v1/EventsRelation.scala index 10a34043e..abd8b354c 100644 --- a/src/main/scala/cognite/spark/v1/EventsRelation.scala +++ b/src/main/scala/cognite/spark/v1/EventsRelation.scala @@ -1,8 +1,8 @@ package cognite.spark.v1 import cats.effect.IO -import cognite.spark.v1.PushdownUtilities._ import cognite.spark.compiletime.macros.SparkSchemaHelper.{asRow, fromRow, structType} +import cognite.spark.v1.PushdownUtilities._ import com.cognite.sdk.scala.common.{WithExternalIdGeneric, WithId} import com.cognite.sdk.scala.v1._ import com.cognite.sdk.scala.v1.resources.Events @@ -15,9 +15,8 @@ import java.time.Instant import scala.annotation.unused class EventsRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1InsertableRelation[Event, Long](config, "events") + extends SdkV1InsertableRelation[Event, Long](config, EventsRelation.name) with WritableRelation { - import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def getStreams(sparkFilters: Array[Filter])( client: GenericClient[IO]): Seq[Stream[IO, Event]] = { val (ids, filters) = @@ -81,6 +80,7 @@ class EventsRelation(config: RelationConfig)(val sqlContext: SQLContext) client.events, doUpsert = true) } + import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[Event]() @@ -88,15 +88,23 @@ class EventsRelation(config: RelationConfig)(val sqlContext: SQLContext) override def uniqueId(a: Event): Long = a.id } -object EventsRelation extends UpsertSchema { +object EventsRelation + extends UpsertSchema + with ReadSchema + with NamedRelation + with InsertSchema + with DeleteWithIdSchema + with UpdateSchema { + override val name: String = "events" import 
cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[EventsUpsertSchema]() - val insertSchema: StructType = structType[EventsInsertSchema]() - val readSchema: StructType = structType[EventsReadSchema]() + override val upsertSchema: StructType = structType[EventsUpsertSchema]() + override val insertSchema: StructType = structType[EventsInsertSchema]() + override val readSchema: StructType = structType[EventsReadSchema]() + override val updateSchema: StructType = upsertSchema } -trait WithNullableExtenalId extends WithExternalIdGeneric[OptionalField] { +trait WithNullableExternalId extends WithExternalIdGeneric[OptionalField] { val externalId: OptionalField[String] override def getExternalId: Option[String] = externalId.toOption } @@ -113,7 +121,7 @@ final case class EventsUpsertSchema( source: OptionalField[String] = FieldNotSpecified, externalId: OptionalField[String] = FieldNotSpecified, dataSetId: OptionalField[Long] = FieldNotSpecified -) extends WithNullableExtenalId +) extends WithNullableExternalId with WithId[Option[Long]] final case class EventsInsertSchema( diff --git a/src/main/scala/cognite/spark/v1/FileContentRelation.scala b/src/main/scala/cognite/spark/v1/FileContentRelation.scala new file mode 100644 index 000000000..2c297fcdd --- /dev/null +++ b/src/main/scala/cognite/spark/v1/FileContentRelation.scala @@ -0,0 +1,144 @@ +package cognite.spark.v1 + +import cats.effect.std.Dispatcher +import cats.effect.{IO, Resource} +import com.cognite.sdk.scala.v1.FileDownloadExternalId +import fs2.{Pipe, Stream} +import org.apache.commons.io.FileUtils +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.sources.{Filter, PrunedFilteredScan, TableScan} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.{Partition, TaskContext} +import sttp.capabilities.WebSockets +import sttp.capabilities.fs2.Fs2Streams +import sttp.client3.asynchttpclient.SttpClientBackendFactory +import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend +import sttp.client3.{SttpBackend, UriContext, asStreamUnsafe, basicRequest} +import sttp.model.Uri + +import scala.collection.immutable._ +import scala.concurrent.duration.Duration + +//The trait exist for testing purposes +trait WithSizeLimit { + val sizeLimit: Long +} + +class FileContentRelation(config: RelationConfig, fileExternalId: String, inferSchema: Boolean)( + override val sqlContext: SQLContext) + extends CdfRelation(config, FileContentRelation.name) + with TableScan + with PrunedFilteredScan + with Serializable + with WithSizeLimit { + + override val sizeLimit: Long = 5 * FileUtils.ONE_GB + + @transient private lazy val sttpFileContentStreamingBackendResource + : Resource[IO, SttpBackend[IO, Fs2Streams[IO] with WebSockets]] = + for { + dispatcher <- Dispatcher.parallel[IO] + backend <- Resource.make( + IO( + AsyncHttpClientFs2Backend + .usingClient[IO]( + SttpClientBackendFactory.create("file content download", Some(3600000)), + dispatcher)) + )(backend => backend.close()) + } yield backend + + @transient private lazy val dataFrame: DataFrame = createDataFrame + + override def schema: StructType = + dataFrame.schema + + @transient lazy val createDataFrame: DataFrame = { + val rdd: RDD[String] = new RDD[String](sqlContext.sparkContext, Nil) with Serializable { + + import cognite.spark.v1.CdpConnector.ioRuntime + + val maxParallelism = 1 + override def compute(split: Partition, 
context: TaskContext): Iterator[String] = { + + val validUrl = for { + file <- client.files.retrieveByExternalId(fileExternalId) + _ <- IO.raiseWhen(!file.uploaded)( + new CdfSparkException( + f"Could not read file because no file was uploaded for externalId: $fileExternalId") + ) + downloadLink <- client.files + .downloadLink(FileDownloadExternalId(fileExternalId)) + uri <- IO.pure(uri"${downloadLink.downloadUrl}") + + _ <- IO.raiseWhen(!uri.scheme.contains("https"))( + new CdfSparkException("Invalid download uri, it should be a valid url using https") + ) + + } yield uri + + StreamIterator( + Stream + .eval(validUrl) + .flatMap(readUrlContentLines), + maxParallelism * 2, + None + ) + + } + + override protected def getPartitions: Array[Partition] = + Array(CdfPartition(0)) + } + import sqlContext.sparkSession.implicits._ + val dataset = rdd.cache().toDS() + + if (inferSchema) { + sqlContext.sparkSession.read.json(dataset) + } else { + dataset.toDF() + } + } + + private def readUrlContentLines(link: Uri): Stream[IO, String] = + Stream.resource(sttpFileContentStreamingBackendResource).flatMap { backend => + val request = basicRequest + .get(link) + .response(asStreamUnsafe(Fs2Streams[IO])) + .readTimeout(Duration.Inf) + + Stream.eval(backend.send(request)).flatMap { response => + response.body match { + case Right(byteStream) => + byteStream + .through(enforceSizeLimit) + .through(fs2.text.utf8.decode) + .through(fs2.text.lines) + case Left(error) => + Stream.raiseError[IO](new Exception(s"Error while requesting underlying file: $error")) + } + } + } + + override def buildScan(): RDD[Row] = + dataFrame.rdd + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = + dataFrame.select(requiredColumns.map(col).toIndexedSeq: _*).rdd + + private val enforceSizeLimit: Pipe[IO, Byte, Byte] = + in => + in.scanChunks(0L) { (acc, chunk) => + val newSize = acc + chunk.size + if (newSize > sizeLimit) + throw new CdfSparkException(s"File size too big. 
SizeLimit: $sizeLimit") + else + (newSize, chunk) + } + +} + +object FileContentRelation extends NamedRelation { + override val name: String = "filecontent" +} diff --git a/src/main/scala/cognite/spark/v1/FilesRelation.scala b/src/main/scala/cognite/spark/v1/FilesRelation.scala index c8d27bfbe..e6a91f3bd 100644 --- a/src/main/scala/cognite/spark/v1/FilesRelation.scala +++ b/src/main/scala/cognite/spark/v1/FilesRelation.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{Row, SQLContext} import java.time.Instant class FilesRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1InsertableRelation[FilesReadSchema, Long](config, "files") + extends SdkV1InsertableRelation[FilesReadSchema, Long](config, FilesRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def getFromRowsAndCreate(rows: Seq[Row], doUpsert: Boolean = true): IO[Unit] = { @@ -110,12 +110,20 @@ class FilesRelation(config: RelationConfig)(val sqlContext: SQLContext) override def uniqueId(a: FilesReadSchema): Long = a.id } -object FilesRelation extends UpsertSchema { +object FilesRelation + extends UpsertSchema + with ReadSchema + with InsertSchema + with DeleteWithIdSchema + with UpdateSchema + with NamedRelation { + override val name: String = "files" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[FilesUpsertSchema]() - val insertSchema: StructType = structType[FilesInsertSchema]() - val readSchema: StructType = structType[FilesReadSchema]() + override val upsertSchema: StructType = structType[FilesUpsertSchema]() + override val insertSchema: StructType = structType[FilesInsertSchema]() + override val readSchema: StructType = structType[FilesReadSchema]() + override val updateSchema: StructType = upsertSchema } final case class FilesUpsertSchema( @@ -132,7 +140,7 @@ final case class FilesUpsertSchema( sourceModifiedTime: OptionalField[Instant] = FieldNotSpecified, securityCategories: Option[Seq[Long]] = None, labels: Option[Seq[String]] = None -) extends WithNullableExtenalId +) extends WithNullableExternalId with WithId[Option[Long]] object FilesUpsertSchema { @@ -145,7 +153,6 @@ object FilesUpsertSchema { throw new CdfSparkIllegalArgumentException("The name field must be set when creating files."))) .withFieldComputed(_.labels, u => stringSeqToCogniteExternalIdSeq(u.labels)) .buildTransformer - } final case class FilesInsertSchema( diff --git a/src/main/scala/cognite/spark/v1/LabelsRelation.scala b/src/main/scala/cognite/spark/v1/LabelsRelation.scala index bc0fd5d2b..8c54529e0 100644 --- a/src/main/scala/cognite/spark/v1/LabelsRelation.scala +++ b/src/main/scala/cognite/spark/v1/LabelsRelation.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.{Row, SQLContext} import java.time.Instant class LabelsRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1Relation[Label, String](config, "labels") + extends SdkV1Relation[Label, String](config, LabelsRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[Label]() @@ -31,7 +31,7 @@ class LabelsRelation(config: RelationConfig)(val sqlContext: SQLContext) } override def delete(rows: Seq[Row]): IO[Unit] = { - val labelIds = rows.map(fromRow[LabelDeleteSchema](_)).map(_.externalId) + val labelIds = rows.map(fromRow[DeleteByExternalId](_)).map(_.externalId) client.labels .deleteByExternalIds(labelIds) .flatTap(_ => 
incMetrics(itemsDeleted, labelIds.length)) @@ -44,18 +44,18 @@ class LabelsRelation(config: RelationConfig)(val sqlContext: SQLContext) throw new CdfSparkException("Update is not supported for labels.") } -object LabelsRelation { +object LabelsRelation + extends ReadSchema + with DeleteWithExternalIdSchema + with InsertSchema + with NamedRelation { + override val name: String = "labels" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val insertSchema: StructType = structType[LabelInsertSchema]() - val readSchema: StructType = structType[LabelReadSchema]() - val deleteSchema: StructType = structType[LabelDeleteSchema]() + override val insertSchema: StructType = structType[LabelInsertSchema]() + override val readSchema: StructType = structType[LabelReadSchema]() } -final case class LabelDeleteSchema( - externalId: String -) - final case class LabelInsertSchema( externalId: String, name: String, diff --git a/src/main/scala/cognite/spark/v1/NumericDataPointsRelation.scala b/src/main/scala/cognite/spark/v1/NumericDataPointsRelation.scala index 97617febf..a02af058e 100644 --- a/src/main/scala/cognite/spark/v1/NumericDataPointsRelation.scala +++ b/src/main/scala/cognite/spark/v1/NumericDataPointsRelation.scala @@ -120,7 +120,7 @@ object Granularity { } class NumericDataPointsRelationV1(config: RelationConfig)(sqlContext: SQLContext) - extends DataPointsRelationV1[DataPointsItem](config, "datapoints")(sqlContext) + extends DataPointsRelationV1[DataPointsItem](config, NumericDataPointsRelation.name)(sqlContext) with WritableRelation { import PushdownUtilities.filtersToTimestampLimits override def insert(rows: Seq[Row]): IO[Unit] = @@ -222,11 +222,18 @@ class NumericDataPointsRelationV1(config: RelationConfig)(sqlContext: SQLContext } } -object NumericDataPointsRelation extends UpsertSchema { +object NumericDataPointsRelation + extends UpsertSchema + with ReadSchema + with DeleteSchema + with InsertSchema + with NamedRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ + override val name = "datapoints" + + override val upsertSchema: StructType = structType[InsertDataPointsItem]() + override val readSchema: StructType = structType[DataPointsItem]() + override val insertSchema: StructType = structType[InsertDataPointsItem]() + override val deleteSchema: StructType = structType[DeleteDataPointsItem]() - val upsertSchema: StructType = structType[InsertDataPointsItem]() - val readSchema: StructType = structType[DataPointsItem]() - val insertSchema: StructType = structType[InsertDataPointsItem]() - val deleteSchema: StructType = structType[DeleteDataPointsItem]() } diff --git a/src/main/scala/cognite/spark/v1/PushdownUtilities.scala b/src/main/scala/cognite/spark/v1/PushdownUtilities.scala index 1b1d4332a..13e6e5ff9 100644 --- a/src/main/scala/cognite/spark/v1/PushdownUtilities.scala +++ b/src/main/scala/cognite/spark/v1/PushdownUtilities.scala @@ -11,7 +11,6 @@ import com.cognite.sdk.scala.common.{ import com.cognite.sdk.scala.v1.{CogniteExternalId, CogniteId, CogniteInternalId, ContainsAny, TimeRange} import fs2.Stream import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import java.time.Instant import scala.util.Try @@ -378,19 +377,3 @@ object PushdownUtilities { partitionStreams } } - -trait InsertSchema { - val insertSchema: StructType -} - -trait UpsertSchema { - val upsertSchema: StructType -} - -trait UpdateSchema { - val updateSchema: StructType -} - -abstract class DeleteSchema { - val 
deleteSchema: StructType = StructType(Seq(StructField("id", DataTypes.LongType))) -} diff --git a/src/main/scala/cognite/spark/v1/RawTableRelation.scala b/src/main/scala/cognite/spark/v1/RawTableRelation.scala index cc5032dcc..127153b06 100644 --- a/src/main/scala/cognite/spark/v1/RawTableRelation.scala +++ b/src/main/scala/cognite/spark/v1/RawTableRelation.scala @@ -12,7 +12,7 @@ import org.apache.spark.datasource.MetricsSource import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SQLContext} import java.time.Instant @@ -66,6 +66,7 @@ class RawTableRelation( filter = RawRowFilter(), requestedKeys = None, schema = None, + filterNulls = false, collectMetrics = collectSchemaInferenceMetrics, collectTestMetrics = false ) @@ -83,15 +84,17 @@ class RawTableRelation( } } - private def getStreams(filter: RawRowFilter, cursors: Vector[String])( + private def getStreams(filter: RawRowFilter, filterNullFields: Boolean, cursors: Vector[String])( limit: Option[Int], numPartitions: Int)(client: GenericClient[IO]): Seq[Stream[IO, RawRow]] = { assert(numPartitions == cursors.length) - val rawClient = client.rawRows(database, table) + val rawClient = client.rawRows(database, table, filterNullFields) cursors.map(rawClient.filterOnePartition(filter, _, limit)) } private def getStreamByKeys(client: GenericClient[IO], keys: Set[String]): Stream[IO, RawRow] = { + // Note that retrieveByKey does not currently support filtering out null fields. When/If that is + // added, we should also pass in the flag to filter out those here. 
val rawClient = client.rawRows(database, table) Stream .emits(keys.toSeq) @@ -127,6 +130,7 @@ class RawTableRelation( filter: RawRowFilter, requestedKeys: Option[Set[String]], schema: Option[StructType], + filterNulls: Boolean, collectMetrics: Boolean = config.collectMetrics, collectTestMetrics: Boolean = config.collectTestMetrics): RDD[Row] = { val configWithLimit = @@ -142,11 +146,11 @@ class RawTableRelation( val partitionCursors = CdpConnector .clientFromConfig(config) - .rawRows(database, table) + .rawRows(database, table, filterNulls) .getPartitionCursors(filter, configWithLimit.partitions) .unsafeRunSync() .toVector - getStreams(filter, partitionCursors)( + getStreams(filter, filterNulls, partitionCursors)( configWithLimit.limitPerPartition, configWithLimit.partitions) } @@ -203,7 +207,13 @@ class RawTableRelation( } val rdd = - readRows(config.limitPerPartition, None, rawRowFilter, requestedKeys, jsonSchema) + readRows( + config.limitPerPartition, + None, + rawRowFilter, + requestedKeys, + jsonSchema, + config.serverSideFilterNullValuesOnNonSchemaRawQueries) rdd.map(row => { val filteredCols = requiredColumns.map(colName => row.get(schema.fieldIndex(colName))) @@ -298,11 +308,14 @@ class RawTableRelation( } } -object RawTableRelation { +object RawTableRelation extends NamedRelation with UpsertSchema { + override val name = "raw" private val lastUpdatedTimeColName = "lastUpdatedTime" private val keyColumnPattern = """^_*key$""".r private val lastUpdatedTimeColumnPattern = """^_*lastUpdatedTime$""".r + override val upsertSchema: StructType = StructType( + Seq(StructField("key", StringType, nullable = false))) val defaultSchema: StructType = StructType( Seq( StructField("key", DataTypes.StringType), diff --git a/src/main/scala/cognite/spark/v1/RelationConfig.scala b/src/main/scala/cognite/spark/v1/RelationConfig.scala index 722590c6f..307dc1193 100644 --- a/src/main/scala/cognite/spark/v1/RelationConfig.scala +++ b/src/main/scala/cognite/spark/v1/RelationConfig.scala @@ -28,7 +28,8 @@ final case class RelationConfig( enableSinglePartitionDeleteAssetHierarchy: Boolean, // flag to test whether single partition helps avoid NPE in asset hierarchy builder tracingParent: Kernel, initialRetryDelayMillis: Int, - useSharedThrottle: Boolean + useSharedThrottle: Boolean, + serverSideFilterNullValuesOnNonSchemaRawQueries: Boolean, ) { /** Desired number of Spark partitions ~= partitions / parallelismPerPartition */ diff --git a/src/main/scala/cognite/spark/v1/RelationSchemas.scala b/src/main/scala/cognite/spark/v1/RelationSchemas.scala new file mode 100644 index 000000000..55f254341 --- /dev/null +++ b/src/main/scala/cognite/spark/v1/RelationSchemas.scala @@ -0,0 +1,62 @@ +package cognite.spark.v1 + +import cognite.spark.compiletime.macros.SparkSchemaHelper.structType +import org.apache.spark.sql.types.StructType + +/** + * Schema traits here aren't strict or currently compile-time checked, but they should + * indicate a schema compatible with what a relation expects or produces + * + * ReadSchema: for read operations + * + * DeleteSchema: for deletes + * + * {Insert,Upsert,Update}Schema: for writes, differ by conflict resolution mode + * - upsert: fully overwrites destination items if they exist already + * - insert: should fail if item already exists in destination. 
This is fragile as operation + * retries on network errors may be doomed to fail + * - update: partially overwrites existing destination items, keeps properties that aren't part + * of update schema (or from update row) intact + */ +trait InsertSchema { + val insertSchema: StructType +} + +trait UpsertSchema { + val upsertSchema: StructType +} + +trait UpdateSchema { + val updateSchema: StructType +} + +// TODO: this isn't applied to some relations that have read support +trait ReadSchema { + val readSchema: StructType +} + +trait DeleteSchema { + val deleteSchema: StructType +} + +trait DeleteWithIdSchema extends DeleteSchema { + import org.apache.spark.sql.types._ + import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ + + override val deleteSchema: StructType = structType[DeleteByInternalId]() +} + +trait DeleteWithExternalIdSchema extends DeleteSchema { + import org.apache.spark.sql.types._ + import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ + + override val deleteSchema: StructType = structType[DeleteByExternalId]() +} + +final case class DeleteByInternalId( + id: Long +) + +final case class DeleteByExternalId( + externalId: String +) diff --git a/src/main/scala/cognite/spark/v1/RelationshipsRelation.scala b/src/main/scala/cognite/spark/v1/RelationshipsRelation.scala index 87cd05185..3324b8c75 100644 --- a/src/main/scala/cognite/spark/v1/RelationshipsRelation.scala +++ b/src/main/scala/cognite/spark/v1/RelationshipsRelation.scala @@ -16,7 +16,7 @@ import org.apache.spark.sql.{Row, SQLContext} import java.time.Instant class RelationshipsRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1Relation[RelationshipsReadSchema, String](config, "relationships") + extends SdkV1Relation[RelationshipsReadSchema, String](config, RelationshipsRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[RelationshipsReadSchema]() @@ -65,7 +65,7 @@ class RelationshipsRelation(config: RelationConfig)(val sqlContext: SQLContext) } override def delete(rows: Seq[Row]): IO[Unit] = { - val relationshipIds = rows.map(fromRow[RelationshipsDeleteSchema](_)).map(_.externalId) + val relationshipIds = rows.map(fromRow[DeleteByExternalId](_)).map(_.externalId) client.relationships .deleteByExternalIds(relationshipIds) .flatTap(_ => incMetrics(itemsDeleted, relationshipIds.length)) @@ -138,19 +138,22 @@ class RelationshipsRelation(config: RelationConfig)(val sqlContext: SQLContext) } } -object RelationshipsRelation { +object RelationshipsRelation + extends UpsertSchema + with ReadSchema + with DeleteWithExternalIdSchema + with InsertSchema + with UpdateSchema + with NamedRelation { + override val name: String = "relationships" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val insertSchema: StructType = structType[RelationshipsInsertSchema]() - val readSchema: StructType = structType[RelationshipsReadSchema]() - val deleteSchema: StructType = structType[RelationshipsDeleteSchema]() - val upsertSchema: StructType = structType[RelationshipsUpsertSchema]() + override val insertSchema: StructType = structType[RelationshipsInsertSchema]() + override val readSchema: StructType = structType[RelationshipsReadSchema]() + override val upsertSchema: StructType = structType[RelationshipsUpsertSchema]() + override val updateSchema: StructType = upsertSchema } -final case class RelationshipsDeleteSchema( - externalId: String -) - final case class RelationshipsInsertSchema( 
externalId: String, sourceExternalId: String, diff --git a/src/main/scala/cognite/spark/v1/SequenceRowsRelation.scala b/src/main/scala/cognite/spark/v1/SequenceRowsRelation.scala index 7fce23d5e..f2b4dfa7e 100644 --- a/src/main/scala/cognite/spark/v1/SequenceRowsRelation.scala +++ b/src/main/scala/cognite/spark/v1/SequenceRowsRelation.scala @@ -16,7 +16,7 @@ import org.apache.spark.sql.{Row, SQLContext} case class SequenceRowWithId(id: CogniteId, sequenceRow: SequenceRow) class SequenceRowsRelation(config: RelationConfig, sequenceId: CogniteId)(val sqlContext: SQLContext) - extends CdfRelation(config, "sequencerows") + extends CdfRelation(config, SequenceRowsRelation.name) with WritableRelation with PrunedFilteredScan { import CdpConnector._ @@ -272,7 +272,16 @@ class SequenceRowsRelation(config: RelationConfig, sequenceId: CogniteId)(val sq } } -object SequenceRowsRelation { +object SequenceRowsRelation extends NamedRelation with UpsertSchema with DeleteSchema { + override val name = "sequencerows" + override val upsertSchema: StructType = StructType( + Seq( + StructField("id", DataTypes.LongType), + StructField("externalId", DataTypes.StringType), + StructField("rowNumber", DataTypes.LongType, nullable = false) + ) + ) + override val deleteSchema: StructType = upsertSchema private def parseValue(value: Long, offset: Long = 0) = Some(value + offset) def getSeqFilter(filter: Filter): Seq[SequenceRowFilter] = diff --git a/src/main/scala/cognite/spark/v1/SequencesRelation.scala b/src/main/scala/cognite/spark/v1/SequencesRelation.scala index 27d716b4b..120ea6c51 100644 --- a/src/main/scala/cognite/spark/v1/SequencesRelation.scala +++ b/src/main/scala/cognite/spark/v1/SequencesRelation.scala @@ -21,7 +21,7 @@ import cognite.spark.v1.CdpConnector.ioRuntime import scala.annotation.unused class SequencesRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1InsertableRelation[SequenceReadSchema, Long](config, "sequences") + extends SdkV1InsertableRelation[SequenceReadSchema, Long](config, SequenceRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def getStreams(sparkFilters: Array[Filter])( @@ -218,12 +218,20 @@ class SequencesRelation(config: RelationConfig)(val sqlContext: SQLContext) override def uniqueId(a: SequenceReadSchema): Long = a.id } -object SequenceRelation extends UpsertSchema { +object SequenceRelation + extends UpsertSchema + with ReadSchema + with InsertSchema + with DeleteWithIdSchema + with UpdateSchema + with NamedRelation { + override val name: String = "sequences" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[SequenceUpsertSchema]() - val insertSchema: StructType = structType[SequenceInsertSchema]() - val readSchema: StructType = structType[SequenceReadSchema]() + override val upsertSchema: StructType = structType[SequenceUpsertSchema]() + override val insertSchema: StructType = structType[SequenceInsertSchema]() + override val readSchema: StructType = structType[SequenceReadSchema]() + override val updateSchema: StructType = upsertSchema } final case class SequenceColumnUpsertSchema( @@ -261,7 +269,7 @@ final case class SequenceUpsertSchema( metadata: Option[Map[String, String]] = None, columns: Option[Seq[SequenceColumnUpsertSchema]] = None, dataSetId: OptionalField[Long] = FieldNotSpecified -) extends WithNullableExtenalId +) extends WithNullableExternalId with WithId[Option[Long]] { def getSequenceColumnCreate: 
NonEmptyList[SequenceColumnCreate] = { diff --git a/src/main/scala/cognite/spark/v1/StringDataPointsRelation.scala b/src/main/scala/cognite/spark/v1/StringDataPointsRelation.scala index 887f507a0..3aa724809 100644 --- a/src/main/scala/cognite/spark/v1/StringDataPointsRelation.scala +++ b/src/main/scala/cognite/spark/v1/StringDataPointsRelation.scala @@ -32,7 +32,7 @@ final case class StringDataPointsInsertItem( ) extends RowWithCogniteId("inserting string data points") class StringDataPointsRelationV1(config: RelationConfig)(override val sqlContext: SQLContext) - extends DataPointsRelationV1[StringDataPointsItem](config, "stringdatapoints")(sqlContext) + extends DataPointsRelationV1[StringDataPointsItem](config, StringDataPointsRelation.name)(sqlContext) with WritableRelation { override def insert(rows: Seq[Row]): IO[Unit] = throw new CdfSparkException("Insert not supported for stringdatapoints. Please use upsert instead.") @@ -99,14 +99,21 @@ class StringDataPointsRelationV1(config: RelationConfig)(override val sqlContext } } -object StringDataPointsRelation extends UpsertSchema { +object StringDataPointsRelation + extends UpsertSchema + with ReadSchema + with DeleteSchema + with InsertSchema + with NamedRelation { + override val name: String = "stringdatapoints" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ // We should use StringDataPointsItem here, but doing that gives the error: "constructor Timestamp encapsulates // multiple overloaded alternatives and cannot be treated as a method. Consider invoking // `.asTerm.alternatives` and manually picking the required method" in StructTypeEncoder, probably // because TimeStamp has multiple constructors. Issue in backlog for investigating this. - val upsertSchema = structType[StringDataPointsInsertItem]() - val readSchema = structType[StringDataPointsItem]() - val insertSchema = structType[StringDataPointsInsertItem]() - val deleteSchema = structType[DeleteDataPointsItem]() + override val upsertSchema: StructType = structType[StringDataPointsInsertItem]() + override val readSchema: StructType = structType[StringDataPointsItem]() + override val insertSchema: StructType = structType[StringDataPointsInsertItem]() + override val deleteSchema: StructType = structType[DeleteDataPointsItem]() + } diff --git a/src/main/scala/cognite/spark/v1/SttpClientBackendFactory.scala b/src/main/scala/cognite/spark/v1/SttpClientBackendFactory.scala index 929506bcf..f6bf190e8 100644 --- a/src/main/scala/cognite/spark/v1/SttpClientBackendFactory.scala +++ b/src/main/scala/cognite/spark/v1/SttpClientBackendFactory.scala @@ -6,7 +6,7 @@ import org.asynchttpclient.AsyncHttpClient import sttp.client3.SttpBackendOptions object SttpClientBackendFactory { - def create(prefix: String = "Cdf-Spark"): AsyncHttpClient = { + def create(prefix: String = "Cdf-Spark", requestTimeoutMs: Option[Int] = None): AsyncHttpClient = { // It's important that the threads made by the async http client is daemon threads, // so that we don't hang applications using our library during exit. 
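    // Note (descriptive, based on the changes in this hunk): when a requestTimeoutMs value is
    // supplied, options.setRequestTimeout further down overrides async-http-client's default
    // request timeout so that potentially long-running stream operations are not cut short;
    // when it is None the default timeout is kept. The daemon-thread requirement above is the
    // topic of the PR comment linked below.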
// See for more info https://github.com/cognitedata/cdp-spark-datasource/pull/415/files#r396774391 @@ -22,10 +22,14 @@ object SttpClientBackendFactory { .build() AsyncHttpClientBackend.clientWithModifiedOptions( SttpBackendOptions.Default, - options => + options => { options .setThreadFactory(clientThreadFactory) .setNettyTimer(new HashedWheelTimer(timerThreadFactory)) + //Timeout override for potentially long stream operation + requestTimeoutMs.foreach(options.setRequestTimeout) + options + } ) } } diff --git a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionMappingsRelation.scala b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionMappingsRelation.scala index 04f612d0c..d16810968 100644 --- a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionMappingsRelation.scala +++ b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionMappingsRelation.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.{Row, SQLContext} class ThreeDModelRevisionMappingsRelation(config: RelationConfig, modelId: Long, revisionId: Long)( val sqlContext: SQLContext) - extends SdkV1Relation[ThreeDAssetMapping, String](config, "3dmodelrevisionmappings") { + extends SdkV1Relation[ThreeDAssetMapping, String](config, ThreeDModelRevisionMappingsRelation.name) { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[ThreeDAssetMapping]() @@ -23,3 +23,7 @@ class ThreeDModelRevisionMappingsRelation(config: RelationConfig, modelId: Long, client: GenericClient[IO]): Seq[Stream[IO, ThreeDAssetMapping]] = Seq(client.threeDAssetMappings(modelId, revisionId).list(config.limitPerPartition)) } + +object ThreeDModelRevisionMappingsRelation extends NamedRelation { + override val name: String = "3dmodelrevisionmappings" +} diff --git a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionNodesRelation.scala b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionNodesRelation.scala index 0b2d6b19d..73f42c819 100644 --- a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionNodesRelation.scala +++ b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionNodesRelation.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.{Row, SQLContext} class ThreeDModelRevisionNodesRelation(config: RelationConfig, modelId: Long, revisionId: Long)( val sqlContext: SQLContext) - extends SdkV1Relation[ThreeDNode, Long](config, "3dmodelrevisionnodes") { + extends SdkV1Relation[ThreeDNode, Long](config, ThreeDModelRevisionNodesRelation.name) { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[ThreeDNode]() @@ -23,3 +23,7 @@ class ThreeDModelRevisionNodesRelation(config: RelationConfig, modelId: Long, re client: GenericClient[IO]): Seq[Stream[IO, ThreeDNode]] = Seq(client.threeDNodes(modelId, revisionId).list(config.limitPerPartition)) } + +object ThreeDModelRevisionNodesRelation extends NamedRelation { + override val name: String = "3dmodelrevisionnodes" +} diff --git a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionsRelation.scala b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionsRelation.scala index 2db874f20..e3fab65ef 100644 --- a/src/main/scala/cognite/spark/v1/ThreeDModelRevisionsRelation.scala +++ b/src/main/scala/cognite/spark/v1/ThreeDModelRevisionsRelation.scala @@ -23,7 +23,7 @@ final case class ModelRevisionItem( createdTime: Long) class ThreeDModelRevisionsRelation(config: RelationConfig, modelId: Long)(val sqlContext: SQLContext) - extends SdkV1Relation[ThreeDRevision, Long](config, "3dmodelrevision") { + extends 
SdkV1Relation[ThreeDRevision, Long](config, ThreeDModelRevisionsRelation.name) { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[ThreeDRevision]() @@ -36,3 +36,7 @@ class ThreeDModelRevisionsRelation(config: RelationConfig, modelId: Long)(val sq client: GenericClient[IO]): Seq[Stream[IO, ThreeDRevision]] = Seq(client.threeDRevisions(modelId).list(config.limitPerPartition)) } + +object ThreeDModelRevisionsRelation extends NamedRelation { + override val name: String = "3dmodelrevisions" +} diff --git a/src/main/scala/cognite/spark/v1/ThreeDModelsRelation.scala b/src/main/scala/cognite/spark/v1/ThreeDModelsRelation.scala index c5cfa716f..c9046ab2b 100644 --- a/src/main/scala/cognite/spark/v1/ThreeDModelsRelation.scala +++ b/src/main/scala/cognite/spark/v1/ThreeDModelsRelation.scala @@ -11,7 +11,7 @@ import org.apache.spark.sql.{Row, SQLContext} final case class ModelItem(id: Long, name: String, createdTime: Long) class ThreeDModelsRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1Relation[ThreeDModel, Long](config, "threeDModels.read") { + extends SdkV1Relation[ThreeDModel, Long](config, ThreeDModelsRelation.name) { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def schema: StructType = structType[ThreeDModel]() @@ -23,3 +23,7 @@ class ThreeDModelsRelation(config: RelationConfig)(val sqlContext: SQLContext) client: GenericClient[IO]): Seq[Stream[IO, ThreeDModel]] = Seq(client.threeDModels.list(config.limitPerPartition)) } + +object ThreeDModelsRelation extends NamedRelation { + override val name: String = "3dmodels" +} diff --git a/src/main/scala/cognite/spark/v1/TimeSeriesRelation.scala b/src/main/scala/cognite/spark/v1/TimeSeriesRelation.scala index e4f243cd5..21c68da46 100644 --- a/src/main/scala/cognite/spark/v1/TimeSeriesRelation.scala +++ b/src/main/scala/cognite/spark/v1/TimeSeriesRelation.scala @@ -15,7 +15,7 @@ import org.apache.spark.sql.{Row, SQLContext} import java.time.Instant class TimeSeriesRelation(config: RelationConfig)(val sqlContext: SQLContext) - extends SdkV1InsertableRelation[TimeSeries, Long](config, "timeseries") + extends SdkV1InsertableRelation[TimeSeries, Long](config, TimeSeriesRelation.name) with WritableRelation { import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ override def insert(rows: Seq[Row]): IO[Unit] = @@ -90,12 +90,20 @@ class TimeSeriesRelation(config: RelationConfig)(val sqlContext: SQLContext) unitExternalId = m.get("unitExternalId") ) } -object TimeSeriesRelation extends UpsertSchema { +object TimeSeriesRelation + extends UpsertSchema + with ReadSchema + with InsertSchema + with NamedRelation + with DeleteWithIdSchema + with UpdateSchema { + override val name: String = "timeseries" import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - val upsertSchema: StructType = structType[TimeSeriesUpsertSchema]() - val insertSchema: StructType = structType[TimeSeriesInsertSchema]() - val readSchema: StructType = structType[TimeSeriesReadSchema]() + override val upsertSchema: StructType = structType[TimeSeriesUpsertSchema]() + override val insertSchema: StructType = structType[TimeSeriesInsertSchema]() + override val readSchema: StructType = structType[TimeSeriesReadSchema]() + override val updateSchema: StructType = upsertSchema } final case class TimeSeriesUpsertSchema( @@ -111,7 +119,7 @@ final case class TimeSeriesUpsertSchema( isString: Option[Boolean] = None, dataSetId: OptionalField[Long] = FieldNotSpecified, 
unitExternalId: OptionalField[String] = FieldNotSpecified -) extends WithNullableExtenalId +) extends WithNullableExternalId with WithId[Option[Long]] object TimeSeriesUpsertSchema { diff --git a/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelBaseRelation.scala b/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelBaseRelation.scala index fdef2ca27..4705fda4a 100644 --- a/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelBaseRelation.scala +++ b/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelBaseRelation.scala @@ -7,8 +7,11 @@ import cognite.spark.v1.{ CdfRelation, CdfSparkException, CdfSparkIllegalArgumentException, + DeleteSchema, + NamedRelation, RelationConfig, SdkV1Rdd, + UpsertSchema, WritableRelation } import com.cognite.sdk.scala.v1.GenericClient @@ -582,7 +585,22 @@ abstract class FlexibleDataModelBaseRelation(config: RelationConfig, sqlContext: } } -object FlexibleDataModelBaseRelation { +object FlexibleDataModelBaseRelation extends NamedRelation with UpsertSchema with DeleteSchema { + import cognite.spark.compiletime.macros.SparkSchemaHelper.structType + import cognite.spark.v1.StructTypeEncoder + import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ + + // TODO: this seems to be incomplete or fully wrong schema + final case class DataModelBaseRelationUpsertSchema(externalId: String) {} + + override val upsertSchema: StructType = structType[DataModelBaseRelationUpsertSchema]() + + // TODO: this seems to be missing optional "space" parameter + final case class DataModelBaseRelationDeleteSchema(externalId: String) {} + + override val deleteSchema: StructType = structType[DataModelBaseRelationDeleteSchema]() + + override val name = "instances" final case class ProjectedFlexibleDataModelInstance( space: String, externalId: String, diff --git a/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelRelationFactory.scala b/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelRelationFactory.scala index 6a28096bf..1a4b44add 100644 --- a/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelRelationFactory.scala +++ b/src/main/scala/cognite/spark/v1/fdm/FlexibleDataModelRelationFactory.scala @@ -17,7 +17,7 @@ import org.apache.spark.sql.SQLContext sealed trait FlexibleDataModelRelationFactory object FlexibleDataModelRelationFactory { - val ResourceType = "instances" + val ResourceType = FlexibleDataModelBaseRelation.name final case class ViewCorePropertyConfig( intendedUsage: Usage, diff --git a/src/main/scripts/Pipfile.lock b/src/main/scripts/Pipfile.lock index bbd99ffe1..7b3e569d3 100644 --- a/src/main/scripts/Pipfile.lock +++ b/src/main/scripts/Pipfile.lock @@ -18,190 +18,204 @@ "default": { "certifi": { "hashes": [ - "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b", - "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90" + "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8", + "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9" ], - "index": "pypi", "markers": "python_version >= '3.6'", - "version": "==2024.7.4" + "version": "==2024.8.30" }, "cffi": { "hashes": [ - "sha256:011aff3524d578a9412c8b3cfaa50f2c0bd78e03eb7af7aa5e0df59b158efb2f", - "sha256:0a048d4f6630113e54bb4b77e315e1ba32a5a31512c31a273807d0027a7e69ab", - "sha256:0bb15e7acf8ab35ca8b24b90af52c8b391690ef5c4aec3d31f38f0d37d2cc499", - "sha256:0d46ee4764b88b91f16661a8befc6bfb24806d885e27436fdc292ed7e6f6d058", - "sha256:0e60821d312f99d3e1569202518dddf10ae547e799d75aef3bca3a2d9e8ee693", - 
"sha256:0fdacad9e0d9fc23e519efd5ea24a70348305e8d7d85ecbb1a5fa66dc834e7fb", - "sha256:14b9cbc8f7ac98a739558eb86fabc283d4d564dafed50216e7f7ee62d0d25377", - "sha256:17c6d6d3260c7f2d94f657e6872591fe8733872a86ed1345bda872cfc8c74885", - "sha256:1a2ddbac59dc3716bc79f27906c010406155031a1c801410f1bafff17ea304d2", - "sha256:2404f3de742f47cb62d023f0ba7c5a916c9c653d5b368cc966382ae4e57da401", - "sha256:24658baf6224d8f280e827f0a50c46ad819ec8ba380a42448e24459daf809cf4", - "sha256:24aa705a5f5bd3a8bcfa4d123f03413de5d86e497435693b638cbffb7d5d8a1b", - "sha256:2770bb0d5e3cc0e31e7318db06efcbcdb7b31bcb1a70086d3177692a02256f59", - "sha256:331ad15c39c9fe9186ceaf87203a9ecf5ae0ba2538c9e898e3a6967e8ad3db6f", - "sha256:3aa9d43b02a0c681f0bfbc12d476d47b2b2b6a3f9287f11ee42989a268a1833c", - "sha256:41f4915e09218744d8bae14759f983e466ab69b178de38066f7579892ff2a555", - "sha256:4304d4416ff032ed50ad6bb87416d802e67139e31c0bde4628f36a47a3164bfa", - "sha256:435a22d00ec7d7ea533db494da8581b05977f9c37338c80bc86314bec2619424", - "sha256:45f7cd36186db767d803b1473b3c659d57a23b5fa491ad83c6d40f2af58e4dbb", - "sha256:48b389b1fd5144603d61d752afd7167dfd205973a43151ae5045b35793232aa2", - "sha256:4e67d26532bfd8b7f7c05d5a766d6f437b362c1bf203a3a5ce3593a645e870b8", - "sha256:516a405f174fd3b88829eabfe4bb296ac602d6a0f68e0d64d5ac9456194a5b7e", - "sha256:5ba5c243f4004c750836f81606a9fcb7841f8874ad8f3bf204ff5e56332b72b9", - "sha256:5bdc0f1f610d067c70aa3737ed06e2726fd9d6f7bfee4a351f4c40b6831f4e82", - "sha256:6107e445faf057c118d5050560695e46d272e5301feffda3c41849641222a828", - "sha256:6327b572f5770293fc062a7ec04160e89741e8552bf1c358d1a23eba68166759", - "sha256:669b29a9eca6146465cc574659058ed949748f0809a2582d1f1a324eb91054dc", - "sha256:6ce01337d23884b21c03869d2f68c5523d43174d4fc405490eb0091057943118", - "sha256:6d872186c1617d143969defeadac5a904e6e374183e07977eedef9c07c8953bf", - "sha256:6f76a90c345796c01d85e6332e81cab6d70de83b829cf1d9762d0a3da59c7932", - "sha256:70d2aa9fb00cf52034feac4b913181a6e10356019b18ef89bc7c12a283bf5f5a", - "sha256:7cbc78dc018596315d4e7841c8c3a7ae31cc4d638c9b627f87d52e8abaaf2d29", - "sha256:856bf0924d24e7f93b8aee12a3a1095c34085600aa805693fb7f5d1962393206", - "sha256:8a98748ed1a1df4ee1d6f927e151ed6c1a09d5ec21684de879c7ea6aa96f58f2", - "sha256:93a7350f6706b31f457c1457d3a3259ff9071a66f312ae64dc024f049055f72c", - "sha256:964823b2fc77b55355999ade496c54dde161c621cb1f6eac61dc30ed1b63cd4c", - "sha256:a003ac9edc22d99ae1286b0875c460351f4e101f8c9d9d2576e78d7e048f64e0", - "sha256:a0ce71725cacc9ebf839630772b07eeec220cbb5f03be1399e0457a1464f8e1a", - "sha256:a47eef975d2b8b721775a0fa286f50eab535b9d56c70a6e62842134cf7841195", - "sha256:a8b5b9712783415695663bd463990e2f00c6750562e6ad1d28e072a611c5f2a6", - "sha256:a9015f5b8af1bb6837a3fcb0cdf3b874fe3385ff6274e8b7925d81ccaec3c5c9", - "sha256:aec510255ce690d240f7cb23d7114f6b351c733a74c279a84def763660a2c3bc", - "sha256:b00e7bcd71caa0282cbe3c90966f738e2db91e64092a877c3ff7f19a1628fdcb", - "sha256:b50aaac7d05c2c26dfd50c3321199f019ba76bb650e346a6ef3616306eed67b0", - "sha256:b7b6ea9e36d32582cda3465f54c4b454f62f23cb083ebc7a94e2ca6ef011c3a7", - "sha256:bb9333f58fc3a2296fb1d54576138d4cf5d496a2cc118422bd77835e6ae0b9cb", - "sha256:c1c13185b90bbd3f8b5963cd8ce7ad4ff441924c31e23c975cb150e27c2bf67a", - "sha256:c3b8bd3133cd50f6b637bb4322822c94c5ce4bf0d724ed5ae70afce62187c492", - "sha256:c5d97162c196ce54af6700949ddf9409e9833ef1003b4741c2b39ef46f1d9720", - "sha256:c815270206f983309915a6844fe994b2fa47e5d05c4c4cef267c3b30e34dbe42", - "sha256:cab2eba3830bf4f6d91e2d6718e0e1c14a2f5ad1af68a89d24ace0c6b17cced7", - 
"sha256:d1df34588123fcc88c872f5acb6f74ae59e9d182a2707097f9e28275ec26a12d", - "sha256:d6bdcd415ba87846fd317bee0774e412e8792832e7805938987e4ede1d13046d", - "sha256:db9a30ec064129d605d0f1aedc93e00894b9334ec74ba9c6bdd08147434b33eb", - "sha256:dbc183e7bef690c9abe5ea67b7b60fdbca81aa8da43468287dae7b5c046107d4", - "sha256:dca802c8db0720ce1c49cce1149ff7b06e91ba15fa84b1d59144fef1a1bc7ac2", - "sha256:dec6b307ce928e8e112a6bb9921a1cb00a0e14979bf28b98e084a4b8a742bd9b", - "sha256:df8bb0010fdd0a743b7542589223a2816bdde4d94bb5ad67884348fa2c1c67e8", - "sha256:e4094c7b464cf0a858e75cd14b03509e84789abf7b79f8537e6a72152109c76e", - "sha256:e4760a68cab57bfaa628938e9c2971137e05ce48e762a9cb53b76c9b569f1204", - "sha256:eb09b82377233b902d4c3fbeeb7ad731cdab579c6c6fda1f763cd779139e47c3", - "sha256:eb862356ee9391dc5a0b3cbc00f416b48c1b9a52d252d898e5b7696a5f9fe150", - "sha256:ef9528915df81b8f4c7612b19b8628214c65c9b7f74db2e34a646a0a2a0da2d4", - "sha256:f3157624b7558b914cb039fd1af735e5e8049a87c817cc215109ad1c8779df76", - "sha256:f3e0992f23bbb0be00a921eae5363329253c3b86287db27092461c887b791e5e", - "sha256:f9338cc05451f1942d0d8203ec2c346c830f8e86469903d5126c1f0a13a2bcbb", - "sha256:ffef8fd58a36fb5f1196919638f73dd3ae0db1a878982b27a9a5a176ede4ba91" + "sha256:045d61c734659cc045141be4bae381a41d89b741f795af1dd018bfb532fd0df8", + "sha256:0984a4925a435b1da406122d4d7968dd861c1385afe3b45ba82b750f229811e2", + "sha256:0e2b1fac190ae3ebfe37b979cc1ce69c81f4e4fe5746bb401dca63a9062cdaf1", + "sha256:0f048dcf80db46f0098ccac01132761580d28e28bc0f78ae0d58048063317e15", + "sha256:1257bdabf294dceb59f5e70c64a3e2f462c30c7ad68092d01bbbfb1c16b1ba36", + "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824", + "sha256:1d599671f396c4723d016dbddb72fe8e0397082b0a77a4fab8028923bec050e8", + "sha256:28b16024becceed8c6dfbc75629e27788d8a3f9030691a1dbf9821a128b22c36", + "sha256:2bb1a08b8008b281856e5971307cc386a8e9c5b625ac297e853d36da6efe9c17", + "sha256:30c5e0cb5ae493c04c8b42916e52ca38079f1b235c2f8ae5f4527b963c401caf", + "sha256:31000ec67d4221a71bd3f67df918b1f88f676f1c3b535a7eb473255fdc0b83fc", + "sha256:386c8bf53c502fff58903061338ce4f4950cbdcb23e2902d86c0f722b786bbe3", + "sha256:3edc8d958eb099c634dace3c7e16560ae474aa3803a5df240542b305d14e14ed", + "sha256:45398b671ac6d70e67da8e4224a065cec6a93541bb7aebe1b198a61b58c7b702", + "sha256:46bf43160c1a35f7ec506d254e5c890f3c03648a4dbac12d624e4490a7046cd1", + "sha256:4ceb10419a9adf4460ea14cfd6bc43d08701f0835e979bf821052f1805850fe8", + "sha256:51392eae71afec0d0c8fb1a53b204dbb3bcabcb3c9b807eedf3e1e6ccf2de903", + "sha256:5da5719280082ac6bd9aa7becb3938dc9f9cbd57fac7d2871717b1feb0902ab6", + "sha256:610faea79c43e44c71e1ec53a554553fa22321b65fae24889706c0a84d4ad86d", + "sha256:636062ea65bd0195bc012fea9321aca499c0504409f413dc88af450b57ffd03b", + "sha256:6883e737d7d9e4899a8a695e00ec36bd4e5e4f18fabe0aca0efe0a4b44cdb13e", + "sha256:6b8b4a92e1c65048ff98cfe1f735ef8f1ceb72e3d5f0c25fdb12087a23da22be", + "sha256:6f17be4345073b0a7b8ea599688f692ac3ef23ce28e5df79c04de519dbc4912c", + "sha256:706510fe141c86a69c8ddc029c7910003a17353970cff3b904ff0686a5927683", + "sha256:72e72408cad3d5419375fc87d289076ee319835bdfa2caad331e377589aebba9", + "sha256:733e99bc2df47476e3848417c5a4540522f234dfd4ef3ab7fafdf555b082ec0c", + "sha256:7596d6620d3fa590f677e9ee430df2958d2d6d6de2feeae5b20e82c00b76fbf8", + "sha256:78122be759c3f8a014ce010908ae03364d00a1f81ab5c7f4a7a5120607ea56e1", + "sha256:805b4371bf7197c329fcb3ead37e710d1bca9da5d583f5073b799d5c5bd1eee4", + "sha256:85a950a4ac9c359340d5963966e3e0a94a676bd6245a4b55bc43949eee26a655", + 
"sha256:8f2cdc858323644ab277e9bb925ad72ae0e67f69e804f4898c070998d50b1a67", + "sha256:9755e4345d1ec879e3849e62222a18c7174d65a6a92d5b346b1863912168b595", + "sha256:98e3969bcff97cae1b2def8ba499ea3d6f31ddfdb7635374834cf89a1a08ecf0", + "sha256:a08d7e755f8ed21095a310a693525137cfe756ce62d066e53f502a83dc550f65", + "sha256:a1ed2dd2972641495a3ec98445e09766f077aee98a1c896dcb4ad0d303628e41", + "sha256:a24ed04c8ffd54b0729c07cee15a81d964e6fee0e3d4d342a27b020d22959dc6", + "sha256:a45e3c6913c5b87b3ff120dcdc03f6131fa0065027d0ed7ee6190736a74cd401", + "sha256:a9b15d491f3ad5d692e11f6b71f7857e7835eb677955c00cc0aefcd0669adaf6", + "sha256:ad9413ccdeda48c5afdae7e4fa2192157e991ff761e7ab8fdd8926f40b160cc3", + "sha256:b2ab587605f4ba0bf81dc0cb08a41bd1c0a5906bd59243d56bad7668a6fc6c16", + "sha256:b62ce867176a75d03a665bad002af8e6d54644fad99a3c70905c543130e39d93", + "sha256:c03e868a0b3bc35839ba98e74211ed2b05d2119be4e8a0f224fba9384f1fe02e", + "sha256:c59d6e989d07460165cc5ad3c61f9fd8f1b4796eacbd81cee78957842b834af4", + "sha256:c7eac2ef9b63c79431bc4b25f1cd649d7f061a28808cbc6c47b534bd789ef964", + "sha256:c9c3d058ebabb74db66e431095118094d06abf53284d9c81f27300d0e0d8bc7c", + "sha256:ca74b8dbe6e8e8263c0ffd60277de77dcee6c837a3d0881d8c1ead7268c9e576", + "sha256:caaf0640ef5f5517f49bc275eca1406b0ffa6aa184892812030f04c2abf589a0", + "sha256:cdf5ce3acdfd1661132f2a9c19cac174758dc2352bfe37d98aa7512c6b7178b3", + "sha256:d016c76bdd850f3c626af19b0542c9677ba156e4ee4fccfdd7848803533ef662", + "sha256:d01b12eeeb4427d3110de311e1774046ad344f5b1a7403101878976ecd7a10f3", + "sha256:d63afe322132c194cf832bfec0dc69a99fb9bb6bbd550f161a49e9e855cc78ff", + "sha256:da95af8214998d77a98cc14e3a3bd00aa191526343078b530ceb0bd710fb48a5", + "sha256:dd398dbc6773384a17fe0d3e7eeb8d1a21c2200473ee6806bb5e6a8e62bb73dd", + "sha256:de2ea4b5833625383e464549fec1bc395c1bdeeb5f25c4a3a82b5a8c756ec22f", + "sha256:de55b766c7aa2e2a3092c51e0483d700341182f08e67c63630d5b6f200bb28e5", + "sha256:df8b1c11f177bc2313ec4b2d46baec87a5f3e71fc8b45dab2ee7cae86d9aba14", + "sha256:e03eab0a8677fa80d646b5ddece1cbeaf556c313dcfac435ba11f107ba117b5d", + "sha256:e221cf152cff04059d011ee126477f0d9588303eb57e88923578ace7baad17f9", + "sha256:e31ae45bc2e29f6b2abd0de1cc3b9d5205aa847cafaecb8af1476a609a2f6eb7", + "sha256:edae79245293e15384b51f88b00613ba9f7198016a5948b5dddf4917d4d26382", + "sha256:f1e22e8c4419538cb197e4dd60acc919d7696e5ef98ee4da4e01d3f8cfa4cc5a", + "sha256:f3a2b4222ce6b60e2e8b337bb9596923045681d71e5a082783484d845390938e", + "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", + "sha256:f75c7ab1f9e4aca5414ed4d8e5c0e303a34f4421f8a0d47a4d019ceff0ab6af4", + "sha256:f79fc4fc25f1c8698ff97788206bb3c2598949bfe0fef03d299eb1b5356ada99", + "sha256:f7f5baafcc48261359e14bcd6d9bff6d4b28d9103847c9e136694cb0501aef87", + "sha256:fc48c783f9c87e60831201f2cce7f3b2e4846bf4d8728eabe54d60700b318a0b" ], "markers": "platform_python_implementation != 'PyPy'", - "version": "==1.17.0" + "version": "==1.17.1" }, "charset-normalizer": { "hashes": [ - "sha256:06435b539f889b1f6f4ac1758871aae42dc3a8c0e24ac9e60c2384973ad73027", - "sha256:06a81e93cd441c56a9b65d8e1d043daeb97a3d0856d177d5c90ba85acb3db087", - "sha256:0a55554a2fa0d408816b3b5cedf0045f4b8e1a6065aec45849de2d6f3f8e9786", - "sha256:0b2b64d2bb6d3fb9112bafa732def486049e63de9618b5843bcdd081d8144cd8", - "sha256:10955842570876604d404661fbccbc9c7e684caf432c09c715ec38fbae45ae09", - "sha256:122c7fa62b130ed55f8f285bfd56d5f4b4a5b503609d181f9ad85e55c89f4185", - "sha256:1ceae2f17a9c33cb48e3263960dc5fc8005351ee19db217e9b1bb15d28c02574", - 
"sha256:1d3193f4a680c64b4b6a9115943538edb896edc190f0b222e73761716519268e", - "sha256:1f79682fbe303db92bc2b1136016a38a42e835d932bab5b3b1bfcfbf0640e519", - "sha256:2127566c664442652f024c837091890cb1942c30937add288223dc895793f898", - "sha256:22afcb9f253dac0696b5a4be4a1c0f8762f8239e21b99680099abd9b2b1b2269", - "sha256:25baf083bf6f6b341f4121c2f3c548875ee6f5339300e08be3f2b2ba1721cdd3", - "sha256:2e81c7b9c8979ce92ed306c249d46894776a909505d8f5a4ba55b14206e3222f", - "sha256:3287761bc4ee9e33561a7e058c72ac0938c4f57fe49a09eae428fd88aafe7bb6", - "sha256:34d1c8da1e78d2e001f363791c98a272bb734000fcef47a491c1e3b0505657a8", - "sha256:37e55c8e51c236f95b033f6fb391d7d7970ba5fe7ff453dad675e88cf303377a", - "sha256:3d47fa203a7bd9c5b6cee4736ee84ca03b8ef23193c0d1ca99b5089f72645c73", - "sha256:3e4d1f6587322d2788836a99c69062fbb091331ec940e02d12d179c1d53e25fc", - "sha256:42cb296636fcc8b0644486d15c12376cb9fa75443e00fb25de0b8602e64c1714", - "sha256:45485e01ff4d3630ec0d9617310448a8702f70e9c01906b0d0118bdf9d124cf2", - "sha256:4a78b2b446bd7c934f5dcedc588903fb2f5eec172f3d29e52a9096a43722adfc", - "sha256:4ab2fe47fae9e0f9dee8c04187ce5d09f48eabe611be8259444906793ab7cbce", - "sha256:4d0d1650369165a14e14e1e47b372cfcb31d6ab44e6e33cb2d4e57265290044d", - "sha256:549a3a73da901d5bc3ce8d24e0600d1fa85524c10287f6004fbab87672bf3e1e", - "sha256:55086ee1064215781fff39a1af09518bc9255b50d6333f2e4c74ca09fac6a8f6", - "sha256:572c3763a264ba47b3cf708a44ce965d98555f618ca42c926a9c1616d8f34269", - "sha256:573f6eac48f4769d667c4442081b1794f52919e7edada77495aaed9236d13a96", - "sha256:5b4c145409bef602a690e7cfad0a15a55c13320ff7a3ad7ca59c13bb8ba4d45d", - "sha256:6463effa3186ea09411d50efc7d85360b38d5f09b870c48e4600f63af490e56a", - "sha256:65f6f63034100ead094b8744b3b97965785388f308a64cf8d7c34f2f2e5be0c4", - "sha256:663946639d296df6a2bb2aa51b60a2454ca1cb29835324c640dafb5ff2131a77", - "sha256:6897af51655e3691ff853668779c7bad41579facacf5fd7253b0133308cf000d", - "sha256:68d1f8a9e9e37c1223b656399be5d6b448dea850bed7d0f87a8311f1ff3dabb0", - "sha256:6ac7ffc7ad6d040517be39eb591cac5ff87416c2537df6ba3cba3bae290c0fed", - "sha256:6b3251890fff30ee142c44144871185dbe13b11bab478a88887a639655be1068", - "sha256:6c4caeef8fa63d06bd437cd4bdcf3ffefe6738fb1b25951440d80dc7df8c03ac", - "sha256:6ef1d82a3af9d3eecdba2321dc1b3c238245d890843e040e41e470ffa64c3e25", - "sha256:753f10e867343b4511128c6ed8c82f7bec3bd026875576dfd88483c5c73b2fd8", - "sha256:7cd13a2e3ddeed6913a65e66e94b51d80a041145a026c27e6bb76c31a853c6ab", - "sha256:7ed9e526742851e8d5cc9e6cf41427dfc6068d4f5a3bb03659444b4cabf6bc26", - "sha256:7f04c839ed0b6b98b1a7501a002144b76c18fb1c1850c8b98d458ac269e26ed2", - "sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db", - "sha256:80402cd6ee291dcb72644d6eac93785fe2c8b9cb30893c1af5b8fdd753b9d40f", - "sha256:8465322196c8b4d7ab6d1e049e4c5cb460d0394da4a27d23cc242fbf0034b6b5", - "sha256:86216b5cee4b06df986d214f664305142d9c76df9b6512be2738aa72a2048f99", - "sha256:87d1351268731db79e0f8e745d92493ee2841c974128ef629dc518b937d9194c", - "sha256:8bdb58ff7ba23002a4c5808d608e4e6c687175724f54a5dade5fa8c67b604e4d", - "sha256:8c622a5fe39a48f78944a87d4fb8a53ee07344641b0562c540d840748571b811", - "sha256:8d756e44e94489e49571086ef83b2bb8ce311e730092d2c34ca8f7d925cb20aa", - "sha256:8f4a014bc36d3c57402e2977dada34f9c12300af536839dc38c0beab8878f38a", - "sha256:9063e24fdb1e498ab71cb7419e24622516c4a04476b17a2dab57e8baa30d6e03", - "sha256:90d558489962fd4918143277a773316e56c72da56ec7aa3dc3dbbe20fdfed15b", - "sha256:923c0c831b7cfcb071580d3f46c4baf50f174be571576556269530f4bbd79d04", - 
"sha256:95f2a5796329323b8f0512e09dbb7a1860c46a39da62ecb2324f116fa8fdc85c", - "sha256:96b02a3dc4381e5494fad39be677abcb5e6634bf7b4fa83a6dd3112607547001", - "sha256:9f96df6923e21816da7e0ad3fd47dd8f94b2a5ce594e00677c0013018b813458", - "sha256:a10af20b82360ab00827f916a6058451b723b4e65030c5a18577c8b2de5b3389", - "sha256:a50aebfa173e157099939b17f18600f72f84eed3049e743b68ad15bd69b6bf99", - "sha256:a981a536974bbc7a512cf44ed14938cf01030a99e9b3a06dd59578882f06f985", - "sha256:a9a8e9031d613fd2009c182b69c7b2c1ef8239a0efb1df3f7c8da66d5dd3d537", - "sha256:ae5f4161f18c61806f411a13b0310bea87f987c7d2ecdbdaad0e94eb2e404238", - "sha256:aed38f6e4fb3f5d6bf81bfa990a07806be9d83cf7bacef998ab1a9bd660a581f", - "sha256:b01b88d45a6fcb69667cd6d2f7a9aeb4bf53760d7fc536bf679ec94fe9f3ff3d", - "sha256:b261ccdec7821281dade748d088bb6e9b69e6d15b30652b74cbbac25e280b796", - "sha256:b2b0a0c0517616b6869869f8c581d4eb2dd83a4d79e0ebcb7d373ef9956aeb0a", - "sha256:b4a23f61ce87adf89be746c8a8974fe1c823c891d8f86eb218bb957c924bb143", - "sha256:bd8f7df7d12c2db9fab40bdd87a7c09b1530128315d047a086fa3ae3435cb3a8", - "sha256:beb58fe5cdb101e3a055192ac291b7a21e3b7ef4f67fa1d74e331a7f2124341c", - "sha256:c002b4ffc0be611f0d9da932eb0f704fe2602a9a949d1f738e4c34c75b0863d5", - "sha256:c083af607d2515612056a31f0a8d9e0fcb5876b7bfc0abad3ecd275bc4ebc2d5", - "sha256:c180f51afb394e165eafe4ac2936a14bee3eb10debc9d9e4db8958fe36afe711", - "sha256:c235ebd9baae02f1b77bcea61bce332cb4331dc3617d254df3323aa01ab47bd4", - "sha256:cd70574b12bb8a4d2aaa0094515df2463cb429d8536cfb6c7ce983246983e5a6", - "sha256:d0eccceffcb53201b5bfebb52600a5fb483a20b61da9dbc885f8b103cbe7598c", - "sha256:d965bba47ddeec8cd560687584e88cf699fd28f192ceb452d1d7ee807c5597b7", - "sha256:db364eca23f876da6f9e16c9da0df51aa4f104a972735574842618b8c6d999d4", - "sha256:ddbb2551d7e0102e7252db79ba445cdab71b26640817ab1e3e3648dad515003b", - "sha256:deb6be0ac38ece9ba87dea880e438f25ca3eddfac8b002a2ec3d9183a454e8ae", - "sha256:e06ed3eb3218bc64786f7db41917d4e686cc4856944f53d5bdf83a6884432e12", - "sha256:e27ad930a842b4c5eb8ac0016b0a54f5aebbe679340c26101df33424142c143c", - "sha256:e537484df0d8f426ce2afb2d0f8e1c3d0b114b83f8850e5f2fbea0e797bd82ae", - "sha256:eb00ed941194665c332bf8e078baf037d6c35d7c4f3102ea2d4f16ca94a26dc8", - "sha256:eb6904c354526e758fda7167b33005998fb68c46fbc10e013ca97f21ca5c8887", - "sha256:eb8821e09e916165e160797a6c17edda0679379a4be5c716c260e836e122f54b", - "sha256:efcb3f6676480691518c177e3b465bcddf57cea040302f9f4e6e191af91174d4", - "sha256:f27273b60488abe721a075bcca6d7f3964f9f6f067c8c4c605743023d7d3944f", - "sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5", - "sha256:fb69256e180cb6c8a894fee62b3afebae785babc1ee98b81cdf68bbca1987f33", - "sha256:fd1abc0d89e30cc4e02e4064dc67fcc51bd941eb395c502aac3ec19fab46b519", - "sha256:ff8fa367d09b717b2a17a052544193ad76cd49979c805768879cb63d9ca50561" + "sha256:0099d79bdfcf5c1f0c2c72f91516702ebf8b0b8ddd8905f97a8aecf49712c621", + "sha256:0713f3adb9d03d49d365b70b84775d0a0d18e4ab08d12bc46baa6132ba78aaf6", + "sha256:07afec21bbbbf8a5cc3651aa96b980afe2526e7f048fdfb7f1014d84acc8b6d8", + "sha256:0b309d1747110feb25d7ed6b01afdec269c647d382c857ef4663bbe6ad95a912", + "sha256:0d99dd8ff461990f12d6e42c7347fd9ab2532fb70e9621ba520f9e8637161d7c", + "sha256:0de7b687289d3c1b3e8660d0741874abe7888100efe14bd0f9fd7141bcbda92b", + "sha256:1110e22af8ca26b90bd6364fe4c763329b0ebf1ee213ba32b68c73de5752323d", + "sha256:130272c698667a982a5d0e626851ceff662565379baf0ff2cc58067b81d4f11d", + "sha256:136815f06a3ae311fae551c3df1f998a1ebd01ddd424aa5603a4336997629e95", + 
"sha256:14215b71a762336254351b00ec720a8e85cada43b987da5a042e4ce3e82bd68e", + "sha256:1db4e7fefefd0f548d73e2e2e041f9df5c59e178b4c72fbac4cc6f535cfb1565", + "sha256:1ffd9493de4c922f2a38c2bf62b831dcec90ac673ed1ca182fe11b4d8e9f2a64", + "sha256:2006769bd1640bdf4d5641c69a3d63b71b81445473cac5ded39740a226fa88ab", + "sha256:20587d20f557fe189b7947d8e7ec5afa110ccf72a3128d61a2a387c3313f46be", + "sha256:223217c3d4f82c3ac5e29032b3f1c2eb0fb591b72161f86d93f5719079dae93e", + "sha256:27623ba66c183eca01bf9ff833875b459cad267aeeb044477fedac35e19ba907", + "sha256:285e96d9d53422efc0d7a17c60e59f37fbf3dfa942073f666db4ac71e8d726d0", + "sha256:2de62e8801ddfff069cd5c504ce3bc9672b23266597d4e4f50eda28846c322f2", + "sha256:2f6c34da58ea9c1a9515621f4d9ac379871a8f21168ba1b5e09d74250de5ad62", + "sha256:309a7de0a0ff3040acaebb35ec45d18db4b28232f21998851cfa709eeff49d62", + "sha256:35c404d74c2926d0287fbd63ed5d27eb911eb9e4a3bb2c6d294f3cfd4a9e0c23", + "sha256:3710a9751938947e6327ea9f3ea6332a09bf0ba0c09cae9cb1f250bd1f1549bc", + "sha256:3d59d125ffbd6d552765510e3f31ed75ebac2c7470c7274195b9161a32350284", + "sha256:40d3ff7fc90b98c637bda91c89d51264a3dcf210cade3a2c6f838c7268d7a4ca", + "sha256:425c5f215d0eecee9a56cdb703203dda90423247421bf0d67125add85d0c4455", + "sha256:43193c5cda5d612f247172016c4bb71251c784d7a4d9314677186a838ad34858", + "sha256:44aeb140295a2f0659e113b31cfe92c9061622cadbc9e2a2f7b8ef6b1e29ef4b", + "sha256:47334db71978b23ebcf3c0f9f5ee98b8d65992b65c9c4f2d34c2eaf5bcaf0594", + "sha256:4796efc4faf6b53a18e3d46343535caed491776a22af773f366534056c4e1fbc", + "sha256:4a51b48f42d9358460b78725283f04bddaf44a9358197b889657deba38f329db", + "sha256:4b67fdab07fdd3c10bb21edab3cbfe8cf5696f453afce75d815d9d7223fbe88b", + "sha256:4ec9dd88a5b71abfc74e9df5ebe7921c35cbb3b641181a531ca65cdb5e8e4dea", + "sha256:4f9fc98dad6c2eaa32fc3af1417d95b5e3d08aff968df0cd320066def971f9a6", + "sha256:54b6a92d009cbe2fb11054ba694bc9e284dad30a26757b1e372a1fdddaf21920", + "sha256:55f56e2ebd4e3bc50442fbc0888c9d8c94e4e06a933804e2af3e89e2f9c1c749", + "sha256:5726cf76c982532c1863fb64d8c6dd0e4c90b6ece9feb06c9f202417a31f7dd7", + "sha256:5d447056e2ca60382d460a604b6302d8db69476fd2015c81e7c35417cfabe4cd", + "sha256:5ed2e36c3e9b4f21dd9422f6893dec0abf2cca553af509b10cd630f878d3eb99", + "sha256:5ff2ed8194587faf56555927b3aa10e6fb69d931e33953943bc4f837dfee2242", + "sha256:62f60aebecfc7f4b82e3f639a7d1433a20ec32824db2199a11ad4f5e146ef5ee", + "sha256:63bc5c4ae26e4bc6be6469943b8253c0fd4e4186c43ad46e713ea61a0ba49129", + "sha256:6b40e8d38afe634559e398cc32b1472f376a4099c75fe6299ae607e404c033b2", + "sha256:6b493a043635eb376e50eedf7818f2f322eabbaa974e948bd8bdd29eb7ef2a51", + "sha256:6dba5d19c4dfab08e58d5b36304b3f92f3bd5d42c1a3fa37b5ba5cdf6dfcbcee", + "sha256:6fd30dc99682dc2c603c2b315bded2799019cea829f8bf57dc6b61efde6611c8", + "sha256:707b82d19e65c9bd28b81dde95249b07bf9f5b90ebe1ef17d9b57473f8a64b7b", + "sha256:7706f5850360ac01d80c89bcef1640683cc12ed87f42579dab6c5d3ed6888613", + "sha256:7782afc9b6b42200f7362858f9e73b1f8316afb276d316336c0ec3bd73312742", + "sha256:79983512b108e4a164b9c8d34de3992f76d48cadc9554c9e60b43f308988aabe", + "sha256:7f683ddc7eedd742e2889d2bfb96d69573fde1d92fcb811979cdb7165bb9c7d3", + "sha256:82357d85de703176b5587dbe6ade8ff67f9f69a41c0733cf2425378b49954de5", + "sha256:84450ba661fb96e9fd67629b93d2941c871ca86fc38d835d19d4225ff946a631", + "sha256:86f4e8cca779080f66ff4f191a685ced73d2f72d50216f7112185dc02b90b9b7", + "sha256:8cda06946eac330cbe6598f77bb54e690b4ca93f593dee1568ad22b04f347c15", + "sha256:8ce7fd6767a1cc5a92a639b391891bf1c268b03ec7e021c7d6d902285259685c", + 
"sha256:8ff4e7cdfdb1ab5698e675ca622e72d58a6fa2a8aa58195de0c0061288e6e3ea", + "sha256:9289fd5dddcf57bab41d044f1756550f9e7cf0c8e373b8cdf0ce8773dc4bd417", + "sha256:92a7e36b000bf022ef3dbb9c46bfe2d52c047d5e3f3343f43204263c5addc250", + "sha256:92db3c28b5b2a273346bebb24857fda45601aef6ae1c011c0a997106581e8a88", + "sha256:95c3c157765b031331dd4db3c775e58deaee050a3042fcad72cbc4189d7c8dca", + "sha256:980b4f289d1d90ca5efcf07958d3eb38ed9c0b7676bf2831a54d4f66f9c27dfa", + "sha256:9ae4ef0b3f6b41bad6366fb0ea4fc1d7ed051528e113a60fa2a65a9abb5b1d99", + "sha256:9c98230f5042f4945f957d006edccc2af1e03ed5e37ce7c373f00a5a4daa6149", + "sha256:9fa2566ca27d67c86569e8c85297aaf413ffab85a8960500f12ea34ff98e4c41", + "sha256:a14969b8691f7998e74663b77b4c36c0337cb1df552da83d5c9004a93afdb574", + "sha256:a8aacce6e2e1edcb6ac625fb0f8c3a9570ccc7bfba1f63419b3769ccf6a00ed0", + "sha256:a8e538f46104c815be19c975572d74afb53f29650ea2025bbfaef359d2de2f7f", + "sha256:aa41e526a5d4a9dfcfbab0716c7e8a1b215abd3f3df5a45cf18a12721d31cb5d", + "sha256:aa693779a8b50cd97570e5a0f343538a8dbd3e496fa5dcb87e29406ad0299654", + "sha256:ab22fbd9765e6954bc0bcff24c25ff71dcbfdb185fcdaca49e81bac68fe724d3", + "sha256:ab2e5bef076f5a235c3774b4f4028a680432cded7cad37bba0fd90d64b187d19", + "sha256:ab973df98fc99ab39080bfb0eb3a925181454d7c3ac8a1e695fddfae696d9e90", + "sha256:af73657b7a68211996527dbfeffbb0864e043d270580c5aef06dc4b659a4b578", + "sha256:b197e7094f232959f8f20541ead1d9862ac5ebea1d58e9849c1bf979255dfac9", + "sha256:b295729485b06c1a0683af02a9e42d2caa9db04a373dc38a6a58cdd1e8abddf1", + "sha256:b8831399554b92b72af5932cdbbd4ddc55c55f631bb13ff8fe4e6536a06c5c51", + "sha256:b8dcd239c743aa2f9c22ce674a145e0a25cb1566c495928440a181ca1ccf6719", + "sha256:bcb4f8ea87d03bc51ad04add8ceaf9b0f085ac045ab4d74e73bbc2dc033f0236", + "sha256:bd7af3717683bea4c87acd8c0d3d5b44d56120b26fd3f8a692bdd2d5260c620a", + "sha256:bf4475b82be41b07cc5e5ff94810e6a01f276e37c2d55571e3fe175e467a1a1c", + "sha256:c3e446d253bd88f6377260d07c895816ebf33ffffd56c1c792b13bff9c3e1ade", + "sha256:c57516e58fd17d03ebe67e181a4e4e2ccab1168f8c2976c6a334d4f819fe5944", + "sha256:c94057af19bc953643a33581844649a7fdab902624d2eb739738a30e2b3e60fc", + "sha256:cab5d0b79d987c67f3b9e9c53f54a61360422a5a0bc075f43cab5621d530c3b6", + "sha256:ce031db0408e487fd2775d745ce30a7cd2923667cf3b69d48d219f1d8f5ddeb6", + "sha256:cee4373f4d3ad28f1ab6290684d8e2ebdb9e7a1b74fdc39e4c211995f77bec27", + "sha256:d5b054862739d276e09928de37c79ddeec42a6e1bfc55863be96a36ba22926f6", + "sha256:dbe03226baf438ac4fda9e2d0715022fd579cb641c4cf639fa40d53b2fe6f3e2", + "sha256:dc15e99b2d8a656f8e666854404f1ba54765871104e50c8e9813af8a7db07f12", + "sha256:dcaf7c1524c0542ee2fc82cc8ec337f7a9f7edee2532421ab200d2b920fc97cf", + "sha256:dd4eda173a9fcccb5f2e2bd2a9f423d180194b1bf17cf59e3269899235b2a114", + "sha256:dd9a8bd8900e65504a305bf8ae6fa9fbc66de94178c420791d0293702fce2df7", + "sha256:de7376c29d95d6719048c194a9cf1a1b0393fbe8488a22008610b0361d834ecf", + "sha256:e7fdd52961feb4c96507aa649550ec2a0d527c086d284749b2f582f2d40a2e0d", + "sha256:e91f541a85298cf35433bf66f3fab2a4a2cff05c127eeca4af174f6d497f0d4b", + "sha256:e9e3c4c9e1ed40ea53acf11e2a386383c3304212c965773704e4603d589343ed", + "sha256:ee803480535c44e7f5ad00788526da7d85525cfefaf8acf8ab9a310000be4b03", + "sha256:f09cb5a7bbe1ecae6e87901a2eb23e0256bb524a79ccc53eb0b7629fbe7677c4", + "sha256:f19c1585933c82098c2a520f8ec1227f20e339e33aca8fa6f956f6691b784e67", + "sha256:f1a2f519ae173b5b6a2c9d5fa3116ce16e48b3462c8b96dfdded11055e3d6365", + "sha256:f28f891ccd15c514a0981f3b9db9aa23d62fe1a99997512b0491d2ed323d229a", + 
"sha256:f3e73a4255342d4eb26ef6df01e3962e73aa29baa3124a8e824c5d3364a65748", + "sha256:f606a1881d2663630ea5b8ce2efe2111740df4b687bd78b34a8131baa007f79b", + "sha256:fe9f97feb71aa9896b81973a7bbada8c49501dc73e58a10fcef6663af95e5079", + "sha256:ffc519621dce0c767e96b9c53f09c5d215578e10b02c285809f76509a3931482" ], "markers": "python_full_version >= '3.7.0'", - "version": "==3.3.2" + "version": "==3.4.0" }, "cognite-sdk": { "hashes": [ - "sha256:6333c7f9cb020cdf2287dc9d63fc49be24c607dc58846caceaf24af2f12c3358", - "sha256:8d79630ee6c62d64460cc6a0f053163a815378da4ac5c17d8b00a8421ac79b8d" + "sha256:5ea34853afb140ffb51a86148bc9223913b9cf99a7dea4ee5ee8122e14fe06b4", + "sha256:b77411d824ac365993381d7199cf99229d0c5de64a71e5c24310a9b216ee14f4" ], "index": "pypi", - "markers": "python_version >= '3.8' and python_version < '4.0'", - "version": "==7.53.1" + "markers": "python_version >= '3.10' and python_version < '4.0'", + "version": "==7.63.5" }, "cryptography": { "hashes": [ @@ -233,25 +247,24 @@ "sha256:f98bf604c82c416bc829e490c700ca1553eafdf2912a91e23a79d97d9801372a", "sha256:fba1007b3ef89946dbbb515aeeb41e30203b004f0b4b00e5e16078b518563289" ], - "index": "pypi", "markers": "python_version >= '3.7'", "version": "==43.0.1" }, "idna": { "hashes": [ - "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc", - "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0" + "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9", + "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3" ], - "markers": "python_version >= '3.5'", - "version": "==3.7" + "markers": "python_version >= '3.6'", + "version": "==3.10" }, "msal": { "hashes": [ - "sha256:6b301e63f967481f0cc1a3a3bac0cf322b276855bc1b0955468d9deb3f33d511", - "sha256:8f6725f099752553f9b2fe84125e2a5ebe47b49f92eacca33ebedd3a9ebaae25" + "sha256:2c4f189cf9cc8f00c80045f66d39b7c0f3ed45873fd3d1f2af9f22db2e12ff4b", + "sha256:96bc37cff82ebe4b160d5fc0f1196f6ca8b50e274ecd0ec5bf69c438514086e7" ], "markers": "python_version >= '3.7'", - "version": "==1.29.0" + "version": "==1.31.0" }, "oauthlib": { "hashes": [ @@ -263,20 +276,20 @@ }, "protobuf": { "hashes": [ - "sha256:0e341109c609749d501986b835f667c6e1e24531096cff9d34ae411595e26505", - "sha256:176c12b1f1c880bf7a76d9f7c75822b6a2bc3db2d28baa4d300e8ce4cde7409b", - "sha256:354d84fac2b0d76062e9b3221f4abbbacdfd2a4d8af36bab0474f3a0bb30ab38", - "sha256:4fadd8d83e1992eed0248bc50a4a6361dc31bcccc84388c54c86e530b7f58863", - "sha256:54330f07e4949d09614707c48b06d1a22f8ffb5763c159efd5c0928326a91470", - "sha256:610e700f02469c4a997e58e328cac6f305f649826853813177e6290416e846c6", - "sha256:7fc3add9e6003e026da5fc9e59b131b8f22b428b991ccd53e2af8071687b4fce", - "sha256:9e8f199bf7f97bd7ecebffcae45ebf9527603549b2b562df0fbc6d4d688f14ca", - "sha256:a109916aaac42bff84702fb5187f3edadbc7c97fc2c99c5ff81dd15dcce0d1e5", - "sha256:b848dbe1d57ed7c191dfc4ea64b8b004a3f9ece4bf4d0d80a367b76df20bf36e", - "sha256:f3ecdef226b9af856075f28227ff2c90ce3a594d092c39bee5513573f25e2714" + "sha256:2c69461a7fcc8e24be697624c09a839976d82ae75062b11a0972e41fd2cd9132", + "sha256:35cfcb15f213449af7ff6198d6eb5f739c37d7e4f1c09b5d0641babf2cc0c68f", + "sha256:52235802093bd8a2811abbe8bf0ab9c5f54cca0a751fdd3f6ac2a21438bffece", + "sha256:59379674ff119717404f7454647913787034f03fe7049cbef1d74a97bb4593f0", + "sha256:5e8a95246d581eef20471b5d5ba010d55f66740942b95ba9b872d918c459452f", + "sha256:87317e9bcda04a32f2ee82089a204d3a2f0d3c8aeed16568c7daf4756e4f1fe0", + 
"sha256:8ddc60bf374785fb7cb12510b267f59067fa10087325b8e1855b898a0d81d276", + "sha256:a8b9403fc70764b08d2f593ce44f1d2920c5077bf7d311fefec999f8c40f78b7", + "sha256:c0ea0123dac3399a2eeb1a1443d82b7afc9ff40241433296769f7da42d142ec3", + "sha256:ca53faf29896c526863366a52a8f4d88e69cd04ec9571ed6082fa117fac3ab36", + "sha256:eeea10f3dc0ac7e6b4933d32db20662902b4ab81bf28df12218aa389e9c2102d" ], "markers": "python_version >= '3.8'", - "version": "==5.27.2" + "version": "==5.28.2" }, "pycparser": { "hashes": [ @@ -291,11 +304,11 @@ "crypto" ], "hashes": [ - "sha256:57e28d156e3d5c10088e0c68abb90bfac3df82b40a71bd0daa20c65ccd5c23de", - "sha256:59127c392cc44c2da5bb3192169a91f429924e17aff6534d70fdc02ab3e04320" + "sha256:3b02fb0f44517787776cf48f2ae25d8e14f300e6d7545a4315cee571a415e850", + "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c" ], - "markers": "python_version >= '3.7'", - "version": "==2.8.0" + "markers": "python_version >= '3.8'", + "version": "==2.9.0" }, "requests": { "hashes": [ @@ -323,12 +336,11 @@ }, "urllib3": { "hashes": [ - "sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472", - "sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168" + "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac", + "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9" ], - "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==2.2.2" + "version": "==2.2.3" } }, "develop": {} diff --git a/src/test/scala/cognite/spark/v1/FileContentRelationTest.scala b/src/test/scala/cognite/spark/v1/FileContentRelationTest.scala new file mode 100644 index 000000000..9afc9c4ef --- /dev/null +++ b/src/test/scala/cognite/spark/v1/FileContentRelationTest.scala @@ -0,0 +1,278 @@ +package cognite.spark.v1 + +import cats.effect.unsafe.implicits.global +import cats.effect.{IO, Resource} +import com.cognite.sdk.scala.v1.FileCreate +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.{ArrayType, LongType, StringType, StructField, StructType} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import sttp.client3.{HttpURLConnectionBackend, UriContext, basicRequest} + +import scala.collection.immutable._ +import java.net.MalformedURLException + +class FileContentRelationTest extends FlatSpec with Matchers with SparkTest with BeforeAndAfterAll { + val fileExternalId: String = "fileContentTransformationFile" + val fileExternalIdWithNestedJson: String = "fileContentTransformationFileNestedJson" + val fileExternalIdWithConflicts: String = "fileContentTransformationFileConflicts" + val fileWithWrongMimeTypeExternalId: String = "fileContentTransformationFileWrongMimeType" + val fileWithoutUploadExternalId: String = "fileWithoutUploadExternalId" + + override def beforeAll(): Unit = { + makeFile(fileExternalId).unsafeRunSync() + makeFile(fileExternalIdWithNestedJson, None, generateNdjsonDataNested).unsafeRunSync() + makeFile(fileExternalIdWithConflicts, None, generateNdjsonDataConflicting).unsafeRunSync() + + makeFile(fileWithWrongMimeTypeExternalId, Some("application/json")).unsafeRunSync()//bad mimetype + } + + +// uncomment for cleanups +// override def afterAll(): Unit = { +// writeClient.files.deleteByExternalIds(Seq( +// fileExternalId, +// fileExternalIdWithNestedJson, +// fileExternalIdWithConflicts, +// fileWithWrongMimeTypeExternalId, +// fileWithoutUploadExternalId +// )).unsafeRunSync() +// } + + val generateNdjsonData: String = { + val jsonObjects = List( + """{"name": "Alice", "age": 
30}""", + """{"name": "Bob", "age": 25}""", + """{"name": "Charlie", "age": 35}""", + """{"name": "Charlie2", "age": 35, "test": "test"}""" + ) + jsonObjects.mkString("\n") + } + + val generateNdjsonDataNested: String = { + val jsonObjects = List( + """{"name": "Alice", "age": 30}""", + """{"name": "Bob", "age": 25}""", + """{"name": "Charlie", "age": 35}""", + """{"name": "Charlie2", "age": 35, "test": [{"key": "value", "key2": "value2"}, {"key": "value"}]}""" + ) + jsonObjects.mkString("\n") + } + + val generateNdjsonDataConflicting: String = { + val jsonObjects = List( + """{"name": "Alice", "age": 30}""", + """{"name": "Bob", "age": 25}""", + """{"name": "Charlie", "age": 35, "test": "content"}""", + """{"name": "Charlie2", "age": 35, "test": [{"key": "value", "key2": "value2"}, {"key": "value"}]}""" + ) + jsonObjects.mkString("\n") + } + + def makeFile(externalId: String, mimeType: Option[String] = Some("application/jsonlines"), content: String = generateNdjsonData): IO[Unit] = { + val toCreate: FileCreate = FileCreate( + name = "test file for file content transformation", + externalId = Some(externalId), + mimeType = mimeType, + ) + for { + existingFile <- writeClient.files.retrieveByExternalId(externalId).attempt + + // delete file if it was created but the actual file wasn't uploaded so we can get an uploadUrl + _ <- existingFile match { + case Right(value) if !value.uploaded => writeClient.files.deleteByExternalIds(Seq(externalId)) + case _ => IO.pure(()) + } + + file <- existingFile match { + case Right(value) if value.uploaded => IO.pure(value) + case _ => + writeClient.files.create(Seq(toCreate)).map(_.headOption.getOrElse(throw new IllegalStateException("could not upload file"))) + } + + _ <- { + if (!file.uploaded) { + val backend = Resource.make(IO(HttpURLConnectionBackend()))(b => IO(b.close())) + val request = basicRequest.put(uri"${file.uploadUrl.getOrElse(throw new MalformedURLException("bad url"))}") + .body(content) + backend.use { backend => + IO { + val response = request.send(backend) + if (response.code.isSuccess) { + println(s"NDJSON content uploaded successfully to ${file.uploadUrl}") + } else { + throw new Exception(s"Failed to upload content: ${response.statusText}") + } + } + } + } else { + IO.pure(()) + } + } + } yield () + } + + "file content transformations" should "read from a ndjson file" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + .option("externalId", fileExternalId) + .option("inferSchema", true) + .load() + sourceDf.createOrReplaceTempView("fileContent") + val result = spark.sqlContext.sql(s"select * from filecontent").collect() + result.map(_.toSeq.toList) should contain theSameElementsAs + Array( + List(30, "Alice", null), + List(25, "Bob", null), + List(35, "Charlie", null), + List(35, "Charlie2", "test") + ) + } + + it should "infer the schema" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + .option("inferSchema", value = true) + .option("externalId", fileExternalId) + .load() + sourceDf.createOrReplaceTempView("fileContent") + sourceDf.schema.fields should contain allElementsOf Seq( + StructField("name", StringType, nullable = true), + StructField("test", StringType, nullable = true), + StructField("age", LongType, nullable = true) + ) + } + + it should "infer the schema correctly with nested json" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + 
.option("inferSchema", value = true) + .option("externalId", fileExternalIdWithNestedJson) + .load() + sourceDf.createOrReplaceTempView("fileContent") + sourceDf.schema.fields should contain allElementsOf Seq( + StructField("name", StringType, nullable = true), + StructField("test", + ArrayType( + StructType( + Seq( + StructField("key",StringType, nullable = true), + StructField("key2",StringType, nullable = true) + ) + ), + containsNull = true + ), + nullable = true + ), + StructField("age", LongType, nullable = true) + ) + } + + it should "infer the schema correctly with conflicting types" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + .option("inferSchema", value = true) + .option("externalId", fileExternalIdWithConflicts) + .load() + sourceDf.createOrReplaceTempView("fileContent") + sourceDf.schema.fields should contain allElementsOf Seq( + StructField("name", StringType, nullable = true), + StructField("test", StringType, nullable = true), + StructField("age", LongType, nullable = true) + ) + } + + + it should "select specific columns" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + .option("externalId", fileExternalId) + .option("inferSchema", true) + .load() + sourceDf.createOrReplaceTempView("fileContent") + val result = spark.sqlContext.sql(s"select name, test from filecontent").collect() + result.map(_.toSeq.toList) should contain theSameElementsAs + Seq( + List("Alice", null), + List("Bob", null), + List("Charlie", null), + List("Charlie2", "test") + ) + } + + it should "not infer schema if not asked to" in { + val sourceDf: DataFrame = dataFrameReaderUsingOidc + .useOIDCWrite + .option("type", "filecontent") + .option("externalId", fileExternalId) + .option("inferSchema", false) + .load() + sourceDf.createOrReplaceTempView("fileContent") + val result = spark.sqlContext.sql(s"select * from filecontent").collect() + + sourceDf.schema.fields should contain only StructField("value", StringType, nullable = true) + + result.map(_.toSeq.toList) should contain theSameElementsAs Array( + List("""{"name": "Alice", "age": 30}"""), + List("""{"name": "Bob", "age": 25}"""), + List("""{"name": "Charlie", "age": 35}"""), + List("""{"name": "Charlie2", "age": 35, "test": "test"}""") + ) + } + + it should "get size from endpoint and check for it" in { + val relation = new FileContentRelation( + getDefaultConfig(auth = CdfSparkAuth.OAuth2ClientCredentials(credentials = writeCredentials), projectName = OIDCWrite.project, cluster = OIDCWrite.cluster, applicationName = Some("jetfire-test")), + fileExternalId = fileExternalId, + true + )(spark.sqlContext) { + override val sizeLimit: Long = 100 + } + + val expectedMessage = "File size too big. 
SizeLimit: 100" + val exception = sparkIntercept { + relation.createDataFrame + } + withClue(s"Expected '$expectedMessage' but got: '${exception.getMessage}'") { + exception.getMessage.contains(expectedMessage) should be(true) + } + } + + it should "throw if the file was never uploaded" in { + + val toCreate: FileCreate = FileCreate( + name = "test file for file content transformation", + externalId = Some(fileWithoutUploadExternalId), + mimeType = None, + ) + val file = for { + existingFile <- writeClient.files.retrieveByExternalId(fileWithoutUploadExternalId).attempt + _ <- existingFile match { + case Right(value) if !value.uploaded => IO.pure(value) + case _ => + writeClient.files.create(Seq(toCreate)).map(_.headOption.getOrElse(throw new IllegalStateException("could not upload file"))) + } + } yield () + file.unsafeRunSync() + + val relation = new FileContentRelation( + getDefaultConfig(auth = CdfSparkAuth.OAuth2ClientCredentials(credentials = writeCredentials), projectName = OIDCWrite.project, cluster = OIDCWrite.cluster, applicationName = Some("jetfire-test")), + fileExternalId = fileWithoutUploadExternalId, + true + )(spark.sqlContext) + + val expectedMessage = "Could not read file because no file was uploaded for externalId: fileWithoutUploadExternalId" + val exception = sparkIntercept { + relation.createDataFrame + } + withClue(s"Expected '$expectedMessage' but got: '${exception.getMessage}'") { + exception.getMessage.contains(expectedMessage) should be(true) + } + } +} + + diff --git a/src/test/scala/cognite/spark/v1/RawTableRelationTest.scala b/src/test/scala/cognite/spark/v1/RawTableRelationTest.scala index 03999ccf4..c29d1ec03 100644 --- a/src/test/scala/cognite/spark/v1/RawTableRelationTest.scala +++ b/src/test/scala/cognite/spark/v1/RawTableRelationTest.scala @@ -5,7 +5,7 @@ import cats.implicits._ import cognite.spark.v1.CdpConnector.ioRuntime import com.cognite.sdk.scala.common.CdpApiException import com.cognite.sdk.scala.v1.{RawDatabase, RawRow, RawTable} -import io.circe.Json +import io.circe.{Json, JsonObject} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -117,6 +117,16 @@ class RawTableRelationTest RawRow("k2", Map("bool" -> Json.fromBoolean(java.lang.Boolean.parseBoolean("true")))), RawRow("k3", Map("bool" -> Json.fromBoolean(false))) ) + private val dataWithNullFieldValue = Seq( + RawRow("k1", Map("toBeFiltered" -> Json.Null)), + RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string"))), + RawRow("k3", Map("toBeFiltered" -> Json.fromString("but not here"), "notFiltered" -> Json.fromString("string2"))) + ) + private val dataWithEmptyColumn = Seq( + RawRow("k1", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k1"))), + RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k2"))), + RawRow("k3", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k3"))), + ) case class TestTable(name: String, data: Seq[RawRow]) case class TestData(dbName: String, tables: Seq[TestTable]) @@ -137,6 +147,8 @@ class RawTableRelationTest TestTable("with-long-empty-str", dataWithEmptyStringInLongField), TestTable("with-number-empty-str", dataWithEmptyStringInDoubleField), TestTable("with-boolean-empty-str", dataWithEmptyStringInBooleanField), + TestTable("with-some-null-values", dataWithNullFieldValue), + TestTable("with-only-null-values-for-field", dataWithEmptyColumn), TestTable("cryptoAssets", (1 to 
500).map(i => RawRow(i.toString, Map("i" -> Json.fromString("exist"))) )), @@ -237,7 +249,8 @@ class RawTableRelationTest table: String, database: String = "spark-test-database", inferSchema: Boolean = true, - metricsPrefix: Option[String] = None): DataFrame = { + metricsPrefix: Option[String] = None, + filterNullFields: Option[Boolean] = None): DataFrame = { val df = spark.read .format(DefaultSource.sparkFormatString) .useOIDCWrite @@ -248,6 +261,8 @@ class RawTableRelationTest .option("inferSchema", inferSchema) .option("inferSchemaLimit", "100") + filterNullFields.foreach(v => df.option("filterNullFieldsOnNonSchemaRawQueries", v.toString)) + metricsPrefix match { case Some(prefix) => df.option("collectMetrics", "true") @@ -894,4 +909,77 @@ class RawTableRelationTest err.getMessage shouldBe "Error while loading RAW row [key='k'] in column 'value': java.lang.NumberFormatException: For input string: \"test\"" } + + it should "filter out fields with null value but not impact schema inference" in { + val tableName = "with-some-null-values" + val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true)) + df.count() shouldBe 3 + df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered") + val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap + items("k1")("toBeFiltered") shouldBe Json.Null + items("k2")("toBeFiltered") shouldBe Json.Null + items("k2")("notFiltered") shouldBe Json.fromString("string") + items("k3")("toBeFiltered") shouldBe Json.fromString("but not here") + items("k3")("notFiltered") shouldBe Json.fromString("string2") + } + + it should "filter out columns completely when not inferring schema (confirming it filters from RAW)" in { + val tableName = "with-some-null-values" + val df = rawRead(table = tableName, database = testData.dbName, inferSchema = false, filterNullFields = Some(true)) + df.count() shouldBe 3 + val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap + items("k1")("columns") shouldBe Json.fromString("{}") + items("k2")("columns") shouldBe Json.fromString("{\"notFiltered\":\"string\"}") + val columnsParsed: JsonObject = parseColumns(items("k3")) + columnsParsed("notFiltered") shouldBe Some(Json.fromString("string2")) + columnsParsed("toBeFiltered") shouldBe Some(Json.fromString("but not here")) + } + + it should "return column in schema, even if every row has it filtered out" in { + val tableName = "with-only-null-values-for-field" + val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true)) + df.count() shouldBe 3 + df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered") + } + + it should "not filter out null column values when filtering is not set" in { + val tableName = "with-some-null-values" + // We run this without inferSchema, as schema would hide whether the fields are filtered or not. + val df = rawRead(table = tableName, database = testData.dbName, inferSchema = false) + df.count() shouldBe 3 + validateWhenFilteringIsNotEnabled(df) + } + + it should "not filter out null column values when filtering is explicitly disabled" in { + val tableName = "with-some-null-values" + // We run this without inferSchema, as schema would hide whether the fields are filtered or not. 
+ val df = rawRead(table = tableName, database = testData.dbName, inferSchema = false, filterNullFields = Some(false)) + df.count() shouldBe 3 + validateWhenFilteringIsNotEnabled(df) + } + + private def validateWhenFilteringIsNotEnabled(df: DataFrame): Unit = { + val rows: Map[String, Map[String, Json]] = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq) + .map(r => (r.key, r.columns)) + .toMap + + rows("k1")("columns") shouldBe Json.fromString("{\"toBeFiltered\":null}") + + val columnsParsedk2: JsonObject = parseColumns(rows("k2")) + columnsParsedk2("toBeFiltered") shouldBe Some(Json.Null) + columnsParsedk2("notFiltered") shouldBe Some(Json.fromString("string")) + + val columnsParsedk3 = parseColumns(rows("k3")) + columnsParsedk3("toBeFiltered") shouldBe Some(Json.fromString("but not here")) + columnsParsedk3("notFiltered") shouldBe Some(Json.fromString("string2")) + () + } + + private def parseColumns(row: Map[String, Json]): JsonObject = { + io.circe.parser.parse(row("columns").asString.get) match { + case Right(json) => json.asObject.get + case Left(error) => throw error + } + } + } diff --git a/src/test/scala/cognite/spark/v1/SparkTest.scala b/src/test/scala/cognite/spark/v1/SparkTest.scala index 1c5c33adf..f792b365b 100644 --- a/src/test/scala/cognite/spark/v1/SparkTest.scala +++ b/src/test/scala/cognite/spark/v1/SparkTest.scala @@ -246,11 +246,11 @@ trait SparkTest { val updateAndUpsert: TableFor1[String] = Table(heading = "mode", "upsert", "update") - def getDefaultConfig(auth: CdfSparkAuth, projectName: String): RelationConfig = + def getDefaultConfig(auth: CdfSparkAuth, projectName: String, cluster: String = "api", applicationName: Option[String] = Some("SparkDatasourceTestApp")): RelationConfig = RelationConfig( auth = auth, clientTag = Some("SparkDatasourceTestTag"), - applicationName = Some("SparkDatasourceTestApp"), + applicationName = applicationName, projectName = projectName, batchSize = Some(Constants.DefaultBatchSize), limitPerPartition = None, @@ -262,7 +262,7 @@ trait SparkTest { collectTestMetrics = false, metricsTrackAttempts = false, metricsPrefix = "", - baseUrl = Constants.DefaultBaseUrl, + baseUrl = s"https://${cluster}.cognitedata.com", onConflict = OnConflictOption.Abort, applicationId = spark.sparkContext.applicationId, parallelismPerPartition = Constants.DefaultParallelismPerPartition, @@ -273,7 +273,8 @@ trait SparkTest { rawEnsureParent = false, enableSinglePartitionDeleteAssetHierarchy = false, tracingParent = new Kernel(Map.empty), - useSharedThrottle = false + useSharedThrottle = false, + serverSideFilterNullValuesOnNonSchemaRawQueries = false ) private def getCounterSafe(metricsNamespace: String, resource: String): Option[Long] = { diff --git a/src/test/scala/cognite/spark/v1/SttpClientBackendFactoryTest.scala b/src/test/scala/cognite/spark/v1/SttpClientBackendFactoryTest.scala index d4c55f01a..80e0fccd3 100644 --- a/src/test/scala/cognite/spark/v1/SttpClientBackendFactoryTest.scala +++ b/src/test/scala/cognite/spark/v1/SttpClientBackendFactoryTest.scala @@ -16,5 +16,4 @@ class SttpClientBackendFactoryTest extends FlatSpec with Matchers { asyncHttpClient.close() asyncHttpClient.isClosed shouldBe true } - }
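A short aside on what the new RAW tests assert: with `filterNullFieldsOnNonSchemaRawQueries` enabled, null-valued fields are dropped from the returned rows, while schema inference still reports the column names. Below is a minimal, self-contained sketch of that per-row effect, reusing the `k2` fixture from `dataWithNullFieldValue` above; the object name and the manual filtering are illustrative only and are not connector code.

```scala
import io.circe.{Json, JsonObject}

object NullFilteringSketch {
  // Illustrative only: the k2 fixture row from dataWithNullFieldValue in the tests above.
  val k2: JsonObject = JsonObject(
    "toBeFiltered" -> Json.Null,
    "notFiltered"  -> Json.fromString("string")
  )

  // Dropping null-valued fields leaves {"notFiltered":"string"}, which is the value the
  // non-inferSchema test expects in the "columns" cell for k2 when filtering is enabled.
  val filtered: JsonObject = k2.filter { case (_, value) => !value.isNull }

  def main(args: Array[String]): Unit =
    println(Json.fromJsonObject(filtered).noSpaces) // {"notFiltered":"string"}
}
```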
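The SparkTest change above replaces the constant base URL in the test config with one derived from a cluster name, which is what lets the FileContentRelation test pass `cluster = OIDCWrite.cluster`. A tiny illustrative sketch of that mapping follows; the object and function names and the non-default cluster value are made up for the example, and the claim that `"api"` matches the previous `Constants.DefaultBaseUrl` is an assumption.

```scala
object BaseUrlSketch {
  // Illustrative only: how the updated getDefaultConfig derives baseUrl from the cluster name.
  def baseUrlFor(cluster: String): String =
    s"https://$cluster.cognitedata.com"

  def main(args: Array[String]): Unit = {
    println(baseUrlFor("api"))        // https://api.cognitedata.com — presumably the old default
    println(baseUrlFor("greenfield")) // hypothetical non-default cluster name
  }
}
```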