Skip to content

Commit

Permalink
Revert "Handle TableRow json specifically (#5589)"
Browse files Browse the repository at this point in the history
This reverts commit 9aeba0d.
  • Loading branch information
clairemcginty committed Feb 7, 2025
1 parent 158dce1 commit 592fbac
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,57 @@ final private[client] class TableOps(client: Client) {
def exists(tableSpec: String): Boolean =
exists(bq.BigQueryHelpers.parseTableSpec(tableSpec))

/**
* This is annoying but the GCP BQ client v2 does not accept BQ json rows in the same format as BQ
* load. JSON column are expected as string instead of parsed json
*/
private def normalizeRows(schema: TableSchema)(tableRow: TableRow): TableRow =
normalizeRows(schema.getFields.asScala.toList)(tableRow)

private def normalizeRows(fields: List[TableFieldSchema])(tableRow: TableRow): TableRow = {
import com.spotify.scio.bigquery._

fields.foldLeft(tableRow) { (row, f) =>
f.getType match {
case "JSON" =>
val name = f.getName
f.getMode match {
case "REQUIRED" =>
row.set(name, row.getJson(name).wkt)
case "NULLABLE" =>
row.getJsonOpt(name).fold(row) { json =>
row.set(name, json.wkt)
}
case "REPEATED" =>
row.set(name, row.getJsonList(name).map(_.wkt).asJava)
}
case "RECORD" | "STRUCT" =>
val name = f.getName
val netedFields = f.getFields.asScala.toList
f.getMode match {
case "REQUIRED" =>
row.set(name, normalizeRows(netedFields)(row.getRecord(name)))
case "NULLABLE" =>
row.getRecordOpt(name).fold(row) { nestedRow =>
row.set(name, normalizeRows(netedFields)(nestedRow))
}
case "REPEATED" =>
row.set(
name,
row
.getRecordList(name)
.map { nestedRow =>
normalizeRows(netedFields)(nestedRow)
}
.asJava
)
}
case _ =>
row
}
}
}

/** Write rows to a table. */
def writeRows(
tableReference: TableReference,
Expand Down Expand Up @@ -262,7 +313,7 @@ final private[client] class TableOps(client: Client) {
case WriteDisposition.WRITE_APPEND =>
}

service.insertAll(tableReference, rows.asJava)
service.insertAll(tableReference, rows.map(normalizeRows(schema)).asJava)
}

/** Write rows to a table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ object TableRowOps {
}

def json(value: AnyRef): Json = value match {
case x: Json => x
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
case x: Json => x
case x: TableRow => Json(x)
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
}

def bignumeric(value: AnyRef): BigNumeric = value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ final class ConverterProviderSpec
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)

implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)
Expand Down

0 comments on commit 592fbac

Please sign in to comment.