forked from jorgecarleitao/arrow2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path avro_read.rs
76 lines (61 loc) · 1.92 KB
/
avro_read.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::io::Cursor;
use avro_rs::types::Record;
use criterion::*;
use arrow2::error::Result;
use arrow2::io::avro::avro_schema::read::read_metadata;
use arrow2::io::avro::read;
use avro_rs::*;
use avro_rs::{Codec, Schema as AvroSchema};
/// Returns the Avro schema shared by every benchmark in this file.
///
/// The schema is a record named `test` with a single `string` field `a`.
///
/// # Panics
/// Panics if the embedded JSON fails to parse as an Avro schema.
fn schema() -> AvroSchema {
    const RAW_SCHEMA: &str = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "string"}
]
}
"#;
    AvroSchema::parse_str(RAW_SCHEMA).unwrap()
}
/// Serializes `size` identical records (field `a` set to `"foo"`) into an
/// in-memory Avro file and returns its bytes.
///
/// When `has_codec` is `true` the writer is built with `Codec::Deflate`;
/// otherwise `Writer::new` is used.
///
/// # Panics
/// Panics if a record cannot be created for the schema, or if appending or
/// finalizing the writer fails. Those calls are unwrapped, never propagated.
fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
    let avro = schema();
    // The writer needs a schema plus a sink. A `Vec<u8>` keeps everything in
    // memory, so the benchmark input involves no filesystem I/O.
    let mut writer = if has_codec {
        Writer::with_codec(&avro, Vec::new(), Codec::Deflate)
    } else {
        Writer::new(&avro, Vec::new())
    };
    for _ in 0..size {
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("a", "foo");
        writer.append(record).unwrap();
    }
    let bytes = writer.into_inner().unwrap();
    Ok(bytes)
}
/// Decodes the Avro file held in `buffer` into Arrow chunks and checks that
/// exactly `size` rows come back.
///
/// # Errors
/// Propagates any error from reading the file metadata, inferring the Arrow
/// schema from the Avro record, or decoding a chunk. Decoding stops at the
/// first failing chunk.
///
/// # Panics
/// Panics if the total decoded row count differs from `size`.
fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
    let mut cursor = Cursor::new(buffer);
    let metadata = read_metadata(&mut cursor)?;
    let schema = read::infer_schema(&metadata.record)?;
    // NOTE(review): the trailing `None` presumably means "no column
    // projection" (decode every field); confirm against `read::Reader::new`.
    let total_rows = read::Reader::new(cursor, metadata, schema.fields, None)
        .try_fold(0usize, |rows, chunk| chunk.map(|chunk| rows + chunk.len()))?;
    assert_eq!(total_rows, size);
    Ok(())
}
/// Registers the `avro_read` benchmark group.
///
/// Sizes are `2^k` rows for `k` in `{10, 12, 14, 16, 18, 20}`. For each size,
/// an uncompressed Avro file is written once, up front. The timed closure then
/// only decodes it back into Arrow chunks. Throughput is reported in rows
/// (elements) per second, and each benchmark id is labelled `utf8/<k>`.
///
/// # Panics
/// Panics if writing the input or decoding it fails.
fn add_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("avro_read");
    for log2_size in (10..=20).step_by(2) {
        let size = 2usize.pow(log2_size);
        // Build the input outside `b.iter` so only decoding is measured.
        let buffer = write(size, false).unwrap();
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(BenchmarkId::new("utf8", log2_size), &buffer, |b, buffer| {
            b.iter(|| read_batch(buffer, size).unwrap())
        });
    }
    // Criterion finishes a group on drop, but its docs recommend calling
    // `finish` explicitly so the group's summary is produced at a defined point.
    group.finish();
}
// Registers `add_benchmark` in a Criterion benchmark group named `benches`.
criterion_group!(benches, add_benchmark);
// Generates the `main` entry point that runs the `benches` group.
// NOTE(review): this requires the bench target to be declared with
// `harness = false` in Cargo.toml; confirm.
criterion_main!(benches);