Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamp Partition For Date Outside Range #269

Open
archy-bold opened this issue May 14, 2020 · 0 comments
Open

Timestamp Partition For Date Outside Range #269

archy-bold opened this issue May 14, 2020 · 0 comments

Comments

@archy-bold
Copy link

I've come across an issue with the timestamp partition where if the timestamp given is more than 1 year before today or 6 months in the future, it gets rejected and causes the connector to get stuck trying to send the batches over and over.

Here's my configuration:

{
  "name": "bigquery-streams",
  "config": {
    "name": "bigquery-streams",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",

    "topics": "STREAMS_IDS",
    "sanitizeTopics": "true",

    "autoUpdateSchemas": false,
    "bigQueryPartitionDecorator": false,
    "timestampPartitionFieldName": "TIMESTAMP",

    "project": "xx",
    "datasets": ".*=xx",
    "keyfile": "/keyfile.json",
    "keySource": "FILE",

    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "schemaRegistryLocation": "http://schema-registry:8081",

    "transforms": "ConvertTimestamp",
    "transforms.RemoveFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RemoveFields.blacklist": "ID",
    "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimestamp.timestamp.field": "TIMESTAMP",
    "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimestamp.field": "TIMESTAMP",
    "transforms.ConvertTimestamp.target.type": "Timestamp"
  }
}

Here's a sample of some of the errors:

connect | [2020-05-14 15:33:47,697] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: table insertion failed for the following rows:
connect | [row index 0]: invalid: Value 1624291503000000 for field timestamp of the destination table spatial-encoder-259717:wata_streams_test.STREAMS_IDS is outside the allowed bounds. You can only stream to date range within 365 days in the past and 183 days in the future relative to the current date.
connect | [row index 1]: stopped:
connect | [row index 2]: stopped:
connect | [row index 3]: stopped:
connect | [row index 4]: stopped:
connect | [row index 5]: stopped:
connect | [row index 6]: stopped:
connect | [row index 7]: stopped:
connect | [row index 8]: stopped:
connect | [row index 9]: stopped:
connect | [row index 10]: stopped:
connect | [row index 11]: stopped:
connect | [row index 12]: stopped:
connect | [row index 13]: stopped:
connect | [row index 14]: stopped:
connect | [row index 15]: stopped:
connect | [row index 16]: stopped:
connect | [row index 17]: stopped:
connect | [row index 18]: stopped:
connect | [row index 499]: stopped: (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor)
connect | Exception in thread "pool-37-thread-72" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
connect | [row index 0]: invalid: Value 1624291503000000 for field timestamp of the destination table spatial-encoder-259717:wata_streams_test.STREAMS_IDS is outside the allowed bounds. You can only stream to date range within 365 days in the past and 183 days in the future relative to the current date.
connect | [row index 1]: stopped:
connect | [row index 2]: stopped:
connect | [row index 3]: stopped:
connect | [row index 4]: stopped:
connect | [row index 5]: stopped:
connect | [row index 6]: stopped:
connect | [row index 7]: stopped:
connect | [row index 8]: stopped:
connect | [row index 9]: stopped:
connect | [row index 10]: stopped:
connect | [row index 11]: stopped:
connect | [row index 12]: stopped:
connect | [row index 499]: stopped:
connect | at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
connect | at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
connect | at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | [2020-05-14 15:33:46,031] ERROR WorkerSinkTask{id=bigquery-streams-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException,com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
connect | at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
connect | at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:127)
connect | at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 311 for partition STREAMS_IDS-2 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 221 for partition STREAMS_IDS-3 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 266 for partition STREAMS_IDS-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 316 for partition STREAMS_IDS-1 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] ERROR WorkerSinkTask{id=bigquery-streams-0} Commit of offsets threw an unexpected exception for sequence number 8: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQu$ryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException,com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
connect | at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
connect | at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:127)
connect | at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant