Refactor compaction to use a stream #32784
base: main
0184deb to bb3caa0
bb3caa0 to 643a59a
📈!!
let compact_span = debug_span!("compact::consolidate");
let res = tokio::time::timeout(
    timeout,
    // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
Lost a comment here.
e
)
})?
.map_err(|e| anyhow!(e))?;
Here we're replacing the old `compact` method with a composition of the three new methods on the "main" codepath, but all the tests and admin tools still hit the old method. This means that we're keeping around some almost-dead code, and we're not giving the new stuff as much coverage as we otherwise would.
IMO it would be reasonable to either allow flagging between the old and new path, or to just implement `fn compact` with calls to the new methods...
Good point, `compact` is now reimplemented in terms of these new functions.
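The shape of that change can be sketched roughly as below. This is only an illustration under stated assumptions: `Batch`, `chunk_runs`, `compact_chunk`, and `finish` are hypothetical stand-ins, not the names or signatures used in the PR, and updates are modelled as plain `(key, time, diff)` tuples. The point is that the legacy entry point becomes a thin composition of the new steps, so every existing test of `compact` also exercises the new code.

```rust
// Hypothetical stand-ins: none of these names or signatures come from the PR.
type Update = (u64, u64, i64); // (key, time, diff)

#[derive(Debug, Clone, PartialEq)]
struct Batch {
    updates: Vec<Update>,
}

/// Sort updates, sum the diffs of equal (key, time) pairs, drop zero diffs.
fn consolidate(updates: &mut Vec<Update>) {
    updates.sort();
    let mut out: Vec<Update> = Vec::with_capacity(updates.len());
    for (k, t, d) in updates.drain(..) {
        let merged = match out.last_mut() {
            Some(last) if last.0 == k && last.1 == t => {
                last.2 += d;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((k, t, d));
        }
    }
    out.retain(|u| u.2 != 0);
    *updates = out;
}

/// New step 1 (hypothetical): split the inputs into chunks of work.
fn chunk_runs(inputs: Vec<Batch>, chunk_size: usize) -> Vec<Vec<Batch>> {
    inputs.chunks(chunk_size.max(1)).map(|c| c.to_vec()).collect()
}

/// New step 2 (hypothetical): consolidate a single chunk into one batch.
fn compact_chunk(chunk: Vec<Batch>) -> Batch {
    let mut updates: Vec<Update> = chunk.into_iter().flat_map(|b| b.updates).collect();
    consolidate(&mut updates);
    Batch { updates }
}

/// New step 3 (hypothetical): combine the per-chunk outputs.
fn finish(outputs: Vec<Batch>) -> Batch {
    compact_chunk(outputs)
}

/// The legacy entry point, now just a composition of the new steps.
fn compact(inputs: Vec<Batch>) -> Batch {
    let outputs = chunk_runs(inputs, 2).into_iter().map(compact_chunk).collect();
    finish(outputs)
}
```

With this arrangement there is no separate "old" implementation left to drift out of sync with the new one.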
)
.await?;
let (parts, run_splits, run_meta, updates) =
    (batch.parts, batch.run_splits, batch.run_meta, batch.len);
This code is destructuring a batch just to rebuild it from clones of all the parts, which feels odd... I think we can just pass on the batch?
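As a rough illustration of the difference, assuming a simplified batch type: the field names `parts`, `run_splits`, `run_meta`, and `len` are taken from the snippet above, while the field types and the `BatchSketch` name are made up for this sketch.

```rust
// Simplified stand-in for the real batch type; field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct BatchSketch {
    parts: Vec<String>,
    run_splits: Vec<usize>,
    run_meta: Vec<String>,
    len: usize,
}

/// The pattern being questioned: take the batch apart and rebuild an
/// identical value from clones of every field.
fn rebuild_from_clones(batch: &BatchSketch) -> BatchSketch {
    let (parts, run_splits, run_meta, updates) =
        (&batch.parts, &batch.run_splits, &batch.run_meta, batch.len);
    BatchSketch {
        parts: parts.clone(),
        run_splits: run_splits.clone(),
        run_meta: run_meta.clone(),
        len: updates,
    }
}

/// The suggested alternative: hand the batch on by value, with no clones.
fn pass_through(batch: BatchSketch) -> BatchSketch {
    batch
}
```

Both functions produce an equal value; the second simply moves the batch and avoids the intermediate allocations.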
if updates == 0 {
    continue;
}
This is unnecessary / has no effect currently, right?
If so let's skip it... as soon as we're doing actual incremental compactions, we'll want to treat these zero-output runs the same as every other run.
    isolated_runtime: Arc<IsolatedRuntime>,
    req: CompactReq<T>,
    write_schemas: Schemas<K, V>,
) -> impl Stream<Item = Result<CompactRes<T>, anyhow::Error>> {
Let's have this return `HollowBatch` instead of `CompactRes` for now, since it is not actually safe to apply to the spine.
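One way to picture this suggestion is the sketch below. It uses a synchronous `Iterator` as a stand-in for the async `Stream` so that it needs no external crates, and every type here is a simplified assumption: the real `HollowBatch` and `CompactRes` are generic and carry far more state, and `compact_runs` and `finish` are hypothetical names. The idea is that the per-run items are plain batches, and only the final, complete result is wrapped in the type that callers are allowed to apply.

```rust
// Simplified, non-generic stand-ins for the real types (assumptions).
#[derive(Debug, PartialEq)]
struct HollowBatch {
    len: usize,
}

/// In this sketch, the only type a caller may apply to the spine.
#[derive(Debug, PartialEq)]
struct CompactRes {
    output: HollowBatch,
}

/// Hypothetical: yields one intermediate batch per input run. The item type
/// is `HollowBatch`, so an intermediate result cannot be mistaken for
/// something that is safe to apply.
fn compact_runs(run_lens: Vec<usize>) -> impl Iterator<Item = Result<HollowBatch, String>> {
    run_lens.into_iter().map(|len| Ok(HollowBatch { len }))
}

/// Hypothetical: only once every run has been processed is the combined
/// output wrapped in `CompactRes`.
fn finish(batches: Vec<HollowBatch>) -> CompactRes {
    let len = batches.iter().map(|b| b.len).sum();
    CompactRes {
        output: HollowBatch { len },
    }
}
```

Note that a zero-length run flows through like any other run here, which matches the earlier comment about not special-casing empty outputs.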
643a59a to 8267469
Pull-Request: MaterializeInc#32782
8267469 to fd51aed
This is a precursor to incremental compaction.
Motivation
Tips for reviewer
Checklist
`$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.