
[wip] feat: add materialize_scan_results arrow utility #621

Draft: wants to merge 1 commit into base: main


zachschuermann (Collaborator) commented Jan 6, 2025

What changes are proposed in this pull request?

  1. A new feature flag, arrow-compute, that pulls in the arrow dependency (for arrow-compute). TODO: rename?
  2. A new utility function, in a new module, that transforms an iterator of ScanResults into an iterator of RecordBatches using arrow-compute's filter_record_batch.
  3. Updates to call sites that can use the new utility.
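A minimal, std-only sketch of the iterator-level utility described in item 2. `Batch`, `Error`, the simplified `ScanResult` (with a public `mask` field) and `filter_batch` are hypothetical stand-ins for arrow's `RecordBatch`, the kernel's `ScanResult`/`DeltaResult`, and arrow-compute's `filter_record_batch`; the real signatures in this PR may differ.

```rust
// Hypothetical stand-in for arrow's `RecordBatch`: one column of i64 rows.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<i64>,
}

// Hypothetical stand-in for the kernel's error type.
#[derive(Debug, PartialEq)]
pub struct Error(pub String);

// Hypothetical, simplified stand-in for the kernel's `ScanResult`.
pub struct ScanResult {
    pub raw_data: Result<Batch, Error>,
    pub mask: Option<Vec<bool>>,
}

impl ScanResult {
    // Stand-in for `full_mask`: returns an owned copy of the selection mask, if any.
    pub fn full_mask(&self) -> Option<Vec<bool>> {
        self.mask.clone()
    }
}

// Stand-in for `filter_record_batch`: keeps rows whose mask entry is `true`.
// This stand-in rejects a mask whose length differs from the row count.
pub fn filter_batch(batch: &Batch, mask: &[bool]) -> Result<Batch, Error> {
    if mask.len() != batch.rows.len() {
        return Err(Error("mask length does not match row count".into()));
    }
    let rows = batch
        .rows
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(row, _)| *row)
        .collect();
    Ok(Batch { rows })
}

// Iterator-level shape: each `Result<ScanResult>` becomes a `Result<Batch>`,
// with the selection mask (if present) applied to the data.
pub fn materialize_scan_results(
    results: impl IntoIterator<Item = Result<ScanResult, Error>>,
) -> impl Iterator<Item = Result<Batch, Error>> {
    results.into_iter().map(|res| {
        let res = res?;
        let mask = res.full_mask();
        let batch = res.raw_data?;
        match mask {
            Some(mask) => filter_batch(&batch, &mask),
            None => Ok(batch),
        }
    })
}
```

Because the output is still an iterator of results, callers decide how to collect it and where to stop on the first error.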

How was this change tested?

Existing tests.

resolves #592


codecov bot commented Jan 6, 2025

Codecov Report

Attention: Patch coverage is 75.00000% with 4 lines in your changes missing coverage. Please review.

Project coverage is 83.44%. Comparing base (c3a868f) to head (c947f4a).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| kernel/src/engine/arrow_compute.rs | 75.00% | 0 Missing and 4 partials ⚠️ |
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main     #621      +/-   ##
==========================================
- Coverage   83.45%   83.44%   -0.01%     
==========================================
  Files          74       75       +1     
  Lines       16877    16893      +16     
  Branches    16877    16893      +16     
==========================================
+ Hits        14084    14097      +13     
+ Misses       2135     2133       -2     
- Partials      658      663       +5     
```


Comment on lines +38 to +39
```rust
let batches: Vec<RecordBatch> =
    materialize_scan_results(table_changes_scan.execute(engine.clone())?).try_collect()?;
```
Collaborator


I wonder if it would work better to define a materialize_scan_result function that takes a DeltaResult<ScanResult> and returns a DeltaResult<RecordBatch>. It would have a simpler type signature and would also allow more natural use at the call site:

Suggested change
```diff
-let batches: Vec<RecordBatch> =
-    materialize_scan_results(table_changes_scan.execute(engine.clone())?).try_collect()?;
+let batches: Vec<_> = table_changes_scan
+    .execute(engine.clone())?
+    .map(materialize_scan_result)
+    .try_collect()?;
```

(More readable IMO, even though it technically has more lines of code.)
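A sketch of the per-item alternative, using hypothetical std-only stand-ins (`Batch` for `RecordBatch`, a local `DeltaResult` alias, and a simplified `ScanResult` with a public `mask` field). Collecting into `Result<Vec<_>, _>` is the standard-library equivalent of itertools' `try_collect`: it stops at the first `Err`.

```rust
// Hypothetical stand-in for arrow's `RecordBatch`.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<i64>,
}

#[derive(Debug)]
pub struct Error(pub String);

// Local alias mirroring the kernel's `DeltaResult<T>` (assumption for this sketch).
pub type DeltaResult<T> = Result<T, Error>;

// Hypothetical, simplified stand-in for the kernel's `ScanResult`.
pub struct ScanResult {
    pub raw_data: DeltaResult<Batch>,
    pub mask: Option<Vec<bool>>,
}

// Per-item shape: one result in, one result out, so it plugs into `.map(...)`.
pub fn materialize_scan_result(res: DeltaResult<ScanResult>) -> DeltaResult<Batch> {
    let res = res?;
    let batch = res.raw_data?;
    Ok(match res.mask {
        // Keep only rows whose mask entry is `true`. This stand-in assumes the
        // mask covers every row (`zip` would silently drop any extra rows).
        Some(mask) => Batch {
            rows: batch
                .rows
                .into_iter()
                .zip(mask)
                .filter(|&(_, keep)| keep)
                .map(|(row, _)| row)
                .collect(),
        },
        None => batch,
    })
}

// Call-site shape from the suggestion, collected with std instead of itertools.
pub fn materialize_all(results: Vec<DeltaResult<ScanResult>>) -> DeltaResult<Vec<Batch>> {
    results.into_iter().map(materialize_scan_result).collect()
}
```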

Comment on lines +16 to +17
```rust
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) = scan_res?;
```
Collaborator


Suggested change
```diff
-let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
-let (mask, data) = scan_res?;
+let res = res?;
+let (mask, data) = (res.full_mask(), res.raw_data?);
```
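The suggested form compiles because tuple operands are evaluated left to right: full_mask() borrows res and returns an owned mask before raw_data is moved out. A small sketch comparing both forms, with a hypothetical `ScanResult` stand-in whose `full_mask` simply clones its mask:

```rust
#[derive(Debug)]
pub struct Error(pub String);

// Hypothetical stand-in for the kernel's `ScanResult`.
pub struct ScanResult {
    pub raw_data: Result<Vec<i64>, Error>,
    pub mask: Option<Vec<bool>>,
}

impl ScanResult {
    // Borrows `self` and returns an owned copy of the mask.
    pub fn full_mask(&self) -> Option<Vec<bool>> {
        self.mask.clone()
    }
}

pub type Split = (Option<Vec<bool>>, Vec<i64>);

// Original form: thread the result through `and_then`, then apply `?`.
pub fn split_and_then(res: Result<ScanResult, Error>) -> Result<Split, Error> {
    let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
    let (mask, data) = scan_res?;
    Ok((mask, data))
}

// Suggested form: return early with `?`, then destructure. The borrow taken by
// `full_mask()` ends before `raw_data` is moved out of `res`.
pub fn split_early_return(res: Result<ScanResult, Error>) -> Result<Split, Error> {
    let res = res?;
    let (mask, data) = (res.full_mask(), res.raw_data?);
    Ok((mask, data))
}
```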

Comment on lines +23 to +26
```rust
Ok(match mask {
    Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
    None => record_batch,
})
```
Collaborator


Would updating a mut record_batch be simpler?

Suggested change
```diff
-Ok(match mask {
-    Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
-    None => record_batch,
-})
+if let Some(mask) = mask {
+    record_batch = filter_record_batch(&record_batch, &mask.into())?;
+}
+Ok(record_batch)
```
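The suggestion assumes record_batch is declared mut. A sketch checking that both forms behave the same, using a hypothetical `Batch` type and a `filter_batch` stand-in for filter_record_batch (the real call converts the mask with `.into()`; here a `&Vec<bool>` simply derefs to `&[bool]`):

```rust
// Hypothetical stand-in for arrow's `RecordBatch`.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<i64>,
}

// Stand-in for `filter_record_batch`; this sketch rejects a mask whose
// length differs from the row count.
fn filter_batch(batch: &Batch, mask: &[bool]) -> Result<Batch, String> {
    if mask.len() != batch.rows.len() {
        return Err(format!(
            "mask has {} entries, batch has {} rows",
            mask.len(),
            batch.rows.len()
        ));
    }
    let rows = batch.rows.iter().zip(mask).filter(|(_, keep)| **keep).map(|(r, _)| *r).collect();
    Ok(Batch { rows })
}

// `match` form from the PR.
pub fn apply_mask_match(record_batch: Batch, mask: Option<Vec<bool>>) -> Result<Batch, String> {
    Ok(match mask {
        Some(mask) => filter_batch(&record_batch, &mask)?,
        None => record_batch,
    })
}

// Suggested form: reassign the `mut` binding only when a mask is present.
pub fn apply_mask_mut(mut record_batch: Batch, mask: Option<Vec<bool>>) -> Result<Batch, String> {
    if let Some(mask) = mask {
        record_batch = filter_batch(&record_batch, &mask)?;
    }
    Ok(record_batch)
}
```

Both forms return the input batch untouched when there is no mask; the if let version just avoids restating that in a None arm.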

```rust
    }
})
.try_collect()?;
let batches: Vec<RecordBatch> =
```
Collaborator


nit: The type annotation isn't needed; the return value of read_cdf_for_table fully constrains it.

(Probably a bunch of other call sites can be simplified too. Even arrow's print_batches above, which takes a slice, still constrains the element type, so we can annotate as Vec<_>.)
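A sketch of both inference points, with hypothetical stand-ins: `read_batches` plays the role of read_cdf_for_table, and `total_rows` plays a slice-taking consumer like arrow's print_batches (which takes `&[RecordBatch]`).

```rust
// Hypothetical stand-in for arrow's `RecordBatch`.
#[derive(Debug, PartialEq)]
pub struct Batch {
    pub rows: Vec<i64>,
}

impl From<Vec<i64>> for Batch {
    fn from(rows: Vec<i64>) -> Self {
        Batch { rows }
    }
}

// Stand-in for `read_cdf_for_table`: its return type alone fixes the type of
// any binding initialized from it.
pub fn read_batches() -> Result<Vec<Batch>, String> {
    Ok(vec![Batch::from(vec![1, 2]), Batch::from(vec![3])])
}

// Stand-in for a slice-taking consumer such as `print_batches`.
pub fn total_rows(batches: &[Batch]) -> usize {
    batches.iter().map(|b| b.rows.len()).sum()
}

pub fn demo() -> Result<usize, String> {
    // No annotation: the return type of `read_batches` constrains `batches`.
    let batches = read_batches()?;

    // `collect` must be told the container, but the element type can stay `_`:
    // `Into::into` has no fixed target here, and passing `&converted` to
    // `total_rows(&[Batch])` is what pins the element type to `Batch`.
    let raw: Vec<Vec<i64>> = vec![vec![4, 5, 6]];
    let converted: Vec<_> = raw.into_iter().map(Into::into).collect();

    Ok(total_rows(&batches) + total_rows(&converted))
}
```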

Successfully merging this pull request may close these issues.

create and arrow utility to materialize ScanResult into record batches
2 participants