diff --git a/build.sbt b/build.sbt index 84318f379..caae45674 100644 --- a/build.sbt +++ b/build.sbt @@ -28,7 +28,7 @@ lazy val commonSettings = Seq( organization := "com.cognite.spark.datasource", organizationName := "Cognite", organizationHomepage := Some(url("https://cognite.com")), - version := "3.4." + patchVersion, + version := "3.5." + patchVersion, isSnapshot := patchVersion.endsWith("-SNAPSHOT"), crossScalaVersions := supportedScalaVersions, semanticdbEnabled := true, diff --git a/src/main/scala/cognite/spark/v1/wdl/JsonObjectToRow.scala b/src/main/scala/cognite/spark/v1/wdl/JsonObjectToRow.scala index bd49c0cc1..9affc3ee1 100644 --- a/src/main/scala/cognite/spark/v1/wdl/JsonObjectToRow.scala +++ b/src/main/scala/cognite/spark/v1/wdl/JsonObjectToRow.scala @@ -5,6 +5,7 @@ import io.circe.{Json, JsonObject} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ +@deprecated("wdl support is deprecated", since = "0") object JsonObjectToRow { implicit class RequiredOption[T](optionValue: Option[T]) { def orThrow(structFieldName: String, nullable: Boolean, dataType: DataType): Option[T] = diff --git a/src/main/scala/cognite/spark/v1/wdl/RowToJson.scala b/src/main/scala/cognite/spark/v1/wdl/RowToJson.scala index 2d1278905..fb0c52997 100644 --- a/src/main/scala/cognite/spark/v1/wdl/RowToJson.scala +++ b/src/main/scala/cognite/spark/v1/wdl/RowToJson.scala @@ -14,6 +14,7 @@ import java.time.{Instant, LocalDate} import scala.collection.mutable // sscalastyle:off +@deprecated("wdl support is deprecated", since = "0") object RowToJson { /** diff --git a/src/main/scala/cognite/spark/v1/wdl/WdlExceptions.scala b/src/main/scala/cognite/spark/v1/wdl/WdlExceptions.scala index 4f272a08e..00d400c9c 100644 --- a/src/main/scala/cognite/spark/v1/wdl/WdlExceptions.scala +++ b/src/main/scala/cognite/spark/v1/wdl/WdlExceptions.scala @@ -3,10 +3,12 @@ package cognite.spark.v1.wdl import cognite.spark.v1.CdfSparkException import org.apache.spark.sql.types.DataType +@deprecated("wdl support is deprecated", since = "0") class RequiredFieldIsNullException(val structFieldName: String, val dataType: DataType) extends CdfSparkException( s"Required field `${structFieldName}` of type `${dataType.typeName}` should not be NULL.") +@deprecated("wdl support is deprecated", since = "0") class WrongFieldTypeException( val structFieldName: String, val expectedDataType: DataType, diff --git a/src/test/scala/cognite/spark/v1/wdl/RowEquality.scala b/src/test/scala/cognite/spark/v1/wdl/RowEquality.scala deleted file mode 100644 index 60e9f5c50..000000000 --- a/src/test/scala/cognite/spark/v1/wdl/RowEquality.scala +++ /dev/null @@ -1,19 +0,0 @@ -package cognite.spark.v1.wdl - -import org.apache.spark.sql.Row -import org.scalactic.Equality - -private[wdl] object RowEquality { - implicit val rowEq: Equality[Row] = - (a: Row, b: Any) => - b match { - case p: Row => - val fieldNames = a.schema.fieldNames - if (fieldNames.toSet == p.schema.fieldNames.toSet) { - a.getValuesMap(fieldNames.toSeq) == p.getValuesMap(fieldNames.toSeq) - } else { - false - } - case _ => false - } -} diff --git a/src/test/scala/cognite/spark/v1/wdl/RowToJsonTest.scala b/src/test/scala/cognite/spark/v1/wdl/RowToJsonTest.scala deleted file mode 100644 index 95d072476..000000000 --- a/src/test/scala/cognite/spark/v1/wdl/RowToJsonTest.scala +++ /dev/null @@ -1,243 +0,0 @@ -package cognite.spark.v1.wdl - -import cognite.spark.v1.{CdfSparkException, StructTypeEncoder} -import io.circe.Printer -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types._ -import org.scalatest.{FlatSpec, Matchers, ParallelTestExecution} - -import cognite.spark.compiletime.macros.SparkSchemaHelper._ - -class RowToJsonTest extends FlatSpec with Matchers with ParallelTestExecution { - import cognite.spark.compiletime.macros.StructTypeEncoderMacro._ - - it should "convert a Row with number types into a JsonObject" in { - val schema = new StructType() - .add("double", DoubleType) - .add("float", FloatType) - .add("big decimal", DecimalType(10, 10)) - .add("int", IntegerType) - .add("long", LongType) - - val input: Row = new GenericRowWithSchema( - Array( - 10.0, - 11.0f, - BigDecimal(12.2), - 13, - 14L - ), - schema - ) - - val json = RowToJson.toJson(input, schema) - val actual = json.printWith(Printer.spaces2.withSortedKeys) - val expected = - """{ - | "big decimal" : 12.2, - | "double" : 10.0, - | "float" : 11.0, - | "int" : 13, - | "long" : 14 - |}""".stripMargin - assert(actual == expected) - } - - it should "convert a Row with nullable types into a JsonObject" in { - val schema = new StructType() - .add("double", DoubleType) - .add("float", FloatType) - .add("big decimal", DecimalType(10, 10)) - .add("int", IntegerType) - .add("long", LongType) - .add("string", StringType) - - val input: Row = new GenericRowWithSchema( - Array( - null, - null, - null, - null, - null, - null, - ), - schema - ) - - val json = RowToJson.toJson(input, schema) - val actual = json.printWith(Printer.spaces2.withSortedKeys) - val expected = - """{ - | "big decimal" : null, - | "double" : null, - | "float" : null, - | "int" : null, - | "long" : null, - | "string" : null - |}""".stripMargin - assert(actual == expected) - } - - it should "fail to convert a Row with non-nullable types into a JsonObject" in { - val schema = new StructType() - .add("double", DoubleType, nullable = false) - .add("float", FloatType, nullable = false) - .add("big decimal", DecimalType(10, 10), nullable = false) - .add("int", IntegerType, nullable = false) - .add("long", LongType, nullable = false) - .add("string", StringType, nullable = false) - - val input: Row = new GenericRowWithSchema( - Array( - null, - null, - null, - null, - null, - null, - ), - schema - ) - - val expectedException = intercept[CdfSparkException] { - RowToJson.toJsonObject(input, schema, None) - } - - assert(expectedException.getMessage.contains(" should not be NULL.")) - } - - it should "convert a Row with nested StructTypes into a JsonObject" in { - val sourceSchema = new StructType() - .add("assetExternalId", StringType) - .add("sourceName", StringType) - - val schema = new StructType() - .add("name", StringType) - .add("source", sourceSchema) - - val input: Row = new GenericRowWithSchema( - Array( - "MyName", - new GenericRowWithSchema(Array("MyAssetExternalId", "MySourceName"), sourceSchema), - ), - schema - ) - - val json = RowToJson.toJson(input, schema) - val actual = json.printWith(Printer.spaces2.withSortedKeys) - val expected = - """{ - | "name" : "MyName", - | "source" : { - | "assetExternalId" : "MyAssetExternalId", - | "sourceName" : "MySourceName" - | } - |}""".stripMargin - assert(actual == expected) - } - - it should "convert a Row with array of StructTypes into a JsonObject" in { - val sourceSchema = new StructType() - .add("assetExternalId", StringType) - .add("sourceName", StringType) - - val schema = new StructType() - .add("name", StringType) - .add("sources", ArrayType(sourceSchema)) - - val input: Row = new GenericRowWithSchema( - Array( - "MyName", - Array(new GenericRowWithSchema(Array("MyAssetExternalId", "MySourceName"), sourceSchema)), - ), - schema - ) - - val json = RowToJson.toJson(input, schema) - val actual = json.printWith(Printer.spaces2.withSortedKeys) - val expected = - """{ - | "name" : "MyName", - | "sources" : [ - | { - | "assetExternalId" : "MyAssetExternalId", - | "sourceName" : "MySourceName" - | } - | ] - |}""".stripMargin - assert(actual == expected) - } - - it should "give good error message if String is instead instead of struct" in { - case class Person(name: String, age: Double) // { name: age: 23.0 } - case class InnerInput(age: Double) - case class PersonInput(name: String, age: InnerInput) // { name: age: { age: 23.0 } } - - val targetSchema = structType[Person]() - val inputRow = new GenericRowWithSchema( - Array( - "Ola", - new GenericRowWithSchema(Array(23.0), structType[InnerInput]()) - ), - structType[PersonInput]() - ) - val error = intercept[CdfSparkException] { - RowToJson.toJson(inputRow, targetSchema) - } - error.getMessage should include("Field `age` with expected type `DoubleType` contains invalid value: `[23.0]`.") - } - - it should "give good error message when required value is None" in { - - case class Person(name: String, age: Double) - case class PersonInput(name: String, age: Option[Double]) - - val targetSchema = structType[Person]() - val inputRow = new GenericRowWithSchema(Array("Ola Nordmann", None), structType[PersonInput]()) - val error = intercept[CdfSparkException] { - RowToJson.toJson(inputRow, targetSchema) - } - - error.getMessage should include("Required field `age` of type `double` should not be NULL.") - } - - it should "give good error message when required value is not defined" in { - case class Person(name: String, age: Double) - case class PersonInput(name: String) - - val targetSchema = structType[Person]() - val inputRow = new GenericRowWithSchema(Array("Ola Nordmann"), structType[PersonInput]()) - val error = intercept[CdfSparkException] { - RowToJson.toJson(inputRow, targetSchema) - } - - error.getMessage should include("Required field `age` of type `double` should not be NULL.") - } - - it should "give good error message when nested required value is not defined" in { - val targetSchema = new StructType() - .add("name", StringType, nullable = false) - .add( - "address", - new StructType() - .add("address", StringType, nullable = false) - .add("country", StringType, nullable = false)) - - val addressSchema = new StructType() - .add("address", StringType, nullable = false) - val schema = new StructType() - .add("name", StringType, nullable = false) - .add("address", addressSchema, nullable = false) - - val inputRow = new GenericRowWithSchema( - Array("My name", new GenericRowWithSchema(Array("My address"), addressSchema)), - schema) - val error = intercept[CdfSparkException] { - RowToJson.toJson(inputRow, targetSchema) - } - - assert( - error.getMessage.contains("Required field `address.country` of type `string` should not be NULL")) - } -} diff --git a/src/test/scala/cognite/spark/v1/wdl/WdlModelsTest.scala b/src/test/scala/cognite/spark/v1/wdl/WdlModelsTest.scala deleted file mode 100644 index 3337b8f69..000000000 --- a/src/test/scala/cognite/spark/v1/wdl/WdlModelsTest.scala +++ /dev/null @@ -1,66 +0,0 @@ -package cognite.spark.v1.wdl - -import cognite.spark.v1.{CdfSparkException, SparkTest} -import org.scalatest.{FlatSpec, Matchers} - -@deprecated("wdl support is deprecated", since = "0") -class WdlModelsTest extends FlatSpec with Matchers with SparkTest { - it should "get from ingestion name" in { - val wellSource = WdlModels.fromIngestionSchemaName("NptIngestion") - wellSource.shortName should be("nptevents") - wellSource.ingest.map(_.schemaName) should be(Some("NptIngestion")) - } - - it should "get from short name" in { - val dm = WdlModels.fromShortName("depthmeasurements") - dm.shortName should be("depthmeasurements") - dm.ingest.map(_.schemaName) should be(Some("DepthMeasurementIngestion")) - } - - it should "get from short name: case insensitive" in { - val dm = WdlModels.fromShortName("depthMeasurements") - dm.shortName should be("depthmeasurements") - } - - it should "get from retrieval name" in { - val nds = WdlModels.fromRetrievalSchemaName("Nds") - nds.retrieve.schemaName should be("Nds") - nds.shortName should be("ndsevents") - } - - it should "give a good error message if the ingestion schema name doesn't exist" in { - val error = intercept[CdfSparkException] { - WdlModels.fromIngestionSchemaName("Npt") - } - error.getMessage should include("Invalid schema name: `Npt`. The valid options are") - error.getMessage should include("NptIngestion") - } - - it should "give a good error message if the short name doesn't exist" in { - val error = intercept[CdfSparkException] { - WdlModels.fromShortName("NptIngestion") - } - error.getMessage should include( - "Invalid well data layer data type: `NptIngestion`. The valid options are") - error.getMessage should include("nptevents") - } - - it should "give a good error message if the retrieval schema name doesn't exist" in { - val error = intercept[CdfSparkException] { - WdlModels.fromRetrievalSchemaName("NptIngestion") - } - error.getMessage should include("Invalid schema name: `NptIngestion`. The valid options are") - error.getMessage should include("Npt") - } - - it should "give 200 OK for all schema names" in { - import cognite.spark.v1.CdpConnector._ - for (model <- WdlModels.models) { - model.ingest match { - case Some(k) => writeClient.wdl.getSchema(k.schemaName).unsafeRunSync() - case None => - } - writeClient.wdl.getSchema(model.retrieve.schemaName).unsafeRunSync() - } - } -}