diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala index ec7799d505..23230c1561 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala @@ -228,6 +228,57 @@ final private[client] class TableOps(client: Client) { def exists(tableSpec: String): Boolean = exists(bq.BigQueryHelpers.parseTableSpec(tableSpec)) + /** + * This is annoying but the GCP BQ client v2 does not accept BQ json rows in the same format as BQ + * load. JSON columns are expected as strings instead of parsed JSON + */ + private def normalizeRows(schema: TableSchema)(tableRow: TableRow): TableRow = + normalizeRows(schema.getFields.asScala.toList)(tableRow) + + private def normalizeRows(fields: List[TableFieldSchema])(tableRow: TableRow): TableRow = { + import com.spotify.scio.bigquery._ + + fields.foldLeft(tableRow) { (row, f) => + f.getType match { + case "JSON" => + val name = f.getName + f.getMode match { + case "REQUIRED" => + row.set(name, row.getJson(name).wkt) + case "NULLABLE" => + row.getJsonOpt(name).fold(row) { json => + row.set(name, json.wkt) + } + case "REPEATED" => + row.set(name, row.getJsonList(name).map(_.wkt).asJava) + } + case "RECORD" | "STRUCT" => + val name = f.getName + val nestedFields = f.getFields.asScala.toList + f.getMode match { + case "REQUIRED" => + row.set(name, normalizeRows(nestedFields)(row.getRecord(name))) + case "NULLABLE" => + row.getRecordOpt(name).fold(row) { nestedRow => + row.set(name, normalizeRows(nestedFields)(nestedRow)) + } + case "REPEATED" => + row.set( + name, + row + .getRecordList(name) + .map { nestedRow => + normalizeRows(nestedFields)(nestedRow) + } + .asJava + ) + } + case _ => + row + } + } + } + /** Write rows to a table. 
*/ def writeRows( tableReference: TableReference, @@ -262,7 +313,7 @@ final private[client] class TableOps(client: Client) { case WriteDisposition.WRITE_APPEND => } - service.insertAll(tableReference, rows.asJava) + service.insertAll(tableReference, rows.map(normalizeRows(schema)).asJava) } /** Write rows to a table. */ diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala index bd56f5dae8..69283fdf0b 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala @@ -121,9 +121,10 @@ object TableRowOps { } def json(value: AnyRef): Json = value match { - case x: Json => x - case x: String => Json(x) - case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value) + case x: Json => x + case x: TableRow => Json(x) + case x: String => Json(x) + case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value) } def bignumeric(value: AnyRef): BigNumeric = value match { diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 580fb6f092..db927444fa 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -59,6 +59,7 @@ final class ConverterProviderSpec value <- Gen.alphaStr } yield Json(s"""{"$key":"$value"}""") ) + implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList) implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _) implicit val 
eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)