Simplify ParquetRecordBatchReader::next control logic #7512

Merged: 1 commit, May 16, 2025
59 changes: 30 additions & 29 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -800,24 +800,33 @@ impl Iterator for ParquetRecordBatchReader {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        self.next_inner()

Contributor Author

Basically, this PR consolidates the error conversion and the transpose into a single location rather than inlining them in several places in `next()`.

+            .map_err(|arrow_err| arrow_err.into())
+            .transpose()
+    }
+}
+
+impl ParquetRecordBatchReader {
+    /// Returns the next `RecordBatch` from the reader, or `None` if the reader
+    /// has reached the end of the file.
+    ///
+    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
+    /// simplify error handling with `?`
+    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
         let mut read_records = 0;
         match self.selection.as_mut() {
             Some(selection) => {
                 while read_records < self.batch_size && !selection.is_empty() {
                     let front = selection.pop_front().unwrap();
                     if front.skip {
-                        let skipped = match self.array_reader.skip_records(front.row_count) {
-                            Ok(skipped) => skipped,
-                            Err(e) => return Some(Err(e.into())),
-                        };
+                        let skipped = self.array_reader.skip_records(front.row_count)?;
 
                         if skipped != front.row_count {
-                            return Some(Err(general_err!(
+                            return Err(general_err!(
                                 "failed to skip rows, expected {}, got {}",
                                 front.row_count,
                                 skipped
-                            )
-                            .into()));
+                            ));
                         }
                         continue;
                     }
@@ -839,35 +848,27 @@ impl Iterator for ParquetRecordBatchReader {
                         }
                         _ => front.row_count,
                     };
-                    match self.array_reader.read_records(to_read) {
-                        Ok(0) => break,
-                        Ok(rec) => read_records += rec,
-                        Err(error) => return Some(Err(error.into())),
-                    }
+                    match self.array_reader.read_records(to_read)? {
+                        0 => break,
+                        rec => read_records += rec,
+                    };
                 }
             }
             None => {
-                if let Err(error) = self.array_reader.read_records(self.batch_size) {
-                    return Some(Err(error.into()));
-                }
+                self.array_reader.read_records(self.batch_size)?;
             }
         };
 
-        match self.array_reader.consume_batch() {
-            Err(error) => Some(Err(error.into())),
-            Ok(array) => {
-                let struct_array = array.as_struct_opt().ok_or_else(|| {
-                    ArrowError::ParquetError(
-                        "Struct array reader should return struct array".to_string(),
-                    )
-                });
+        let array = self.array_reader.consume_batch()?;

Contributor Author

I think this is a pretty good example of how the logic gets simpler.

+        let struct_array = array.as_struct_opt().ok_or_else(|| {
+            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
+        })?;
 
-                match struct_array {
-                    Err(err) => Some(Err(err)),
-                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
-                }
-            }
-        }
+        Ok(if struct_array.len() > 0 {
+            Some(RecordBatch::from(struct_array))
+        } else {
+            None
+        })
     }
 }
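
For readers who want to try the pattern outside the Parquet reader, here is a minimal standalone sketch of the same shape: an inner method returns `Result<Option<T>>` so each fallible step can use `?`, and `Iterator::next` does the error conversion and `transpose()` in one place. `ChunkReader`, `InnerError`, and `OuterError` are hypothetical names invented for this illustration. They are not part of arrow-rs and only stand in for the reader and its two error types.

```rust
/// Hypothetical stand-in for the error type used inside the reader.
#[derive(Debug, PartialEq)]
struct InnerError(String);

/// Hypothetical stand-in for the error type exposed by the iterator.
#[derive(Debug, PartialEq)]
struct OuterError(String);

impl From<InnerError> for OuterError {
    fn from(e: InnerError) -> Self {
        OuterError(e.0)
    }
}

/// Toy reader (an assumption for this sketch): each chunk is a
/// comma-separated list of numbers that is parsed into one batch.
struct ChunkReader {
    chunks: std::vec::IntoIter<&'static str>,
}

impl ChunkReader {
    fn new(chunks: Vec<&'static str>) -> Self {
        Self {
            chunks: chunks.into_iter(),
        }
    }

    /// Returns `Result<Option<..>>` rather than `Option<Result<..>>`
    /// so that fallible steps can simply use `?`.
    fn next_inner(&mut self) -> Result<Option<Vec<u32>>, InnerError> {
        let Some(chunk) = self.chunks.next() else {
            return Ok(None); // end of input
        };
        let batch = chunk
            .split(',')
            .map(|s| s.trim().parse::<u32>().map_err(|e| InnerError(e.to_string())))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Some(batch))
    }
}

impl Iterator for ChunkReader {
    type Item = Result<Vec<u32>, OuterError>;

    fn next(&mut self) -> Option<Self::Item> {
        // The only place where the error is converted and the
        // `Result<Option<_>>` is flipped into `Option<Result<_>>`.
        self.next_inner().map_err(OuterError::from).transpose()
    }
}
```

With chunks `"1, 2"`, `"3"`, and `"x"`, iterating this reader yields two `Ok` batches, then an `Err` for the unparsable chunk, then `None`. The iterator body stays a one-liner while all early returns in `next_inner` go through `?`.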
