Add progress tracking todo to Kafka sink #1548

Merged · 2 commits · Jan 15, 2020
25 changes: 25 additions & 0 deletions src/dataflow/sink/kafka.rs
@@ -16,6 +16,31 @@ use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
//
// Right now, every time Materialize crashes and recovers, these sinks
// resend every record to Kafka. This is not entirely horrible, but it is
// obviously not ideal! Until we have a more concrete idea of what
// people will require from Kafka sinks, we're punting on implementing this.
//
// For posterity, here are some of the options we discussed to
// implement this:
// - Use differential's Consolidate operator on the batches we are
// iterating through. Track our progress in sending batches to Kafka
// using the "watermarks" from the consolidated batch (inclusive on the
// lower bound, exclusive on the upper bound). Store these timestamps
// persistently (in mzdata/catalog) in order to either:
// - Resend everything in and after the last successfully sent batch.
// This assumes the Kafka topic we are sending to handles duplicates.
// - First, send a negative diff of each record in the last successful
// batch. Then, resend everything after.
//
// - Append something like a "Materialize startup timestamp" to the
// end of the Kafka topic name. This accepts resending all of the data,
// but avoids duplicating data within a single topic.
// - NB: This, like other decisions we've made, assumes that
// the user has configured their Kafka instance to automatically
// create new topics.
pub fn kafka<G>(
stream: &Stream<G, (Row, Timestamp, Diff)>,
id: GlobalId,
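
For illustration only, here are rough sketches of the two options described in the TODO. Nothing below is part of this change; the names SinkProgress, should_resend, and sink_topic_name are made up for the sketch and do not exist in the codebase.

Option 1, assuming progress is persisted as the watermarks of the last consolidated batch sent:

type Timestamp = u64;

/// Persisted progress for one Kafka sink: the watermarks of the last
/// consolidated batch successfully sent (lower bound inclusive, upper
/// bound exclusive).
struct SinkProgress {
    lower: Timestamp,
    upper: Timestamp,
}

/// On recovery, resend everything in and after the last successfully sent
/// batch; this assumes the downstream topic tolerates duplicate records.
fn should_resend(progress: &SinkProgress, record_ts: Timestamp) -> bool {
    record_ts >= progress.lower
}

Option 2, assuming the Kafka broker is configured to auto-create topics, so a restarted Materialize writes to a fresh topic rather than duplicating data in the old one:

use std::time::{SystemTime, UNIX_EPOCH};

fn sink_topic_name(base_topic: &str) -> String {
    // Seconds since the Unix epoch at Materialize startup; each restart
    // yields a new suffix, and therefore a new topic.
    let startup_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock set before the Unix epoch")
        .as_secs();
    format!("{}-{}", base_topic, startup_secs)
}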