
How to stop infinite client side errors #260

Open
yohei1126 opened this issue Mar 30, 2020 · 5 comments

Comments

@yohei1126
Contributor

Hi, I am using kafka-connect-bigquery to sync data from PostgreSQL to BigQuery. I found that the errors below show up repeatedly in my Kafka Connect logs. The fields mentioned are required in both the PostgreSQL and BigQuery tables. I did not change the PostgreSQL schema, so I suspect some messages were accidentally written to the wrong topic, which caused these errors.

I am not able to stop these errors by setting bigQueryRetry: 5, since bigQueryRetry only applies to backend errors, while the errors below are client-side errors.
Could you advise how we can stop such an infinite loop of client-side errors?

table insertion failed for the following rows:
	[row index 0]: invalid: Missing required fields: Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.expired_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.issued_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.redeemed_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.refunded_count.
	[row index 1]: invalid: Missing required fields: Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.expired_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.issued_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.redeemed_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.refunded_count.
	[row index 2]: invalid: Missing required fields: Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.expired_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.issued_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUERY_TABLE_after.redeemed_count, Msg_0_CLOUD_QUERY_TABLE.Msg_2_CLOUD_QUE...
@muscovitebob

muscovitebob commented Apr 8, 2020

I also get these errors when messages come in with an incompatible schema that I need to fix manually. I use the bigQueryRetry setting as well, and it does not stop these schema updates from retrying. If I do not actively keep track of when this happens, I accumulate large KafkaNetworkRead charges on my Confluent managed cluster. So I am also very interested in setting a limit on schema-update retries; is this possible?

In my case I get the following error, but I assume it is really the same issue as @yohei1126's:

com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to update table schema for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=DATASETNAME, tableId=TABLENAME}}
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:135)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:93)
	at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
	at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table TABLENAME. Field FIELDNAME is missing in new schema
	at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:103)
	at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.patch(HttpBigQueryRpc.java:236)
	at com.google.cloud.bigquery.BigQueryImpl$12.call(BigQueryImpl.java:499)
	at com.google.cloud.bigquery.BigQueryImpl$12.call(BigQueryImpl.java:496)
	at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
	at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
	at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
	at com.google.cloud.bigquery.BigQueryImpl.update(BigQueryImpl.java:495)
	at com.wepay.kafka.connect.bigquery.SchemaManager.updateSchema(SchemaManager.java:81)
	at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptSchemaUpdate(AdaptiveBigQueryWriter.java:132)
	... 6 more
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request

This will literally never get resolved without manual intervention, so infinite retry by default makes no sense here.

@yohei1126
Contributor Author

This option only works for backend errors and quota-exceeded errors.
https://docs.confluent.io/current/connect/kafka-connect-bigquery/kafka_connect_bigquery_config.html

bigQueryRetry
The number of retry attempts made for a BigQuery request that fails with
a backend error or a quota exceeded error.
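To make the scope of that setting concrete, here is a minimal connector config sketch (property names taken from the linked docs; values are illustrative):

```properties
# Retries apply only to backend or quota-exceeded errors, per the docs above.
# Client-side 400s such as "Missing required fields" are NOT retried by these
# settings and will loop regardless of the values chosen here.
bigQueryRetry=5
bigQueryRetryWait=1000
```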

@Cyril-Engels

Hi! We also get these errors when we have an incompatible schema, with autoUpdateSchemas set to false. The connector keeps reading the same data and trying to insert it into BigQuery.

We found that the connector writes to BigQuery using a separate thread pool, which means that errors encountered during those writes don’t immediately cause the task to fail. Those errors aren’t swallowed silently, however, and do cause the task to throw an exception from its flush method when invoking [KCBQThreadPoolExecutor::awaitCurrentTasks](https://github.com/wepay/kafka-connect-bigquery/blob/4fb38a8f4b3b6db065976639c57106db9896f0db/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java#L76-L100).

Unfortunately, the framework does not fail tasks when they throw exceptions from their flush method (which is indirectly called as a result of invoking SinkTask::preCommit), and instead, logs the error and resets the consumer to the last successful offset.

[ERROR] 2020-04-06 16:55:59,279 [pool-24-thread-3247501] com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor afterExecute - Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: table insertion failed for the following rows:
	[row index 0]: invalid: no such field.
	[row index 1]: invalid: no such field.
	[row index 2]: invalid: no such field.

We are thinking about waiting for task completion in BigQuerySinkTask::put and throwing an exception there to fail the task if any non-retriable error happens while writing to BigQuery. We would appreciate any input. Thanks!

@mtagle @criccomini
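The pattern proposed above can be sketched generically. This is a self-contained, illustrative model only (class and method names are mine, not the connector's API): background writer threads record failures, and the next put()-style call re-throws the first one so the framework can fail the task instead of silently resetting offsets in flush().

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of "fail the task from put()": writer threads park
// failures in a queue; maybeThrow() (the analogue of SinkTask::put) re-throws
// the first recorded failure on the framework thread.
public class FailFastWriter {
    private final Queue<RuntimeException> failures = new ConcurrentLinkedQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    public void submitWrite(Runnable write) {
        pool.submit(() -> {
            try {
                write.run();
            } catch (RuntimeException e) {
                failures.add(e); // remembered, surfaced on the next maybeThrow()
            }
        });
    }

    // Analogue of SinkTask::put: throwing here causes the framework to fail
    // the task rather than resetting the consumer to the last good offset.
    public void maybeThrow() {
        RuntimeException e = failures.poll();
        if (e != null) {
            throw new RuntimeException("non-retriable BigQuery write error", e);
        }
    }

    public void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The key design point is that the error crosses from the pool thread back to the thread the framework calls into, because only exceptions thrown there fail the task.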

@muscovitebob

muscovitebob commented Jun 7, 2020

Hey @Cyril-Engels, would catching the BigQueryConnectException in BigQuerySinkTask::flush and throwing a ConnectException, much like we already do with InterruptedException, not work for failing the task when we encounter these?
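What that suggestion would look like, sketched with locally stubbed exception classes so the snippet is self-contained (in the real connector these are org.apache.kafka.connect.errors.ConnectException and com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; whether a flush exception actually fails the task is exactly the open question raised above):

```java
// Stand-ins for the real exception types, stubbed here for illustration.
class ConnectException extends RuntimeException {
    ConnectException(String msg, Throwable cause) { super(msg, cause); }
}

class BigQueryConnectException extends RuntimeException {
    BigQueryConnectException(String msg) { super(msg); }
}

class FlushExample {
    // Stand-in for KCBQThreadPoolExecutor::awaitCurrentTasks.
    interface Awaiter {
        void awaitCurrentTasks() throws InterruptedException;
    }

    static void flush(Awaiter executor) {
        try {
            executor.awaitCurrentTasks();
        } catch (InterruptedException e) {
            // Existing behavior: interruption becomes a ConnectException.
            throw new ConnectException("interrupted while waiting for writes", e);
        } catch (BigQueryConnectException e) {
            // Suggested addition: wrap the non-retriable write error the same way.
            throw new ConnectException("non-retriable BigQuery write error", e);
        }
    }
}
```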

@muscovitebob

muscovitebob commented Jun 8, 2020

I see @Cyril-Engels already has a PR that looks like a massive step toward making this connector safe to run against topics whose schemas change frequently. Thanks for that. My team would be extremely happy to see it merged into master. Please let me know if I can help with the review process in any way.
