
Failed to update table schema #289

Open
thienmcff opened this issue Jul 3, 2020 · 6 comments

I have a Postgres source connector and a BigQuery sink connector, plus Schema Registry. I added a new field to a table in Postgres and inserted a row into the table. The Postgres connector detects the change, but the BigQuery connector fails to update the schema (autoUpdateSchemas is true):

[2020-07-03 02:35:58,704] ERROR WorkerSinkTask{id=bq-core-field-connector-1} Commit of offsets threw an unexpected exception for sequence number 96048: null (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:119)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-07-03 02:35:58,713] INFO Putting 19 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:170)
[2020-07-03 02:35:58,874] INFO Putting 29 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:170)
[2020-07-03 02:35:59,229] INFO Attempting to update table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=BQ_CORE_FIELD_ALL_SCHEMAS, tableId=core_yga_ed_core_data_device}} with schema Schema{fields=[Field{name=before, type=RECORD, mode=NULLABLE, description=null}, Field{name=after, type=RECORD, mode=NULLABLE, description=null}, Field{name=source, type=RECORD, mode=REQUIRED, description=null}, Field{name=op, type=STRING, mode=REQUIRED, description=null}, Field{name=ts_ms, type=INTEGER, mode=NULLABLE, description=null}, Field{name=transaction, type=RECORD, mode=NULLABLE, description=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager:60)
[2020-07-03 02:35:59,915] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to update table schema for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=BQ_CORE_FIELD_ALL_SCHEMAS, tableId=core_yga_ed_core_data_device}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:68)
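
For reference, the sink connector is configured roughly along these lines (a minimal sketch reconstructed from the logs above; the project, keyfile path, Schema Registry URL, and exact topic name are placeholders, while the connector name and dataset are taken from the logs):

{
  "name": "bq-core-field-connector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "core_yga_ed_core_data_device",
    "project": "my-gcp-project",
    "datasets": ".*=BQ_CORE_FIELD_ALL_SCHEMAS",
    "keyfile": "/path/to/keyfile.json",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "schemaRegistryLocation": "http://schema-registry:8081"
  }
}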

C0urante (Collaborator) commented Jul 3, 2020

@mtthien-tma what's the name of the column you added to your Postgres table?

@thienmcff (Author)

@C0urante I added a 'priority' column to the table.

C0urante (Collaborator) commented Jul 3, 2020

@mtthien-tma that doesn't appear to be the name of a column in the BigQuery schema in the log you provided. Are you sure that's the only column you added? The ones listed are before, after, source, op, ts_ms, and transaction.

@thienmcff (Author)

I am sure that is the field. It should be nested inside the before and after RECORDs. It seems the log does not show the nested fields of RECORD-type columns.

C0urante (Collaborator) commented Jul 6, 2020

Ah, gotcha. You mentioned you're using Schema Registry; can you provide the Avro schema for the data that's breaking on the connector?

I should let you know: right now I suspect that a non-backwards-compatible change took place on the schema of the data (probably the addition of a non-optional field), and that's what is causing the failures in the connector.
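
For illustration (a hypothetical sketch, not your actual schema): an optional Avro field is declared as a union with null plus a null default, e.g.

{"name": "priority", "type": ["null", "int"], "default": null}

whereas a bare {"name": "priority", "type": "int"} is required. A required field maps to a REQUIRED BigQuery column, and BigQuery only allows adding NULLABLE (or REPEATED) columns to an existing table, so the connector cannot apply that change automatically.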

@thienmcff (Author)

Hi @C0urante, sorry for the late reply. You are right that adding a non-optional field seems to be the root cause. I tried adding a nullable field in Postgres and saw that the BQ connector created the corresponding column in BQ. It failed when I added a required field in Postgres.

I am using BQ connector 1.3. Is there any workaround? Do I have to update the BQ schema manually? The DDL I tested looked roughly like the sketch below.
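
For reference (table name simplified and column type assumed):

-- This worked: a nullable column becomes an optional field
-- and a NULLABLE BigQuery column
ALTER TABLE device ADD COLUMN priority integer;

-- This broke the sink: the NOT NULL column surfaced as a required field,
-- which cannot be added to the existing BigQuery table
ALTER TABLE device ADD COLUMN priority integer NOT NULL DEFAULT 0;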
