Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into cmc/arc_datatype
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Jan 12, 2024
2 parents 3c3c6ed + 3ddc6a1 commit 664d021
Show file tree
Hide file tree
Showing 104 changed files with 4,072 additions and 676 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/integration-ipc.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Integration IPC / Flight

on: [push, pull_request]
# on: [push, pull_request]
on: []

jobs:
docker:
Expand Down
62 changes: 42 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "arrow2"
version = "0.17.0"
version = "0.17.4"
license = "Apache-2.0"
description = "Unofficial implementation of Apache Arrow spec in safe Rust"
homepage = "https://github.com/jorgecarleitao/arrow2"
Expand All @@ -16,11 +16,11 @@ bench = false

[dependencies]
foreign_vec = "0.1.0"
either = "1.6"
either = "1.9"
num-traits = "0.2"
dyn-clone = "1"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono = { version = "0.4.31", default_features = false, features = ["std"] }

# for decimal i256
ethnum = "1"
Expand All @@ -29,10 +29,10 @@ ethnum = "1"
# crate provides HashMap that assumes pre-hashed values.
hash_hasher = "^2.0.3"
# For SIMD utf8 validation
simdutf8 = "0.1.3"
simdutf8 = "0.1.4"

# A Rust port of SwissTable
hashbrown = { version = "0.13", default-features = false, optional = true }
hashbrown = { version = "0.14", default-features = false, features = ["ahash"] }

# for timezone support
chrono-tz = { version = "0.8", optional = true }
Expand All @@ -46,8 +46,8 @@ csv-core = { version = "0.1", optional = true }
# for csv async io
csv-async = { version = "^1.1", optional = true }

