diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 5f391d2126..976c6770e3 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -741,31 +741,12 @@ object BigQueryTyped { override type ReadP = Unit override type WriteP = Table.WriteParam[T] - private val underlying = { - val readFn = Functions.serializableFn[SchemaAndRecord, T] { x => - BigQueryType[T].fromAvro(x.getRecord) - } - val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x => - BigQueryType[T].toAvro(x.getElement) - } - val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ => - BigQueryType[T].avroSchema - } - val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r) - - BigQueryTypedTable[T]( - beam.BigQueryIO - .read(readFn) - .useAvroLogicalTypes(), - beam.BigQueryIO - .write[T]() - .withAvroFormatFunction(writeFn) - .withAvroSchemaFactory(schemaFactory) - .useAvroLogicalTypes(), - table, - parseFn - ) - } + private val underlying = BigQueryTypedTable[T]( + (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord), + BigQueryType[T].toTableRow, + BigQueryType[T].fromTableRow, + table + ) override def testId: String = s"BigQueryIO(${table.spec})"