Commit 2aceb87: fixed issues after rebase
mightyshazam committed Jan 9, 2024
1 parent e61a4c0 commit 2aceb87

Showing 30 changed files with 304 additions and 5,256 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -4,7 +4,7 @@ members = [
     "kafka-delta-ingest-wasm",
     "kafka-delta-ingest-wasm-sdk",
 ]
-exclude = ["proofs"]
+
 resolver = "2"
 
 [profile.release-with-debug]
1 change: 1 addition & 0 deletions kafka-delta-ingest/Cargo.toml
@@ -68,6 +68,7 @@ s3 = [
     "rusoto_credential",
     "rusoto_s3",
 ]
+wasm = []
 
 [dev-dependencies]
 utime = "0.3"
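Note: the new `wasm` feature is an empty flag, presumably gating the crate's WASM transform support, judging from the kafka-delta-ingest-wasm workspace members above. A minimal sketch of how such a flag is consumed; the function below is hypothetical, not from this commit:

// Sketch only: `init_wasm_transforms` is a hypothetical name.
#[cfg(feature = "wasm")]
pub fn init_wasm_transforms() {
    // Runs only when built with `cargo build --features wasm`.
}

#[cfg(not(feature = "wasm"))]
pub fn init_wasm_transforms() {
    // No-op in default builds.
}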
32 changes: 16 additions & 16 deletions kafka-delta-ingest/src/coercions.rs
@@ -1,4 +1,5 @@
-use deltalake::{Schema as DeltaSchema, SchemaDataType as DeltaDataType};
+use deltalake_core::kernel::Schema as DeltaSchema;
+use deltalake_core::kernel::{DataType, PrimitiveType};
 
 use chrono::prelude::*;
 use serde_json::Value;
@@ -34,33 +35,32 @@ pub(crate) struct CoercionArray {
 pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
     let mut root = HashMap::new();
 
-    for field in schema.get_fields() {
-        if let Some(node) = build_coercion_node(field.get_type()) {
-            root.insert(field.get_name().to_string(), node);
+    for field in schema.fields() {
+        if let Some(node) = build_coercion_node(field.data_type()) {
+            root.insert(field.name().to_string(), node);
         }
     }
 
     CoercionTree { root }
 }
 
-fn build_coercion_node(r#type: &DeltaDataType) -> Option<CoercionNode> {
-    match r#type {
-        DeltaDataType::primitive(r#type) if r#type == "string" => {
-            Some(CoercionNode::Coercion(Coercion::ToString))
-        }
-        DeltaDataType::primitive(r#type) if r#type == "timestamp" => {
-            Some(CoercionNode::Coercion(Coercion::ToTimestamp))
-        }
-        DeltaDataType::r#struct(schema) => {
-            let nested_context = create_coercion_tree(schema);
+fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
+    match data_type {
+        DataType::Primitive(primitive) => match primitive {
+            PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)),
+            PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
+            _ => None,
+        },
+        DataType::Struct(st) => {
+            let nested_context = create_coercion_tree(st);
             if !nested_context.root.is_empty() {
                 Some(CoercionNode::Tree(nested_context))
             } else {
                 None
            }
         }
-        DeltaDataType::array(schema) => {
-            build_coercion_node(schema.get_element_type()).and_then(|node| match node {
+        DataType::Array(array) => {
+            build_coercion_node(array.element_type()).and_then(|node| match node {
                 CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)),
                 CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
                 _ => None,
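Note: in the migration above, the old stringly-typed checks (`DeltaDataType::primitive(t) if t == "string"`) become structured matches on `deltalake_core::kernel::DataType`. A minimal sketch of the same traversal pattern, using only the kernel APIs that appear in this diff:

use deltalake_core::kernel::{DataType, PrimitiveType};

// Reports whether a value of this type would need coercion (a string or
// timestamp anywhere in the type tree), mirroring the match arms above.
fn needs_coercion(data_type: &DataType) -> bool {
    match data_type {
        DataType::Primitive(PrimitiveType::String) => true,
        DataType::Primitive(PrimitiveType::Timestamp) => true,
        DataType::Struct(st) => {
            for field in st.fields() {
                if needs_coercion(field.data_type()) {
                    return true;
                }
            }
            false
        }
        DataType::Array(array) => needs_coercion(array.element_type()),
        _ => false,
    }
}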
6 changes: 3 additions & 3 deletions kafka-delta-ingest/src/dead_letters.rs
@@ -2,8 +2,8 @@ use crate::transforms::Transformer;
 use async_trait::async_trait;
 use chrono::prelude::*;
 use core::fmt::Debug;
-use deltalake::parquet::errors::ParquetError;
-use deltalake::{DeltaTable, DeltaTableError};
+use deltalake_core::parquet::errors::ParquetError;
+use deltalake_core::{DeltaTable, DeltaTableError};
 #[cfg(feature = "s3")]
 use dynamodb_lock::dynamo_lock_options;
 use log::{error, info, warn};
@@ -254,7 +254,7 @@ impl DeltaSinkDeadLetterQueue {
             dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
                 .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
         };
-        #[cfg(feature = "azure")]
+        #[cfg(all(feature = "azure", not(feature = "s3")))]
         let opts = HashMap::default();
 
         let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;
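Note: a plausible reading of the cfg change above is that with both `s3` and `azure` features enabled, the formerly unconditional `#[cfg(feature = "azure")]` binding would shadow the S3 lock options built just before it, silently dropping the DynamoDB lock configuration; the added `not(feature = "s3")` guard prevents that. A simplified, self-contained sketch of the shadowing hazard (map contents are placeholders):

use std::collections::HashMap;

#[allow(unused_variables)]
fn main() {
    // Stand-in for the s3-gated lock options built above.
    let opts: HashMap<String, String> = HashMap::from([(
        "partition_key_value".to_string(),
        "kafka_delta_ingest-dead_letters".to_string(),
    )]);
    // Stand-in for the azure-gated binding: when both cfgs applied,
    // this later `let` shadowed the first and the options were lost.
    let opts: HashMap<String, String> = HashMap::default();
    assert!(opts.is_empty());
}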
14 changes: 7 additions & 7 deletions kafka-delta-ingest/src/delta_helpers.rs
@@ -1,13 +1,13 @@
 use crate::{DataTypeOffset, DataTypePartition};
-use deltalake::protocol::{Action, Add, Txn};
-use deltalake::{DeltaTable, DeltaTableError};
+use deltalake_core::kernel::{Action, Add, Txn};
+use deltalake_core::{DeltaTable, DeltaTableError};
 use std::collections::HashMap;
 
 pub(crate) async fn load_table(
     table_uri: &str,
     options: HashMap<String, String>,
 ) -> Result<DeltaTable, DeltaTableError> {
-    let mut table = deltalake::open_table_with_storage_options(table_uri, options).await?;
+    let mut table = deltalake_core::open_table_with_storage_options(table_uri, options).await?;
     table.load().await?;
     Ok(table)
 }
@@ -22,12 +22,12 @@ pub(crate) fn build_actions(
         .map(|(partition, offset)| {
             create_txn_action(txn_app_id_for_partition(app_id, *partition), *offset)
         })
-        .chain(add.drain(..).map(Action::add))
+        .chain(add.drain(..).map(Action::Add))
         .collect()
 }
 
 pub(crate) fn create_txn_action(txn_app_id: String, offset: DataTypeOffset) -> Action {
-    Action::txn(Txn {
+    Action::Txn(Txn {
         app_id: txn_app_id,
         version: offset,
         last_updated: Some(
@@ -52,10 +52,10 @@ pub(crate) async fn try_create_checkpoint(
         table.load_version(version).await?;
     }
 
-    deltalake::checkpoints::create_checkpoint(table).await?;
+    deltalake_core::checkpoints::create_checkpoint(table).await?;
     log::info!("Created checkpoint version {}.", version);
 
-    let removed = deltalake::checkpoints::cleanup_metadata(table).await?;
+    let removed = deltalake_core::checkpoints::cleanup_metadata(table).await?;
     if removed > 0 {
         log::info!("Metadata cleanup, removed {} obsolete logs.", removed);
     }
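Note: `Action::add` to `Action::Add` and `Action::txn` to `Action::Txn` reflect that `deltalake_core::kernel::Action` uses conventional CamelCase enum variants, where the old `deltalake::protocol` enum had lowercase constructor-style ones. A minimal sketch of building a transaction action with the kernel types used above (field values are placeholders):

use deltalake_core::kernel::{Action, Txn};

// Same shape as create_txn_action above; `last_updated` is left unset here.
fn txn_action(app_id: String, offset: i64) -> Action {
    Action::Txn(Txn {
        app_id,
        version: offset,
        last_updated: None,
    })
}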
29 changes: 8 additions & 21 deletions kafka-delta-ingest/src/lib.rs
@@ -17,9 +17,9 @@ extern crate strum_macros;
 extern crate serde_json;
 
 use coercions::CoercionTree;
-use deltalake::protocol::DeltaOperation;
-use deltalake::protocol::OutputMode;
-use deltalake::{DeltaTable, DeltaTableError};
+use deltalake_core::protocol::DeltaOperation;
+use deltalake_core::protocol::OutputMode;
+use deltalake_core::{DeltaTable, DeltaTableError};
 use futures::stream::StreamExt;
 use log::{debug, error, info, warn};
 use rdkafka::{
@@ -754,7 +754,7 @@ impl IngestProcessor {
         let dlq = dead_letter_queue_from_options(&opts).await?;
         let transformer = Transformer::from_transforms(&opts.transforms)?;
         let table = delta_helpers::load_table(table_uri, HashMap::new()).await?;
-        let coercion_tree = coercions::create_coercion_tree(&table.get_metadata()?.schema);
+        let coercion_tree = coercions::create_coercion_tree(table.schema().unwrap());
         let delta_writer = DataWriter::for_table(&table, HashMap::new())?;
         let deserializer = match MessageDeserializerFactory::try_build(&opts.input_format) {
             Ok(deserializer) => deserializer,
@@ -947,11 +947,11 @@ impl IngestProcessor {
 
         if self
             .delta_writer
-            .update_schema(self.table.get_metadata()?)?
+            .update_schema(self.table.state.delta_metadata().unwrap())?
         {
             info!("Table schema has been updated");
             // Update the coercion tree to reflect the new schema
-            let coercion_tree = coercions::create_coercion_tree(&self.table.get_metadata()?.schema);
+            let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap());
             let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree);
 
             return Err(IngestError::DeltaSchemaChanged);
@@ -961,25 +961,12 @@
         let mut attempt_number: u32 = 0;
         let actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add);
         loop {
-            /*let partition_columns = self.table.get_metadata().unwrap().partition_columns.clone();
-            match deltalake::operations::transaction::commit(
-                (self.table.object_store().storage_backend()).as_ref(),
-                &actions,
-                deltalake::action::DeltaOperation::Write {
-                    mode: deltalake::action::SaveMode::Append,
-                    partition_by: Some(partition_columns),
-                    predicate: None,
-                },
-                &self.table.state,
-                None,
-            )*/
-
             let epoch_id = std::time::SystemTime::now()
                 .duration_since(std::time::UNIX_EPOCH)
                 .expect("Time went backwards")
                 .as_millis() as i64;
-            match deltalake::operations::transaction::commit(
-                (self.table.object_store().storage_backend()).as_ref(),
+            match deltalake_core::operations::transaction::commit(
+                self.table.log_store().clone().as_ref(),
                 &actions,
                 DeltaOperation::StreamingUpdate {
                     output_mode: OutputMode::Append,
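Note: the commit call now takes the table's `LogStore` instead of the raw storage backend. A hedged sketch of the full call shape, reconstructed from the deleted commented-out block and the new lines above; the `query_id` value and the error handling are assumptions, and the exact signature may differ across deltalake-core versions:

use deltalake_core::kernel::Action;
use deltalake_core::protocol::{DeltaOperation, OutputMode};
use deltalake_core::DeltaTable;

// Commit a streaming append against `table`, as the retry loop above does.
async fn commit_append(
    table: &DeltaTable,
    actions: Vec<Action>,
    epoch_id: i64,
) -> anyhow::Result<i64> {
    let version = deltalake_core::operations::transaction::commit(
        table.log_store().clone().as_ref(),
        &actions,
        DeltaOperation::StreamingUpdate {
            output_mode: OutputMode::Append,
            query_id: "kafka_delta_ingest".to_string(), // assumed value
            epoch_id,
        },
        &table.state,
        None, // no extra app metadata
    )
    .await?;
    Ok(version)
}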
5 changes: 5 additions & 0 deletions kafka-delta-ingest/src/main.rs
@@ -44,6 +44,11 @@ use std::str::FromStr;
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> anyhow::Result<()> {
+    #[cfg(feature = "s3")]
+    deltalake_aws::register_handlers(None);
+    #[cfg(feature = "azure")]
+    deltalake_azure::register_handlers(None);
+
     #[cfg(feature = "sentry-ext")]
     {
         let _guard = std::env::var("SENTRY_DSN").ok().map(|dsn| {
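Note: these registrations exist because the object-store backends now live in separate crates (`deltalake-aws`, `deltalake-azure`) and must install their URI handlers before any table is opened; without them, `s3://` or Azure table URIs fail to resolve. A minimal sketch of the required ordering (the bucket path is hypothetical):

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Register first, then open: registration is what teaches
    // deltalake_core how to resolve s3:// URIs.
    deltalake_aws::register_handlers(None);
    let table = deltalake_core::open_table("s3://my-bucket/my-table").await?;
    println!("table version: {}", table.version());
    Ok(())
}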
14 changes: 7 additions & 7 deletions kafka-delta-ingest/src/offsets.rs
@@ -1,9 +1,9 @@
 use crate::delta_helpers::*;
 use crate::{DataTypeOffset, DataTypePartition};
-use deltalake::protocol::Action;
-use deltalake::protocol::DeltaOperation;
-use deltalake::protocol::OutputMode;
-use deltalake::{DeltaTable, DeltaTableError};
+use deltalake_core::kernel::Action;
+use deltalake_core::protocol::DeltaOperation;
+use deltalake_core::protocol::OutputMode;
+use deltalake_core::{DeltaTable, DeltaTableError};
 use log::{error, info};
 
 /// Errors returned by `write_offsets_to_delta` function.
@@ -115,8 +115,8 @@ async fn commit_partition_offsets(
         .as_millis() as i64;
 
     table.update().await?;
-    match deltalake::operations::transaction::commit(
-        (table.object_store().storage_backend()).as_ref(),
+    match deltalake_core::operations::transaction::commit(
+        table.log_store().clone().as_ref(),
         &actions,
         DeltaOperation::StreamingUpdate {
             output_mode: OutputMode::Complete,
@@ -221,6 +221,6 @@ mod tests {
         let v0_path = format!("{}/_delta_log/00000000000000000000.json", &table_path);
         std::fs::create_dir_all(Path::new(&v0_path).parent().unwrap()).unwrap();
         std::fs::write(&v0_path, VERSION_0).unwrap();
-        deltalake::open_table(&table_path).await.unwrap()
+        deltalake_core::open_table(&table_path).await.unwrap()
     }
 }
kafka-delta-ingest/src/transforms.rs
@@ -29,13 +29,6 @@ pub enum TransformError {
         #[from]
         source: serde_json::Error,
     },
-
-    #[cfg(feature = "wasm")]
-    #[error("webassembly error: {source}")]
-    Wasm {
-        #[from]
-        source: Box<dyn std::error::Error>,
-    },
 }
 
 // Error thrown from custom functions registered in the jmespath Runtime
2 changes: 1 addition & 1 deletion kafka-delta-ingest/src/value_buffers.rs
@@ -179,7 +179,7 @@
 
         values.sort();
 
-        let expected: Vec<String> = vec!["\"0:0\"", "\"0:1\"", "\"0:2\"", "\"1:0\"", "\"1:1\""]
+        let expected: Vec<String> = ["\"0:0\"", "\"0:1\"", "\"0:2\"", "\"1:0\"", "\"1:1\""]
             .iter()
             .map(|s| s.to_string())
             .collect();
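Note: the `vec![...]` to `[...]` change drops an intermediate heap allocation before `.iter()`; it is the usual fix for clippy's `useless_vec` lint, and the resulting `Vec<String>` is unchanged.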