Skip to content

Commit

Permalink
Use TableRow load API for typed BigQuery tables
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Feb 6, 2025
1 parent 57dbbad commit 4d5a5a6
Showing 1 changed file with 6 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,31 +741,12 @@ object BigQueryTyped {
override type ReadP = Unit
override type WriteP = Table.WriteParam[T]

private val underlying = {
val readFn = Functions.serializableFn[SchemaAndRecord, T] { x =>
BigQueryType[T].fromAvro(x.getRecord)
}
val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x =>
BigQueryType[T].toAvro(x.getElement)
}
val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ =>
BigQueryType[T].avroSchema
}
val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r)

BigQueryTypedTable[T](
beam.BigQueryIO
.read(readFn)
.useAvroLogicalTypes(),
beam.BigQueryIO
.write[T]()
.withAvroFormatFunction(writeFn)
.withAvroSchemaFactory(schemaFactory)
.useAvroLogicalTypes(),
table,
parseFn
)
}
private val underlying = BigQueryTypedTable[T](
(i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord),
BigQueryType[T].toTableRow,
BigQueryType[T].fromTableRow,
table
)

override def testId: String = s"BigQueryIO(${table.spec})"

Expand Down

0 comments on commit 4d5a5a6

Please sign in to comment.