|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright The Lance Authors |
| 3 | + |
| 4 | +use arrow::array::UInt32Array; |
| 5 | +use arrow::datatypes::{DataType, Field, Schema}; |
| 6 | +use arrow::record_batch::{RecordBatch, RecordBatchIterator}; |
| 7 | +use futures::StreamExt; |
| 8 | +use lance::dataset::{WriteMode, WriteParams}; |
| 9 | +use lance::Dataset; |
| 10 | +use std::sync::Arc; |
| 11 | + |
| 12 | +// Writes sample dataset to the given path |
| 13 | +async fn write_dataset(data_path: &str) { |
| 14 | + // Define new schema |
| 15 | + let schema = Arc::new(Schema::new(vec![ |
| 16 | + Field::new("key", DataType::UInt32, false), |
| 17 | + Field::new("value", DataType::UInt32, false), |
| 18 | + ])); |
| 19 | + |
| 20 | + // Create new record batches |
| 21 | + let batch = RecordBatch::try_new( |
| 22 | + schema.clone(), |
| 23 | + vec![ |
| 24 | + Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6])), |
| 25 | + Arc::new(UInt32Array::from(vec![6, 7, 8, 9, 10, 11])), |
| 26 | + ], |
| 27 | + ) |
| 28 | + .unwrap(); |
| 29 | + |
| 30 | + let batches = RecordBatchIterator::new([Ok(batch)], schema.clone()); |
| 31 | + |
| 32 | + // Define write parameters (e.g. overwrite dataset) |
| 33 | + let write_params = WriteParams { |
| 34 | + mode: WriteMode::Overwrite, |
| 35 | + ..Default::default() |
| 36 | + }; |
| 37 | + |
| 38 | + Dataset::write(batches, data_path, Some(write_params)) |
| 39 | + .await |
| 40 | + .unwrap(); |
| 41 | +} // End write dataset |
| 42 | + |
| 43 | +// Reads dataset from the given path and prints batch size, schema for all record batches. Also extracts and prints a slice from the first batch |
| 44 | +async fn read_dataset(data_path: &str) { |
| 45 | + let dataset = Dataset::open(data_path).await.unwrap(); |
| 46 | + let scanner = dataset.scan(); |
| 47 | + |
| 48 | + let mut batch_stream = scanner.try_into_stream().await.unwrap().map(|b| b.unwrap()); |
| 49 | + |
| 50 | + while let Some(batch) = batch_stream.next().await { |
| 51 | + println!("Batch size: {}, {}", batch.num_rows(), batch.num_columns()); // print size of batch |
| 52 | + println!("Schema: {:?}", batch.schema()); // print schema of recordbatch |
| 53 | + |
| 54 | + println!("Batch: {:?}", batch); // print the entire recordbatch (schema and data) |
| 55 | + } |
| 56 | +} // End read dataset |
| 57 | + |
| 58 | +#[tokio::main] |
| 59 | +async fn main() { |
| 60 | + let data_path: &str = "./temp_data.lance"; |
| 61 | + |
| 62 | + write_dataset(data_path).await; |
| 63 | + read_dataset(data_path).await; |
| 64 | +} |
0 commit comments