Add upsert/delete support via periodic MERGE statements #264

Open
C0urante opened this issue Apr 16, 2020 · 12 comments

@C0urante
Collaborator

Although it's not a common use case for BigQuery, it's come up that it'd be nice to support upserts and deletes with this connector. I'd like to propose the following way to allow the connector to optionally support that functionality, without negatively impacting existing users of the connector in append-only mode:

  1. Derive the key fields for the upsert and/or delete from the key of the message in Kafka, which will be required to be either a map or a struct. This will allow us to use the field names from that key and map them to columns in the destination BigQuery table.
  2. Insert all records received by the connector into an intermediate table, possibly named something like ${table}_tmp, where ${table} is the name of the table that the records are destined for.
  3. Periodically perform a MERGE operation that uses the intermediate table as its source and includes logic for upsert (if the value of the record is non-null) and/or delete (if the value of the record is null) based on the columns present in the record key.
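As a rough illustration of step 3, the MERGE statement could be assembled from the key and value field names along these lines. This is only a sketch under assumptions that are not part of the proposal: the helper name `build_merge_query`, the boolean `is_tombstone` column marking records whose value was null, and an intermediate table that holds at most one row per key (BigQuery rejects a MERGE in which several source rows match the same target row).

```python
def build_merge_query(dest, intermediate, key_fields, value_fields,
                      tombstone_column="is_tombstone"):
    """Build a BigQuery MERGE statement that upserts or deletes rows by key.

    Hypothetical helper: `tombstone_column` is an assumed boolean column in
    the intermediate table that is true when the Kafka record value was null.
    """
    if not key_fields:
        raise ValueError("at least one key field is required")

    # Match destination rows to intermediate rows on every key field.
    on_clause = " AND ".join(f"dst.`{k}` = src.`{k}`" for k in key_fields)

    all_columns = list(key_fields) + list(value_fields)
    insert_columns = ", ".join(f"`{c}`" for c in all_columns)
    insert_values = ", ".join(f"src.`{c}`" for c in all_columns)

    lines = [
        f"MERGE `{dest}` AS dst",
        f"USING `{intermediate}` AS src",
        f"ON {on_clause}",
        # Delete: the key exists and the incoming record is a tombstone.
        f"WHEN MATCHED AND src.`{tombstone_column}` THEN DELETE",
    ]
    if value_fields:
        # Update half of the upsert: the key exists, overwrite value columns.
        update_set = ", ".join(f"`{c}` = src.`{c}`" for c in value_fields)
        lines.append(f"WHEN MATCHED THEN UPDATE SET {update_set}")
    # Insert half of the upsert: the key is new and the record is not a tombstone.
    lines.append(
        f"WHEN NOT MATCHED AND NOT src.`{tombstone_column}` THEN "
        f"INSERT ({insert_columns}) VALUES ({insert_values})"
    )
    return "\n".join(lines)


query = build_merge_query("dataset.orders", "dataset.orders_tmp",
                          key_fields=["order_id"], value_fields=["status"])
```

If several records for the same key can land in the intermediate table between merges, the source would first need to be reduced to the latest row per key.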

This should only take place if the user enables upserts or deletes; otherwise, the connector will continue streaming directly to the intended table (or GCS bucket) as it does now.

There are definitely some details that need to be ironed out here, such as what will happen if the fields in the record key change over time, how this would interact with table partitioning, and how/where the MERGE would be performed, but this is the overall idea.

@mtagle, @criccomini, what are your thoughts on this?

@C0urante
Collaborator Author

@mtagle @criccomini bump :)
Would you guys be interested in this and willing to review a PR for it?

@criccomini
Contributor

🧠 (thinking)

@whynick1
Contributor

If the goal of this feature is to prevent duplicate records, it is not very helpful, because we can add a view with deduplication logic on top of the BQ table.

If the goal is to help with table compaction, then I see the value in it. (If too many duplicates exist in BQ, querying the table without compaction can be costly in both time and money.)

But I also have some concerns about MERGE. As one of the DML statements Google provides, it is subject to a quota limit of 1500 to 4000 per day.

Even if we take full advantage of that quota, the KCBQ connector (in upsert/delete mode) will no longer be real-time streaming. I also doubt we want to do so, because BQ (OLAP) is designed for append-only workloads. Although DML statements are available, running them frequently is something of a misuse.

Last but not least, as the intermediate table grows, MERGE becomes more expensive. Are we going to clear it at any point, for example after each MERGE completes? If so, would that affect data streaming?

@criccomini
Contributor

In general, I'm concerned about the benefit that this feature would provide vs. the complexity it introduces. Do you have a concrete need for this, or is it more of a hypothetical use case?

@mtagle
Contributor

mtagle commented Apr 30, 2020

I'm actually working on something like this for our own BQ data, but outside of the connector.

My concern with putting this within the connector is that it just becomes incredibly complex (and very separate from the "normal" data flow). The connector isn't actually just doing stuff per-message anymore; it's doing stuff per-message, and then also occasionally scheduling some other task that has nothing to do with incoming messages? Even if you put all this logic into the connector, you are still going to need some externally created view to squish these two tables together if you actually want a real-time view of the data that takes advantage of the sparser table.

I'm also curious about the intent/benefit of this change.

@C0urante
Collaborator Author

Thanks guys! Responses for each of you:

@whynick1 -

  • The DML quota for BigQuery has now been lifted: https://cloud.google.com/blog/products/data-analytics/dml-without-limits-now-in-bigquery.
  • Although the potential for misalignment between users' expectations and the behavior of the connector is definitely there if we do this sort of batch-style writing via MERGE, there's already precedent in the connector itself for batch writing with the GCS batching flow, and the rate at which the MERGE is executed can be made configurable to users who have use cases that require lower latency.
  • We'd definitely want to clear the intermediate table after a successful merge. I haven't worked through this too far yet but I think the way to do it might be to actually have two intermediate tables, and swap back and forth between them. That way, as soon as we start a merge that uses one of those as a source, we can just start streaming into the second table without halting consumption of data from Kafka. The tradeoffs would be that now there'd be two intermediate tables to manage instead of one, which might present an issue to users w/r/t operational complexity if they're trying to lock down access to tables with sensitive data in them, and would be a little harder to implement (what would you do, for example, if there was an incomplete merge in progress on both tables?).
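The two-table swap described in the last bullet could be tracked with something like the following. It is a minimal sketch, not a design: the class name, the `_tmp_0`/`_tmp_1` suffixes, and the choice to refuse a second merge while one is still pending are all assumptions made for illustration.

```python
class IntermediateTableSwapper:
    """Tracks which of two intermediate tables receives new rows.

    Hypothetical helper: table names and method names are illustrative only.
    """

    def __init__(self, table):
        self._tables = [f"{table}_tmp_0", f"{table}_tmp_1"]
        self._active = 0        # index of the table currently streamed into
        self._merging = None    # index of the table being merged, if any

    @property
    def active_table(self):
        return self._tables[self._active]

    def begin_merge(self):
        """Swap tables and return the one that should be used as MERGE source."""
        if self._merging is not None:
            # Both tables would be in use; the caller has to wait.
            raise RuntimeError("previous merge still in progress")
        self._merging = self._active
        self._active = 1 - self._active
        return self._tables[self._merging]

    def finish_merge(self):
        """Mark the merge as done and return the table that can now be cleared."""
        if self._merging is None:
            raise RuntimeError("no merge in progress")
        merged = self._tables[self._merging]
        self._merging = None
        return merged


swapper = IntermediateTableSwapper("orders")
source = swapper.begin_merge()      # merge reads from this table
target = swapper.active_table       # new rows stream into the other table
```

Raising when a merge is already pending is one possible answer to the "incomplete merge on both tables" question; blocking consumption until the merge completes would be another.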

@criccomini -

  • Yeah, we've got a couple customers asking for this. We've advised them on BigQuery idiom as far as deduplication logic via query goes, but they'd still like this option as there are a lot of tables that they'd have to manage and a lot of downstream readers of those tables, so being able to perform that logic before the data even hits BQ (or rather, hits the destination table) is valuable to them.
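For reference, the query-side deduplication mentioned above usually takes the shape of a "latest row per key" query that each table would need a view for. The sketch below builds one; the helper name and the ordering column (here a hypothetical `kafka_offset` column) are assumptions, since nothing in this thread specifies how rows would be ordered.

```python
def build_dedup_view_query(table, key_fields, order_column):
    """Build a query that keeps only the newest row per key.

    Hypothetical helper: `order_column` is an assumed column that increases
    for newer versions of the same key.
    """
    partition = ", ".join(f"`{k}`" for k in key_fields)
    return (
        "SELECT * EXCEPT (row_num)\n"
        "FROM (\n"
        f"  SELECT *, ROW_NUMBER() OVER (PARTITION BY {partition} "
        f"ORDER BY `{order_column}` DESC) AS row_num\n"
        f"  FROM `{table}`\n"
        ")\n"
        "WHERE row_num = 1"
    )


view_sql = build_dedup_view_query("dataset.orders", ["order_id"], "kafka_offset")
```

A view like this has to be created and maintained per table, and it does not handle deletes on its own, which is part of why doing the work before rows reach the destination table is attractive.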

@mtagle -

  • Isn't the GCS batch loading similar? IIUC, data is written to GCS, then a single task periodically triggers a load job that pulls it in to BigQuery.
  • There'd definitely be an impact on latency for availability of data in BigQuery; I think the best we could do OOTB in this case would be to make the frequency of the merge configurable and let users know that they won't see data immediately if upsert/delete is enabled.

@mtagle
Contributor

mtagle commented May 14, 2020

I agree the GCS batch loading is similar, but the GCS batch loading process from GCS -> BQ doesn't have a schedule. It just runs asynchronously from the Kafka to GCS load job. As a result, I have a better understanding of what happens if the connector fails in the GCS loading case than I do with this. Is the schedule some sort of cron-like schedule? What if the connector is down during that time? If instead it's on some sort of internal timer, what happens if the connector is taken down before that internal timer has elapsed? In general, how often do we expect these merges to happen?

I'm not opposed to this change, but I feel like there are a lot of clarifications that need to be made with regards to implementation.

@C0urante
Collaborator Author

C0urante commented May 18, 2020

Thanks, Moira. I agree that there are a ton of details that need to be worked out for how to implement this and would be happy to draw up and collaborate on a design doc on how we might go about it.

With this issue I just wanted to gauge interest here and see if you guys would be interested in this kind of feature at all. Is it safe to say that, if it can be implemented reasonably, you'd be amenable to seeing this in the connector?

With regards to your specific questions about timing: I was thinking of periodically triggering merges from the connector itself on a configurable interval; users could adjust the interval based on what kind of throughput vs. latency they'd like to see from the connector. A lower interval would provide lower latency but, if too low, may cause issues where merges start tripping over each other. A high interval might provide higher throughput but would delay the write of new rows into the destination table. Depending on how many intermediate tables we would want to use (1, 2, or n), we could define different expectations for synchrony. If there's only one intermediate table, the connector would have to cease consuming from Kafka until a pending merge has completed; if there's more than one table, the connector could keep consuming and switch from the to-be-merged intermediate table to a fresh one, letting the merge take place in the background.
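To make the interval idea concrete, here is a minimal sketch of the bookkeeping for an internal timer that starts a merge only when the configured interval has elapsed and no earlier merge is still running, which is one way to keep merges from tripping over each other. The class and method names are hypothetical, and time is passed in explicitly so the logic stays independent of any particular clock or thread model.

```python
class MergeScheduler:
    """Decides when a periodic merge may start.

    Hypothetical helper: `interval_seconds` stands in for a user-facing
    connector setting that does not exist yet.
    """

    def __init__(self, interval_seconds, start_time=0.0):
        if interval_seconds <= 0:
            raise ValueError("interval_seconds must be positive")
        self.interval = interval_seconds
        self.last_started = start_time
        self.in_progress = False

    def try_start(self, now):
        """Return True and mark a merge as running if one may start at `now`."""
        if self.in_progress:
            return False  # never overlap with a pending merge
        if now - self.last_started < self.interval:
            return False  # interval has not elapsed yet
        self.in_progress = True
        self.last_started = now
        return True

    def finish(self):
        """Record that the running merge has completed."""
        self.in_progress = False


scheduler = MergeScheduler(interval_seconds=60)
started = scheduler.try_start(now=60.0)   # interval elapsed, merge may start
```

With a single intermediate table, the connector would pause consumption while `in_progress` is true; with more than one, it could keep streaming into a fresh table.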

As far as failure goes, since upsert and delete are idempotent, I think we can just assume that all writes have failed until we've managed to finish a merge and commit the offsets for those records to Kafka. If the connector dies anywhere in the middle of that, on restart we can just pick up from where we last committed offsets and try again; reprocessing already-written data shouldn't affect the contents of the destination table.
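The idempotence argument can be checked with a small in-memory model, where a dict stands in for the destination table and a null value means delete. This only illustrates the reasoning; it assumes the whole uncommitted batch is replayed in its original order after a restart, and `apply_batch` is a hypothetical name.

```python
def apply_batch(table, records):
    """Apply (key, value) records to a dict standing in for the destination table.

    A value of None is treated as a delete; anything else is an upsert.
    """
    result = dict(table)
    for key, value in records:
        if value is None:
            result.pop(key, None)   # deleting a missing key is a no-op
        else:
            result[key] = value     # insert or overwrite
    return result


batch = [("a", {"n": 1}), ("b", {"n": 2}), ("a", None), ("b", {"n": 3})]

# First attempt: the merge succeeds but the connector dies before committing offsets.
once = apply_batch({}, batch)
# After restart the same batch is reprocessed from the last committed offset.
twice = apply_batch(once, batch)
```

Both `once` and `twice` end up with the same contents, which is why offsets only need to be committed after a merge has finished.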

@criccomini
Contributor

> Is it safe to say that, if it can be implemented reasonably, you'd be amenable to seeing this in the connector?

Yep. I suggest a more detailed design document as a next step. That way, we can all get on the same page about implementation details.

@C0urante
Collaborator Author

Sounds good! I'll see if I can put a Google doc together sometime in the next few days.

@C0urante
Collaborator Author

Alright, design doc is up and anyone can comment on it here: https://docs.google.com/document/d/1p8_rLQqR9GIALIruB3-MjqR8EgYdaEw2rlFF1fxRJf0/edit

C0urante referenced this issue in confluentinc/kafka-connect-bigquery Jun 29, 2020
* GH-264: Add embedded integration test for upsert/delete

Also fixes a bug in the schema retriever logic where key schemas were not being
reported to schema retrievers, and improves shutdown logic so that tasks can
stop gracefully when requested by the framework.

* GH-264: Clean up shutdown logic, make logs easier to read

* GC-264: Retain prior shutdown behavior when upsert/delete is not enabled

* GC-264: Refactor merge query construction logic

* GC-264: Fix infinite recursion bug in SchemaRetriever interface
C0urante referenced this issue in confluentinc/kafka-connect-bigquery Sep 10, 2020

@brankoiliccc

@C0urante Thank you for your effort in making this happen. Do you have any info on when this will be merged to master? I can see it has already been merged to 2.x. Or should we just use 2.x?
