-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce schema evolution via the -S flag #164
base: main
Are you sure you want to change the base?
Conversation
e4a1ade
to
c1bcd37
Compare
c1bcd37
to
820421f
Compare
I stumbled into this while pilfering code from kafka-delta-ingest for another project and discovered that the code in `write_values` which does `record_batch.schema() != arrow_schema` doesn't do what we think it does. Basically if `Decoder` "works" the schema it's going to return is just the schema passed into it. It has no bearing on whether the JSON has the same schema. Don't ask me why. Using the reader's `infer_json_schema_*` functions can provide a Schema that is useful for comparison: let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned())); let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!"); let decoder = Decoder::new(Arc::new(json_schema), options); if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") { assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!"); } What's even more interesting, is that after a certain number of fields are removed, the Decoder no longer pretends it can Decode the JSON. I am baffled as to why.
…l refactor The intention here is to enable more consistent schema handling within the writers Sponsored-by: Raft LLC
…e writers The DeserializedMessage carries optional inferred schema information along with the message itself. This is useful for understanding whether schema evolution hould happen "later" in the message processing pipeline. The downside of this behavior is that there will be performance impact as arrow_json does schema inference. Sponsored-by: Raft LLC
Turning avro off drops about 50 crates from the default build, so useful for development, but the code would need to be cleaned up to remove this from the default features list See #163
Identified by `cargo +nightly udeps`
This change is a little wrapped up in the introduction of DeserializedMessage but the trade-off for development targeting S3 is that I am linking 382 crates every cycle as opposed to 451. Fixes #163
I don't know why the impl was way down there 😄
This commit introduces some interplay between the IngestProcessor and DataWriter, the latter of which needs to keep track of whether or not it has a changed schema. What should be done with that changed schema must necessarily live in IngestProcessor since that will perform the Delta transaction commits at the tail end of batch processing. There is some potential mismatches between the schema in storage and what the DataWriter has, so this change tries to run the runloop again if the current schema and the evolved schema are incompatible Closes #131 Sponsored-by: Raft LLC
This will ensure the non-evolution case stands relatively speedy! Sponsored-by: Raft LLC
820421f
to
50e81da
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The transform
usage won't work as implemented without a change, but everything else is nice to have.
self.transformer | ||
.transform(&mut v, None as Option<&BorrowedMessage>)?; | ||
// TODO: this can't be right, shouldn't this function takje DeserializedMessage | ||
.transform(&mut v.clone().into(), None as Option<&BorrowedMessage>)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this does what is intended. The transform function takes a mutable reference to the clone of v
instead of a mutable reference to v
itself. The function modifies and consumes the clone, but it only exists in the scope of the input to transform
, so it should be dropped once the method finishes. That means Ok(v)
is returning the original instead of the transformed message. Implementing Into<Value>
for DeserializedMessage
and changing the code to
let mut dm = DeserializedMessage::from(v);
self.transformer
// TODO: this can't be right, shouldn't this function takje DeserializedMessage
.transform(&mut dm, None as Option<&BorrowedMessage>)?;
Ok(dm.into())
should do the trick. Also the Into
implementation will allow for the removal of all of instances of to_owned()
that my last commit added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The transform code might be some bungled merge conflict resolution, I'll be able to look more into this shortly
@@ -348,13 +349,13 @@ impl Transformer { | |||
/// The optional `kafka_message` must be provided to include well known Kafka properties in the value. | |||
pub(crate) fn transform<M>( | |||
&self, | |||
value: &mut Value, | |||
value: &mut DeserializedMessage, | |||
kafka_message: Option<&M>, | |||
) -> Result<(), TransformError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option, with respect to my earlier comment about changing DeserializedMessage
, is to have transform
return Result<DeserializedMessage, TransformError
. Then it can take value: DeserializedMessage
. That will allow us to mutate and return what we mutated, but that is a bigger code change.
@@ -442,7 +447,7 @@ mod tests { | |||
]; | |||
|
|||
for i in 0..messages.len() { | |||
assert_eq!(messages[i], expected[i]); | |||
assert_eq!(messages[i].message().to_owned(), expected[i]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.into()
works here if implemented, saving us a clone
} | ||
} | ||
|
||
let values = values.into_iter().map(|v| v.message().to_owned()).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.into()
works here if implemented, saving us a clone
This set of changes implements more nuanced handling of Delta vs. message schema with the introduction of the
-S
command line flag which enables schema evolutions.In order for schema evolution to work, there is a necessary performance hit. kafka-delta-ingest must determine the schema of every message that is read from Kafka, infer its schema, and if necessary add nullable columns to the Delta table. This is related to similar work in delta-rs for the
RecordBatchWriter
but deviates because of the mechanism by which RecordBatches and schema is handled in kafka-delta-ingest.Sponsored-by: Raft LLC
NOTE: This pull request builds on #162