Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
feat: Add nested_column_iter_to_arrays to deserialize inner columns (
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh authored Oct 24, 2023
1 parent 45313f7 commit b073454
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 1 deletion.
19 changes: 19 additions & 0 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,22 @@ where
.map(|x| x.map(|x| x.1)),
))
}

/// Basically the same as `column_iter_to_arrays`, with the addition of the `init` parameter
/// to read the inner columns of the nested type directly, instead of reading the entire nested type.
pub fn nested_column_iter_to_arrays<'a, I: 'a>(
columns: Vec<I>,
types: Vec<&PrimitiveType>,
field: Field,
init: Vec<InitNested>,
chunk_size: Option<usize>,
num_rows: usize,
) -> Result<ArrayIter<'a>>
where
I: Pages,
{
Ok(Box::new(
nested::columns_to_iter_recursive(columns, types, field, init, num_rows, chunk_size)?
.map(|x| x.map(|x| x.1)),
))
}
2 changes: 1 addition & 1 deletion src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{array::Array, error::Result};
use crate::types::{i256, NativeType};
pub use deserialize::{
column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
InitNested, NestedArrayIter, NestedState, StructIterator,
nested_column_iter_to_arrays, InitNested, NestedArrayIter, NestedState, StructIterator,
};
pub use file::{FileReader, RowGroupReader};
pub use row_group::*;
Expand Down
85 changes: 85 additions & 0 deletions tests/it/io/parquet/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::fs::File;

use arrow2::{
array::StructArray,
datatypes::DataType,
error::Result,
io::parquet::read::{
infer_schema, n_columns, nested_column_iter_to_arrays, read_columns, read_metadata,
to_deserializer, BasicDecompressor, InitNested, PageReader,
},
};

#[test]
fn test_deserialize_nested_column() -> Result<()> {
let path = "testing/parquet-testing/data/nested_structs.rust.parquet";
let mut reader = File::open(path).unwrap();

let metadata = read_metadata(&mut reader)?;
let schema = infer_schema(&metadata)?;

let num_rows = metadata.num_rows;
let row_group = metadata.row_groups[0].clone();

let field_columns = schema
.fields
.iter()
.map(|field| read_columns(&mut reader, row_group.columns(), &field.name))
.collect::<Result<Vec<_>>>()?;

let fields = schema.fields.clone();
for (mut columns, field) in field_columns.into_iter().zip(fields.iter()) {
if let DataType::Struct(inner_fields) = &field.data_type {
let mut array_iter =
to_deserializer(columns.clone(), field.clone(), num_rows, None, None)?;
let array = array_iter.next().transpose()?.unwrap();
let expected_array = array
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone();

// deserialize inner values of struct fields.
let init = vec![InitNested::Struct(field.is_nullable)];
let mut values = Vec::with_capacity(inner_fields.len());
for inner_field in inner_fields {
let n = n_columns(&inner_field.data_type);
let inner_columns: Vec<_> = columns.drain(0..n).collect();

let (nestd_columns, types): (Vec<_>, Vec<_>) = inner_columns
.into_iter()
.map(|(column_meta, chunk)| {
let len = chunk.len();
let pages = PageReader::new(
std::io::Cursor::new(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
len * 2 + 1024,
);
(
BasicDecompressor::new(pages, vec![]),
&column_meta.descriptor().descriptor.primitive_type,
)
})
.unzip();

let mut inner_array_iter = nested_column_iter_to_arrays(
nestd_columns,
types,
inner_field.clone(),
init.clone(),
None,
num_rows,
)?;
let inner_array = inner_array_iter.next().transpose()?;
values.push(inner_array.unwrap());
}
let struct_array = StructArray::try_new(field.data_type.clone(), values, None)?;

assert_eq!(expected_array, struct_array);
}
}

Ok(())
}
1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use arrow2::{
types::{days_ms, NativeType},
};

mod deserialize;
#[cfg(feature = "io_json_integration")]
mod integration;
mod read;
Expand Down

0 comments on commit b073454

Please sign in to comment.