diff --git a/src/dataflow/sink/kafka.rs b/src/dataflow/sink/kafka.rs
index 05e3ec713be97..8f6283dd21e68 100644
--- a/src/dataflow/sink/kafka.rs
+++ b/src/dataflow/sink/kafka.rs
@@ -16,6 +16,31 @@ use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureProducer;
 use rdkafka::producer::FutureRecord;
 
+// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
+//
+// Right now, every time Materialize crashes and recovers, these sinks
+// will resend each record to Kafka. This is not entirely horrible, but also
+// obviously not ideal! But until we have a more concrete idea of what
+// people will require from Kafka sinks, we're punting on implementing this.
+//
+// For posterity, here are some of the options we discussed to
+// implement this:
+//  - Use differential's Consolidate operator on the batches we are
+//    iterating through. Track our progress in sending batches to Kafka
+//    using the "watermarks" from the consolidated batch (inclusive on the
+//    lower bound, exclusive on the upper bound). Store these timestamps
+//    persistently (in mzdata/catalog) in order to either:
+//      - Resend everything including and after the last successfully sent batch.
+//        This assumes the Kafka topic we are sending to handles duplicates.
+//      - First, send a negative diff of each record in the last successful
+//        batch. Then, resend everything after.
+//
+//  - Append something like a "Materialize start up timestamp" to the
+//    end of the Kafka topic name. This accepts resending all of the data,
+//    but will not duplicate data in a single topic.
+//      - NB: This, like other decisions we've made, assumes that
+//        the user has configured their Kafka instance to automatically
+//        create new topics.
 pub fn kafka(
     stream: &Stream,
     id: GlobalId,
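
Not part of the patch above: a minimal, hypothetical sketch of how the first option in the TODO (track the consolidated batch's watermarks, persist the exclusive upper bound, and skip already-delivered records after a restart) might look. The SinkProgress type, its methods, and the plain u64 timestamps are illustrative names only; a real implementation would persist the frontier in mzdata/catalog and integrate with the sink operator rather than a main() function.

// Sketch only: tracks the exclusive upper bound of the last batch fully
// delivered to Kafka. In a real implementation this value would be read from
// and written back to durable storage (mzdata/catalog); here it lives in memory.
struct SinkProgress {
    sent_upper: u64,
}

impl SinkProgress {
    // On startup, reload whatever upper bound was last persisted, if any.
    fn new(persisted_upper: Option<u64>) -> Self {
        SinkProgress {
            sent_upper: persisted_upper.unwrap_or(0),
        }
    }

    // A record at `ts` still needs to be (re)sent if it is at or beyond the
    // persisted upper bound; everything below it was already delivered.
    fn needs_send(&self, ts: u64) -> bool {
        ts >= self.sent_upper
    }

    // Called once a consolidated batch covering [lower, upper) is acknowledged
    // by Kafka; the new upper bound would then be persisted.
    fn record_sent(&mut self, upper: u64) {
        self.sent_upper = self.sent_upper.max(upper);
    }
}

fn main() {
    // Pretend the catalog says batches up to (but not including) timestamp 10
    // were already delivered before the crash.
    let mut progress = SinkProgress::new(Some(10));

    // Records below the persisted frontier are skipped instead of resent.
    assert!(!progress.needs_send(7));
    assert!(progress.needs_send(12));

    // After the batch covering [10, 20) is acknowledged, advance the frontier.
    progress.record_sent(20);
    assert!(!progress.needs_send(15));
}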