regex = { version = "^1.3", optional = true }
regex-syntax = { version = "^0.6", optional = true }
regex = { version = "1.9", optional = true }
regex-syntax = { version = "0.7", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

Expand All @@ -62,7 +62,7 @@ arrow-format = { version = "0.8", optional = true, features = ["ipc"] }
hex = { version = "^0.4", optional = true }

# for IPC compression
lz4 = { version = "1.23.1", optional = true }
lz4 = { version = "1.24", optional = true }
zstd = { version = "0.12", optional = true }

rand = { version = "0.8", optional = true }
Expand Down Expand Up @@ -92,19 +92,22 @@ serde_json = { version = "^1.0", features = ["preserve_order"], optional = true
strength_reduce = { version = "0.2", optional = true }

# For instruction multiversioning
multiversion = { version = "0.7.1", optional = true }
multiversion = { version = "0.7.3", optional = true }

# For ODBC support
odbc-api = { version = "0.36", optional = true }

# Faster hashing
ahash = "0.8"

# For `LIKE` matching "contains" fast-path
memchr = { version = "2.6", optional = true }

# Support conversion to/from arrow-rs
arrow-buffer = { version = "37.0.0", optional = true }
arrow-schema = { version = "37.0.0", optional = true }
arrow-data = { version = "37.0.0", optional = true }
arrow-array = { version = "37.0.0", optional = true }
arrow-buffer = { version = ">=40", optional = true }
arrow-schema = { version = ">=40", optional = true }
arrow-data = { version = ">=40", optional = true }
arrow-array = { version = ">=40", optional = true }

[target.wasm32-unknown-unknown.dependencies]
getrandom = { version = "0.2", features = ["js"] }
Expand All @@ -114,7 +117,6 @@ getrandom = { version = "0.2", features = ["js"] }
version = "0.17"
optional = true
default_features = false
features = ["async"]

[dev-dependencies]
criterion = "0.4"
Expand All @@ -129,6 +131,14 @@ proptest = { version = "1", default_features = false, features = ["std"] }
avro-rs = { version = "0.13", features = ["snappy"] }
# use for flaky testing
rand = "0.8"
# used for generating and testing random data samples
sample-arrow2 = "0.1"
sample-std = "0.1"
sample-test = "0.1"

# ugly hack needed to match this library in sample_arrow2
[patch.crates-io]
arrow2 = { path = "." }

[package.metadata.docs.rs]
features = ["full"]
Expand All @@ -149,7 +159,7 @@ full = [
"io_ipc_compression",
"io_json_integration",
"io_print",
"io_parquet",
"io_parquet_async",
"io_parquet_compression",
"io_avro",
"io_orc",
Expand All @@ -168,15 +178,18 @@ io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv-core", "streaming-iterator", "lexical-core"]
io_json = ["json-deserializer", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_json = ["io_json_read", "io_json_write"]
io_json_read = ["json-deserializer", "indexmap", "lexical-core"]
io_json_write = ["streaming-iterator", "fallible-streaming-iterator", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet = ["parquet2", "io_ipc", "base64", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_async = ["futures", "io_parquet", "parquet2/async"]

io_parquet_compression = [
"io_parquet_zstd",
Expand All @@ -186,6 +199,9 @@ io_parquet_compression = [
"io_parquet_brotli"
]

# sample testing of generated arrow data
io_parquet_sample_test = ["io_parquet_async"]

# compression backends
io_parquet_zstd = ["parquet2/zstd"]
io_parquet_snappy = ["parquet2/snappy"]
Expand All @@ -211,7 +227,8 @@ io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
io_print = ["comfy-table"]
# the compute kernels. Disabling this significantly reduces compile time.
compute_aggregate = ["multiversion"]
compute_arithmetics = ["strength_reduce"]
compute_arithmetics_decimal = ["strength_reduce"]
compute_arithmetics = ["strength_reduce", "compute_arithmetics_decimal"]
compute_bitwise = []
compute_boolean = []
compute_boolean_kleene = []
Expand All @@ -223,13 +240,13 @@ compute_filter = []
compute_hash = ["multiversion"]
compute_if_then_else = []
compute_length = []
compute_like = ["regex", "regex-syntax"]
compute_like = ["regex", "regex-syntax", "dep:memchr"]
compute_limit = []
compute_merge_sort = ["itertools", "compute_sort"]
compute_nullif = ["compute_comparison"]
compute_partition = ["compute_sort"]
compute_regex_match = ["regex"]
compute_sort = ["compute_take", "hashbrown"]
compute_sort = ["compute_take"]
compute_substring = []
compute_take = []
compute_temporal = []
Expand Down Expand Up @@ -380,3 +397,8 @@ harness = false
[[bench]]
name = "assign_ops"
harness = false

[[bench]]
name = "like_kernels"
harness = false

22 changes: 22 additions & 0 deletions benches/like_kernels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use arrow2::util::bench_util::create_string_array;
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
use arrow2::compute::like::like_utf8_scalar;

/// Runs the `LIKE` kernel once over `array` with the scalar `pattern`.
///
/// The kernel result is unwrapped (panicking on error) and handed to
/// `black_box` so the optimizer cannot discard the computation.
fn bench_like(array: &Utf8Array<i32>, pattern: &str) {
    let matched = like_utf8_scalar(array, pattern).unwrap();
    criterion::black_box(matched);
}

/// Registers one `LIKE` benchmark per power-of-two size, from 2^16 through 2^20.
///
/// For each exponent a fresh string array is built with `create_string_array`
/// (the computed size is forwarded as its second argument) and the pattern
/// `%abba%` is matched against it through [`bench_like`].
fn add_benchmark(c: &mut Criterion) {
    for size_log2 in 16..21_u32 {
        // The benchmark label advertises 2^size_log2, so raise 2 to that
        // exponent. `size_log2.pow(2)` would instead square the exponent
        // (256..=400) and make the label misleading.
        let size = 2usize.pow(size_log2);
        let array = create_string_array::<i32>(100, size, 0.0, 0);
        c.bench_function(&format!("LIKE length = 2^{}", size_log2), |b| {
            b.iter(|| bench_like(&array, "%abba%"))
        });
    }
}

// Register `add_benchmark` under the `benches` group and generate the
// benchmark binary's entry point from that group.
criterion_group!(benches, add_benchmark);
criterion_main!(benches);
30 changes: 30 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,18 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
[""],
]

decimal_nullable = [[Decimal(n) if n is not None else None for n in sublist] if sublist is not None else None for sublist in items_nullable]
decimal_nested = [
[[Decimal(0), Decimal(1)]],
None,
[[Decimal(2), None], [Decimal(3)]],
[[Decimal(4), Decimal(5)], [Decimal(6)]],
[],
[[Decimal(7)], None, [Decimal(9)]],
[[], [None], None],
[[Decimal(10)]],
]

list_struct_nullable = [
[{"a": "a"}, {"a": "b"}],
None,
Expand Down Expand Up @@ -222,7 +234,16 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.field("list_bool", pa.list_(pa.bool_())),
pa.field("list_utf8", pa.list_(pa.utf8())),
pa.field("list_large_binary", pa.list_(pa.large_binary())),
pa.field("list_decimal_9", pa.list_(pa.decimal128(9, 0))),
pa.field("list_decimal_18", pa.list_(pa.decimal128(18, 0))),
pa.field("list_decimal_26", pa.list_(pa.decimal128(26, 0))),
pa.field("list_decimal256_9", pa.list_(pa.decimal256(9, 0))),
pa.field("list_decimal256_18", pa.list_(pa.decimal256(18, 0))),
pa.field("list_decimal256_26", pa.list_(pa.decimal256(26, 0))),
pa.field("list_decimal256_39", pa.list_(pa.decimal256(39, 0))),
pa.field("list_decimal256_76", pa.list_(pa.decimal256(76, 0))),
pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))),
pa.field("list_nested_decimal", pa.list_(pa.list_(pa.decimal128(9, 0)))),
pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))),
pa.field(
"list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64()))
Expand Down Expand Up @@ -251,7 +272,16 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_bool": boolean,
"list_utf8": string,
"list_large_binary": string,
"list_decimal_9": decimal_nullable,
"list_decimal_18": decimal_nullable,
"list_decimal_26": decimal_nullable,
"list_decimal256_9": decimal_nullable,
"list_decimal256_18": decimal_nullable,
"list_decimal256_26": decimal_nullable,
"list_decimal256_39": decimal_nullable,
"list_decimal256_76": decimal_nullable,
"list_nested_i64": items_nested,
"list_nested_decimal": decimal_nested,
"list_nested_inner_required_i64": items_required_nested,
"list_nested_inner_required_required_i64": items_required_nested_2,
"list_struct_nullable": list_struct_nullable,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2022-12-05"
channel = "nightly-2023-06-01"
12 changes: 12 additions & 0 deletions src/array/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ impl<O: Offset> BinaryArray<O> {
impl_mut_validity!();
impl_into_array!();

/// Returns its internal representation
#[must_use]
pub fn into_inner(self) -> (DataType, OffsetsBuffer<O>, Buffer<u8>, Option<Bitmap>) {
let Self {
data_type,
offsets,
values,
validity,
} = self;
(data_type, offsets, values, validity)
}

/// Try to convert this `BinaryArray` to a `MutableBinaryArray`
#[must_use]
pub fn into_mut(self) -> Either<Self, MutableBinaryArray<O>> {
Expand Down
15 changes: 15 additions & 0 deletions src/array/binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,21 @@ impl<O: Offset> MutableBinaryArray<O> {
let (offsets, values) = values_iter(iterator);
Self::try_new(Self::default_data_type(), offsets, values, None).unwrap()
}

/// Appends every item produced by a fallible iterator of optional values.
///
/// Space is reserved up front from the lower bound of the iterator's size
/// hint. Items are then pushed one by one.
///
/// # Errors
/// Returns the first error yielded by `iter`. Items pushed before that
/// error are kept; nothing is rolled back.
pub fn extend_fallible<T, I, E>(&mut self, iter: I) -> std::result::Result<(), E>
where
    E: std::error::Error,
    I: IntoIterator<Item = std::result::Result<Option<T>, E>>,
    T: AsRef<[u8]>,
{
    let items = iter.into_iter();
    let (lower_bound, _) = items.size_hint();
    self.reserve(lower_bound, 0);
    for item in items {
        self.push(item?);
    }
    Ok(())
}
}

impl<O: Offset, T: AsRef<[u8]>> Extend<Option<T>> for MutableBinaryArray<O> {
Expand Down
15 changes: 15 additions & 0 deletions src/array/binary/mutable_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,21 @@ impl<O: Offset> MutableBinaryValuesArray<O> {
}
Ok(array)
}

/// Appends every value produced by a fallible iterator.
///
/// Space is reserved up front from the lower bound of the iterator's size
/// hint. Values are then pushed one by one.
///
/// # Errors
/// Returns the first error yielded by `iter`. Values pushed before that
/// error are kept; nothing is rolled back.
pub fn extend_fallible<T, I, E>(&mut self, iter: I) -> std::result::Result<(), E>
where
    E: std::error::Error,
    I: IntoIterator<Item = std::result::Result<T, E>>,
    T: AsRef<[u8]>,
{
    let items = iter.into_iter();
    let (lower_bound, _) = items.size_hint();
    self.reserve(lower_bound, 0);
    for item in items {
        self.push(item?);
    }
    Ok(())
}
}

impl<O: Offset, T: AsRef<[u8]>> Extend<T> for MutableBinaryValuesArray<O> {
Expand Down
17 changes: 17 additions & 0 deletions src/array/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,23 @@ impl BooleanArray {
} = self;
(data_type, values, validity)
}

/// Builds a [`BooleanArray`] directly from its parts, without any validation.
///
/// This is the inverse of [`BooleanArray::into_inner`].
///
/// # Safety
/// The caller must guarantee that the supplied parts uphold every invariant
/// of this struct; none of them is checked here.
pub unsafe fn from_inner_unchecked(
    data_type: DataType,
    values: Bitmap,
    validity: Option<Bitmap>,
) -> Self {
    Self { data_type, values, validity }
}
}

impl Array for BooleanArray {
Expand Down
6 changes: 5 additions & 1 deletion src/array/boolean/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,11 @@ impl TryExtendFromSelf for MutableBooleanArray {
extend_validity(self.len(), &mut self.validity, &other.validity);

let slice = other.values.as_slice();
self.values.extend_from_slice(slice, 0, other.values.len());
// safety: invariant offset + length <= slice.len()
unsafe {
self.values
.extend_from_slice_unchecked(slice, 0, other.values.len());
}
Ok(())
}
}
5 changes: 3 additions & 2 deletions src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{hint::unreachable_unchecked, sync::Arc};
use std::{hash::Hash, hint::unreachable_unchecked, sync::Arc};

use crate::{
bitmap::{
Expand All @@ -20,6 +20,7 @@ mod iterator;
mod mutable;
use crate::array::specification::check_indexes_unchecked;
mod typed_iterator;
mod value_map;

use crate::array::dictionary::typed_iterator::{DictValue, DictionaryValuesIterTyped};
pub use iterator::*;
Expand All @@ -33,7 +34,7 @@ use super::{new_null_array, specification::check_indexes};
///
/// Any implementation of this trait must ensure that `always_fits_usize` only
/// returns `true` if all values succeeds on `value::try_into::<usize>().unwrap()`.
pub unsafe trait DictionaryKey: NativeType + TryInto<usize> + TryFrom<usize> {
pub unsafe trait DictionaryKey: NativeType + TryInto<usize> + TryFrom<usize> + Hash {
/// The corresponding [`IntegerType`] of this key
const KEY_TYPE: IntegerType;

Expand Down
Loading

0 comments on commit 664d021

Please sign in to comment.