BigQueryIO tests #5599

Merged 3 commits on Feb 6, 2025
Changes from all commits
@@ -26,7 +26,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.Pipeline.PipelineVisitor
import org.apache.beam.sdk.io.gcp.{bigquery => beam}
import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, Method}
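// Method is now imported alongside CreateDisposition so the new tests can
// request an explicit write method, e.g. Method.STREAMING_INSERTS below.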
import org.apache.beam.sdk.runners.TransformHierarchy
import org.apache.beam.sdk.transforms.display.DisplayData
import org.apache.beam.sdk.values.PValue
@@ -153,6 +153,49 @@ final class BigQueryIOTest extends ScioIOSpec {
    unconsumedReads(context) shouldBe empty
  }

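  // Bounded input: both the typed write (BQRecord is presumably a
  // BigQueryType-annotated case class from the test fixtures) and the plain
  // TableRow write should pass validation without an explicit write method.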
it should "work with streaming inserts for typed and tablerow elements in batch" in {
val sc = ScioContext()
val spec = Table.Spec("project:dataset.dummy")
sc.empty[BQRecord]().saveAsTypedBigQueryTable(spec)
sc.empty[TableRow]()
.saveAsBigQueryTable(spec, createDisposition = CreateDisposition.CREATE_NEVER)
}

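  // Unbounded input: advanceWatermarkToInfinity() closes each test stream, and
  // the same writes should still validate, since the sink's default method
  // falls back to streaming inserts for unbounded collections.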
it should "work with streaming inserts for typed and tablerow elements in streaming" in {
val sc = ScioContext()
val spec = Table.Spec("project:dataset.dummy")
val typedStream = testStreamOf[BQRecord].advanceWatermarkToInfinity()
val trStream = testStreamOf[TableRow].advanceWatermarkToInfinity()
sc.testStream(typedStream).saveAsTypedBigQueryTable(spec)
sc.testStream(trStream)
.saveAsBigQueryTable(spec, createDisposition = CreateDisposition.CREATE_NEVER)
}

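  // Explicitly requesting STREAMING_INSERTS for GenericRecord elements should
  // throw, presumably because Avro-formatted writes are only supported through
  // file loads.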
it should "fail with streaming inserts for generic records in batch" in {
an[IllegalArgumentException] shouldBe thrownBy {
implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder
ScioContext()
.empty[GenericRecord]()
.saveAsBigQueryTable(
Table.Spec("project:dataset.dummy"),
createDisposition = CreateDisposition.CREATE_NEVER,
method = Method.STREAMING_INSERTS
)
}
}

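  // The same write should also fail for unbounded input without naming the
  // method: streaming input defaults to streaming inserts, which again cannot
  // handle raw GenericRecord elements.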
it should "fail with streaming inserts for generic records in streaming" in {
an[IllegalArgumentException] shouldBe thrownBy {
implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder
ScioContext()
.testStream(testStreamOf[GenericRecord].advanceWatermarkToInfinity())
.saveAsBigQueryTable(
Table.Spec("project:dataset.dummy"),
createDisposition = CreateDisposition.CREATE_NEVER
)
}
}

it should "read the same input table with different predicate and projections using bigQueryStorage" in {

JobTest[JobWithDuplicateInput.type]