diff --git a/Cargo.lock b/Cargo.lock index b9d9c83..1414e9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,12 +370,27 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "array-init" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" +[[package]] +name = "array-util" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e509844de8f09b90a2c3444684a2b6695f4071360e13d2fda0af9f749cc2ed6" +dependencies = [ + "arrayvec", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -601,6 +616,18 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.19" @@ -618,6 +645,34 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.88" @@ -644,6 +699,18 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "auto_enums" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -1587,7 +1654,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", ] [[package]] @@ -1617,6 +1684,15 @@ dependencies = [ "cc", ] +[[package]] +name = "cmsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "553c840ee51da812c6cd621f9f7e07dfb00a49f91283a8e6380c78cba4f61aba" +dependencies = [ + "paste", +] + [[package]] name = "color-eyre" version = "0.6.3" @@ -1660,6 +1736,15 @@ dependencies = [ "unicode-width 0.2.0", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = 
"const-oid" version = "0.9.6" @@ -1959,14 +2044,38 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core 0.14.4", + "darling_macro 0.14.4", +] + [[package]] name = "darling" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", ] [[package]] @@ -1979,17 +2088,28 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.100", ] +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core 0.14.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", "quote", "syn 2.0.100", ] @@ -2599,7 +2719,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78889f4005974b848f130fa5dedae81987f1bc93b107291ea87d900c93b6c3bb" dependencies = [ "deltalake-aws", + "deltalake-azure", "deltalake-core", + "deltalake-gcp", ] [[package]] @@ -2629,6 +2751,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "deltalake-azure" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfbea4786321ebd88e083ec74ce513ec7fcad9ddc880b611770dee012652567" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "object_store", + "regex 1.11.1", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", +] + [[package]] name = "deltalake-core" version = "0.25.0" @@ -2694,6 +2834,24 @@ dependencies = [ "z85", ] +[[package]] +name = "deltalake-gcp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa80de5b3e9e53eb9a98d976ac1d64a70b798a73d63cdd83497cc317a5063602" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "object_store", + "regex 1.11.1", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", +] + [[package]] name = "der" version = "0.6.1" @@ -2759,6 +2917,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_utils" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "digest" version = "0.10.7" @@ -2787,6 +2956,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "dunce" version = "1.0.5" @@ -2917,6 +3092,27 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5692dd7b5a1978a5aeb0ce83b7655c58ca8efdcb79d21036ea249da95afec2c6" +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -2995,6 +3191,18 @@ dependencies = [ "miniz_oxide 0.8.8", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3031,6 +3239,116 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "foyer" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "635c7077026867cb5e5ea576c461f29b1c4151fce7a9d7cc3a1b1a9902d95c65" +dependencies = [ + "anyhow", + "equivalent", + "foyer-common", + "foyer-memory", + "foyer-storage", + "madsim-tokio", + "mixtrics", + "pin-project", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ed2316785e80137c7b91bb74dab1dc1967c3272df05825397b73ae8fc527041" +dependencies = [ + "ahash 0.8.11", + "bincode", + "bytes", + "cfg-if", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.3", + "pin-project", + "serde", + "thiserror 2.0.12", + "tokio", +] + +[[package]] +name = "foyer-intrusive-collections" +version = "0.10.0-dev" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b" +dependencies = [ + "memoffset", +] + +[[package]] +name = "foyer-memory" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090cf5b89d49fd61e7da9bfae3a1aef605f03196d542b2f8171c74f3add013f4" +dependencies = [ + "ahash 0.8.11", + "arc-swap", + "bitflags 2.9.0", + "cmsketch", + "equivalent", + "foyer-common", + "foyer-intrusive-collections", + "hashbrown 0.15.2", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.3", + "pin-project", + "serde", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "095e857c97d6339d4a4a6424b88d08fe08ad0366bfbfaf65d6ddf55baf3d2a38" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", + "anyhow", + "array-util", + "auto_enums", + "bytes", + "clap", + "equivalent", + "flume", + "foyer-common", + "foyer-memory", + "fs4", + "futures-core", + "futures-util", + "itertools 0.14.0", + "libc", + "lz4", + "madsim-tokio", + "ordered_hash_map", + "parking_lot 0.12.3", + "paste", + 
"pin-project", + "rand 0.9.0", + "serde", + "thiserror 2.0.12", + "tokio", + "tracing", + "twox-hash 2.1.0", + "zstd", +] + [[package]] name = "fs-err" version = "3.1.0" @@ -3050,6 +3368,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix 1.0.5", + "windows-sys 0.59.0", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -3279,6 +3607,15 @@ dependencies = [ "ahash 0.7.8", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.11", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -4101,13 +4438,32 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lz4_flex" version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" dependencies = [ - "twox-hash", + "twox-hash 1.6.3", ] [[package]] @@ -4121,6 +4477,60 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "madsim" +version = "0.2.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3c97f34bb19cf6a435a4da2187e90acc6bc59faa730e493b28b6d33e1bb9ccb" +dependencies = [ + "ahash 0.8.11", + "async-channel", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "futures-util", + "lazy_static", + "libc", + "madsim-macros", + "naive-timer", + "panic-message", + "rand 0.8.5", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" +dependencies = [ + "madsim", + "spin", + "tokio", +] + [[package]] name = "maplit" version = "1.0.2" @@ -4182,6 +4592,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash 0.8.11", + "portable-atomic", +] + [[package]] name = "mime" version = "0.3.17" @@ -4234,6 +4654,31 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mixtrics" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "749ed12bab176c8a42c13a679dd2de12876d5ad4abe7525548e31ae001a9ebbf" +dependencies = [ + "itertools 0.14.0", + 
"parking_lot 0.12.3", +] + +[[package]] +name = "naive-timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.15", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -4392,6 +4837,7 @@ dependencies = [ "bytes", "chrono", "futures", + "httparse", "humantime", "hyper 1.6.0", "itertools 0.13.0", @@ -4402,6 +4848,7 @@ dependencies = [ "rand 0.8.5", "reqwest", "ring", + "rustls-pemfile 2.2.0", "serde", "serde_json", "snafu", @@ -4554,6 +5001,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered_hash_map" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "outref" version = "0.5.2" @@ -4589,6 +5045,18 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.11.2" @@ -4669,7 +5137,7 @@ dependencies = [ "snap", "thrift", "tokio", - "twox-hash", + "twox-hash 1.6.3", "zstd", "zstd-sys", ] @@ -5193,6 +5661,15 @@ dependencies = [ "getrandom 0.3.2", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "rayon" version = "1.10.0" @@ -5795,6 +6272,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -5831,7 +6317,7 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.100", @@ -6004,6 +6490,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.6.0" @@ -6115,6 +6610,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -6390,6 +6891,7 @@ dependencies = [ "anyhow", "arrow", "arrow-schema", + "async-stream", "async-trait", "aws-config", "aws-sdk-s3", @@ -6410,9 +6912,13 @@ dependencies = [ 
"deltalake", "dotenv", "env_logger", + "flate2", + "foyer", "futures", "lazy_static", "log 0.4.27", + "metrics", + "object_store", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -6614,11 +7120,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd87a5cdd6ffab733b2f74bc4fd7ee5fff6634124999ac278c35fc78c6120148" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + [[package]] name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -6627,6 +7148,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ "indexmap 2.9.0", + "serde", + "serde_spanned", "toml_datetime", "winnow", ] @@ -6785,6 +7308,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "twox-hash" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "typenum" version = "1.18.0" diff --git a/Cargo.toml b/Cargo.toml index 1840972..c4073fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ edition = "2024" [dependencies] tokio = { version = "1.43", features = ["full"] } +foyer = {version ="0.17.0",features=["serde"]} +object_store = { version = "0.11.2", features = ["aws", "azure", "gcp"] } datafusion = "46.0.0" arrow = "54.2.0" uuid = { version = "1.13", features = ["v4", "serde"] } @@ -18,7 +20,7 @@ log = "0.4.25" color-eyre = "0.6.3" arrow-schema = "54.1.0" regex = "1.11.1" -deltalake = { version = "0.25.0", features = ["datafusion", "s3"] } +deltalake = { version = "0.25", features = ["datafusion", "s3","azure", "gcs",] } delta_kernel = { version = "0.8.0", features = [ "arrow-conversion", "default-engine", @@ -67,6 +69,9 @@ aws-sdk-s3 = "1.3.0" url = "2.5.4" datafusion-common = "46.0.0" tokio-cron-scheduler = "0.10" +metrics = "0.24.2" +flate2 = "1.1.1" +async-stream = "0.3" [dev-dependencies] serial_test = "3.2.0" @@ -75,5 +80,9 @@ scopeguard = "1.2.0" rand = "0.8.5" [features] -default = [] +default = ["s3", "azure", "gcs"] +s3 = ["deltalake/s3", "object_store/aws"] +azure = ["deltalake/azure", "object_store/azure"] +gcs = ["deltalake/gcs", "object_store/gcp"] test = [] + diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index a03c664..17441e2 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -34,51 +34,51 @@ fn bench_batch_ingestion(c: &mut Criterion) { let mut records = Vec::with_capacity(batch_size); for _ in 0..batch_size { records.push(IngestRecord { - table_name: "bench_table".to_string(), - project_id: "bench_project".to_string(), - id: Uuid::new_v4().to_string(), - version: 1, - event_type: "bench_event".to_string(), - timestamp: "2025-03-11T12:00:00Z".to_string(), - trace_id: "trace".to_string(), - span_id: "span".to_string(), - parent_span_id: None, - trace_state: None, - start_time: "2025-03-11T12:00:00Z".to_string(), - end_time: Some("2025-03-11T12:00:01Z".to_string()), - duration_ns: 1_000_000_000, - span_name: "span_name".to_string(), - span_kind: "client".to_string(), - span_type: "bench".to_string(), - status: None, - status_code: 0, - status_message: 
"OK".to_string(), - severity_text: None, - severity_number: 0, - host: "localhost".to_string(), - url_path: "/".to_string(), - raw_url: "/".to_string(), - method: "GET".to_string(), - referer: "".to_string(), - path_params: None, - query_params: None, - request_headers: None, - response_headers: None, - request_body: None, - response_body: None, - endpoint_hash: "hash".to_string(), - shape_hash: "shape".to_string(), - format_hashes: vec!["fmt".to_string()], - field_hashes: vec!["field".to_string()], - sdk_type: "rust".to_string(), - service_version: None, - attributes: None, - events: None, - links: None, - resource: None, + table_name: "bench_table".to_string(), + project_id: "bench_project".to_string(), + id: Uuid::new_v4().to_string(), + version: 1, + event_type: "bench_event".to_string(), + timestamp: "2025-03-11T12:00:00Z".to_string(), + trace_id: "trace".to_string(), + span_id: "span".to_string(), + parent_span_id: None, + trace_state: None, + start_time: "2025-03-11T12:00:00Z".to_string(), + end_time: Some("2025-03-11T12:00:01Z".to_string()), + duration_ns: 1_000_000_000, + span_name: "span_name".to_string(), + span_kind: "client".to_string(), + span_type: "bench".to_string(), + status: None, + status_code: 0, + status_message: "OK".to_string(), + severity_text: None, + severity_number: 0, + host: "localhost".to_string(), + url_path: "/".to_string(), + raw_url: "/".to_string(), + method: "GET".to_string(), + referer: "".to_string(), + path_params: None, + query_params: None, + request_headers: None, + response_headers: None, + request_body: None, + response_body: None, + endpoint_hash: "hash".to_string(), + shape_hash: "shape".to_string(), + format_hashes: vec!["fmt".to_string()], + field_hashes: vec!["field".to_string()], + sdk_type: "rust".to_string(), + service_version: None, + attributes: None, + events: None, + links: None, + resource: None, instrumentation_scope: None, - errors: None, - tags: vec!["tag".to_string()], + errors: None, + tags: vec!["tag".to_string()], }); } @@ -110,51 +110,51 @@ fn bench_insertion_range(c: &mut Criterion) { let mut records = Vec::with_capacity(size); for _ in 0..size { records.push(IngestRecord { - table_name: "bench_table".to_string(), - project_id: "bench_project".to_string(), - id: Uuid::new_v4().to_string(), - version: 1, - event_type: "bench_event".to_string(), - timestamp: "2025-03-11T12:00:00Z".to_string(), - trace_id: "trace".to_string(), - span_id: "span".to_string(), - parent_span_id: None, - trace_state: None, - start_time: "2025-03-11T12:00:00Z".to_string(), - end_time: Some("2025-03-11T12:00:01Z".to_string()), - duration_ns: 1_000_000_000, - span_name: "span_name".to_string(), - span_kind: "client".to_string(), - span_type: "bench".to_string(), - status: None, - status_code: 0, - status_message: "OK".to_string(), - severity_text: None, - severity_number: 0, - host: "localhost".to_string(), - url_path: "/".to_string(), - raw_url: "/".to_string(), - method: "GET".to_string(), - referer: "".to_string(), - path_params: None, - query_params: None, - request_headers: None, - response_headers: None, - request_body: None, - response_body: None, - endpoint_hash: "hash".to_string(), - shape_hash: "shape".to_string(), - format_hashes: vec!["fmt".to_string()], - field_hashes: vec!["field".to_string()], - sdk_type: "rust".to_string(), - service_version: None, - attributes: None, - events: None, - links: None, - resource: None, + table_name: "bench_table".to_string(), + project_id: "bench_project".to_string(), + id: Uuid::new_v4().to_string(), + 
version: 1, + event_type: "bench_event".to_string(), + timestamp: "2025-03-11T12:00:00Z".to_string(), + trace_id: "trace".to_string(), + span_id: "span".to_string(), + parent_span_id: None, + trace_state: None, + start_time: "2025-03-11T12:00:00Z".to_string(), + end_time: Some("2025-03-11T12:00:01Z".to_string()), + duration_ns: 1_000_000_000, + span_name: "span_name".to_string(), + span_kind: "client".to_string(), + span_type: "bench".to_string(), + status: None, + status_code: 0, + status_message: "OK".to_string(), + severity_text: None, + severity_number: 0, + host: "localhost".to_string(), + url_path: "/".to_string(), + raw_url: "/".to_string(), + method: "GET".to_string(), + referer: "".to_string(), + path_params: None, + query_params: None, + request_headers: None, + response_headers: None, + request_body: None, + response_body: None, + endpoint_hash: "hash".to_string(), + shape_hash: "shape".to_string(), + format_hashes: vec!["fmt".to_string()], + field_hashes: vec!["field".to_string()], + sdk_type: "rust".to_string(), + service_version: None, + attributes: None, + events: None, + links: None, + resource: None, instrumentation_scope: None, - errors: None, - tags: vec!["tag".to_string()], + errors: None, + tags: vec!["tag".to_string()], }); } diff --git a/src/batch_queue.rs b/src/batch_queue.rs index 074eac9..f3afcf6 100644 --- a/src/batch_queue.rs +++ b/src/batch_queue.rs @@ -1,17 +1,18 @@ -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::Result; use crossbeam::queue::SegQueue; use delta_kernel::arrow::record_batch::RecordBatch; -use tokio::sync::RwLock; -use tokio::time::interval; +use tokio::{sync::RwLock, time::interval}; use tracing::{error, info}; /// BatchQueue collects RecordBatches and processes them at intervals #[derive(Debug)] pub struct BatchQueue { - queue: Arc>, + queue: Arc>, is_shutting_down: Arc>, } @@ -105,14 +106,14 @@ async fn process_batches(db: &Arc, queue: &Arc Result<()> { dotenv::dotenv().ok(); diff --git a/src/database.rs b/src/database.rs index 6b1bc13..169c0c6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,57 +1,58 @@ -use crate::persistent_queue::OtelLogsAndSpans; +use std::{any::Any, collections::HashMap, env, fmt, net::SocketAddr, sync::Arc, time::Duration}; + use anyhow::Result; use arrow_schema::SchemaRef; use async_trait::async_trait; -use datafusion::arrow::array::Array; -use datafusion::common::SchemaExt; -use datafusion::common::not_impl_err; -use datafusion::execution::TaskContext; -use datafusion::execution::context::SessionContext; -use datafusion::logical_expr::{Expr, Operator, TableProviderFilterPushDown}; -use datafusion::physical_plan::DisplayAs; -use datafusion::physical_plan::insert::{DataSink, DataSinkExec}; -use datafusion::scalar::ScalarValue; use datafusion::{ + arrow::array::Array, catalog::Session, + common::{SchemaExt, not_impl_err}, datasource::{TableProvider, TableType}, error::{DataFusionError, Result as DFResult}, - logical_expr::{BinaryExpr, dml::InsertOp}, - physical_plan::{DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}, + execution::{TaskContext, context::SessionContext}, + logical_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, dml::InsertOp}, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, + insert::{DataSink, DataSinkExec}, + }, + scalar::ScalarValue, }; use datafusion_postgres::{DfSessionService, HandlerFactory}; use 
delta_kernel::arrow::record_batch::RecordBatch; -use deltalake::checkpoints; -use deltalake::datafusion::parquet::basic::{Compression, ZstdLevel}; -use deltalake::datafusion::parquet::file::properties::WriterProperties; -use deltalake::operations::transaction::CommitProperties; -use deltalake::{DeltaOps, DeltaTable, DeltaTableBuilder, storage::StorageOptions}; +use deltalake::{ + DeltaOps, DeltaTable, DeltaTableBuilder, checkpoints, + datafusion::parquet::{ + basic::{Compression, ZstdLevel}, + file::properties::WriterProperties, + }, + operations::transaction::CommitProperties, + storage::StorageOptions, +}; use futures::StreamExt; -use std::fmt; -use std::{any::Any, collections::HashMap, env, sync::Arc}; -use std::{net::SocketAddr, time::Duration}; -use tokio::sync::RwLock; -use tokio::{net::TcpListener, time::timeout}; +use tokio::{net::TcpListener, sync::RwLock, time::timeout}; use tokio_stream::wrappers::TcpListenerStream; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; use url::Url; +use crate::persistent_queue::OtelLogsAndSpans; + type ProjectConfig = (String, StorageOptions, Arc>); pub type ProjectConfigs = Arc>>; #[derive(Debug)] pub struct Database { - project_configs: ProjectConfigs, - batch_queue: Option>, + project_configs: ProjectConfigs, + batch_queue: Option>, maintenance_shutdown: Arc, } impl Clone for Database { fn clone(&self) -> Self { Self { - project_configs: Arc::clone(&self.project_configs), - batch_queue: self.batch_queue.clone(), + project_configs: Arc::clone(&self.project_configs), + batch_queue: self.batch_queue.clone(), maintenance_shutdown: Arc::clone(&self.maintenance_shutdown), } } @@ -75,8 +76,8 @@ impl Database { let project_configs = HashMap::new(); let db = Self { - project_configs: Arc::new(RwLock::new(project_configs)), - batch_queue: None, // Batch queue is set later + project_configs: Arc::new(RwLock::new(project_configs)), + batch_queue: None, // Batch queue is set later maintenance_shutdown: Arc::new(CancellationToken::new()), }; @@ -150,8 +151,7 @@ impl Database { /// Create and configure a SessionContext with DataFusion settings pub fn create_session_context(&self) -> SessionContext { - use datafusion::config::ConfigOptions; - use datafusion::execution::context::SessionContext; + use datafusion::{config::ConfigOptions, execution::context::SessionContext}; let mut options = ConfigOptions::new(); let _ = options.set("datafusion.sql_parser.enable_information_schema", "true"); @@ -181,9 +181,11 @@ impl Database { /// Register PostgreSQL settings table for compatibility pub fn register_pg_settings_table(&self, ctx: &SessionContext) -> datafusion::error::Result<()> { - use datafusion::arrow::array::StringArray; - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::arrow::record_batch::RecordBatch; + use datafusion::arrow::{ + array::StringArray, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }; let schema = Arc::new(Schema::new(vec![ Field::new("name", DataType::Utf8, false), @@ -226,9 +228,13 @@ impl Database { /// Register set_config UDF for PostgreSQL compatibility pub fn register_set_config_udf(&self, ctx: &SessionContext) { - use datafusion::arrow::array::{StringArray, StringBuilder}; - use datafusion::arrow::datatypes::DataType; - use datafusion::logical_expr::{ColumnarValue, ScalarFunctionImplementation, Volatility, create_udf}; + use datafusion::{ + arrow::{ + array::{StringArray, StringBuilder}, + datatypes::DataType, + }, + logical_expr::{ColumnarValue, 
ScalarFunctionImplementation, Volatility, create_udf}, + }; let set_config_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| -> datafusion::error::Result { let param_value_array = match &args[1] { @@ -435,8 +441,6 @@ impl Database { // Records should be grouped by span, and separated into groups then inserted into the // correct table. - use serde_arrow::schema::SchemaLike; - // Convert OtelLogsAndSpans records to Arrow RecordBatch format let fields = OtelLogsAndSpans::fields()?; let batch = serde_arrow::to_record_batch(&fields, &records)?; @@ -594,9 +598,9 @@ impl Database { #[derive(Debug, Clone)] pub struct ProjectRoutingTable { default_project: String, - database: Arc, - schema: SchemaRef, - batch_queue: Option>, + database: Arc, + schema: SchemaRef, + batch_queue: Option>, } impl ProjectRoutingTable { @@ -769,8 +773,7 @@ impl TableProvider for ProjectRoutingTable { #[cfg(test)] mod tests { use chrono::{TimeZone, Utc}; - use datafusion::assert_batches_eq; - use datafusion::prelude::SessionContext; + use datafusion::{assert_batches_eq, prelude::SessionContext}; use dotenv::dotenv; use serial_test::serial; use uuid::Uuid; diff --git a/src/delta.rs b/src/delta.rs new file mode 100644 index 0000000..b615f1a --- /dev/null +++ b/src/delta.rs @@ -0,0 +1,333 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use deltalake::{ + DeltaTable, DeltaTableBuilder, DeltaTableError, + arrow::record_batch::RecordBatch, + operations::{create::CreateBuilder, write::WriteBuilder}, + storage::ObjectStoreRef, +}; + +use object_store::{aws::AmazonS3Builder, azure::MicrosoftAzureBuilder, gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory}; +use tokio; +use url::Url; + +use crate::obj_store::{CacheMetrics, DeltaCacheBuilder, DeltaCacheConfig}; + +/// Helper struct for creating Delta tables with caching +pub struct CachedDeltaTableBuilder { + table_uri: String, + cache_config: Option, + storage_options: HashMap, +} + +impl CachedDeltaTableBuilder { + pub fn new>(table_uri: S) -> Self { + Self { + table_uri: table_uri.into(), + cache_config: None, + storage_options: HashMap::new(), + } + } + + /// Enable caching with custom configuration + pub fn with_cache_config(mut self, config: DeltaCacheConfig) -> Self { + self.cache_config = Some(config); + self + } + + /// Enable caching with default configuration + pub fn with_cache(mut self) -> Self { + self.cache_config = Some(DeltaCacheConfig::default()); + self + } + + /// Add storage options (AWS credentials, etc.) 
+ pub fn with_storage_options(mut self, options: HashMap) -> Self { + self.storage_options = options; + self + } + + /// Add a single storage option + pub fn with_storage_option, V: Into>(mut self, key: K, value: V) -> Self { + self.storage_options.insert(key.into(), value.into()); + self + } + + /// Build the Delta table with caching + pub async fn build(&self) -> Result { + let base_store = self.create_base_object_store().await?; + + let final_store: ObjectStoreRef = if let Some(cache_config) = &self.cache_config { + // Wrap with cache + + + (DeltaCacheBuilder::new() + .with_memory_capacity(cache_config.memory_capacity) + .with_disk_capacity(cache_config.disk_capacity) + .with_disk_path(cache_config.disk_cache_dir.clone()) + .with_ttl(Duration::from_secs(cache_config.ttl_seconds)) + .with_compression(cache_config.compression_level) + .enable_metrics(cache_config.enable_metrics) + .enable_cache_warming(cache_config.enable_cache_warming) + .cache_transaction_logs(cache_config.cache_transaction_logs) + .cache_parquet_metadata(cache_config.cache_parquet_metadata) + .cache_checkpoints(cache_config.cache_checkpoints) + .build(base_store) + .await + .map_err(|e| DeltaTableError::ObjectStore { source: e })?) as _ + } else { + base_store + }; + + // Create Delta table with the (potentially cached) object store + DeltaTableBuilder::from_uri(&self.table_uri) + .with_storage_backend(final_store, Url::parse(&self.table_uri).unwrap()) + .load() + .await + } + + /// Create the base object store based on URI scheme + async fn create_base_object_store(&self) -> Result { + let uri = Url::parse(&self.table_uri).map_err(|e| DeltaTableError::Generic(format!("Invalid URI: {e}")))?; + + match uri.scheme() { + "s3" | "s3a" => { + let mut builder = AmazonS3Builder::new(); + + // Apply storage options + for (key, value) in &self.storage_options { + match key.as_str() { + "AWS_ACCESS_KEY_ID" => builder = builder.with_access_key_id(value), + "AWS_SECRET_ACCESS_KEY" => builder = builder.with_secret_access_key(value), + "AWS_REGION" => builder = builder.with_region(value), + "AWS_ENDPOINT" => builder = builder.with_endpoint(value), + "AWS_BUCKET_NAME" => builder = builder.with_bucket_name(value), + _ => {} + } + } + + let bucket = uri.host_str().ok_or_else(|| DeltaTableError::Generic("No bucket in S3 URI".to_string()))?; + + Ok(Arc::new( + builder.with_bucket_name(bucket).build().map_err(|e| DeltaTableError::ObjectStore { source: e })?, + )) + } + + "abfs" | "abfss" => { + let mut builder = MicrosoftAzureBuilder::new(); + + // Apply storage options + for (key, value) in &self.storage_options { + match key.as_str() { + "AZURE_STORAGE_ACCOUNT_NAME" => builder = builder.with_account(value), + "AZURE_STORAGE_ACCOUNT_KEY" => builder = builder.with_access_key(value), + // "AZURE_STORAGE_SAS_TOKEN" => builder = builder.with_sas_authorization(vec![(value,value)]), + "AZURE_STORAGE_CONTAINER_NAME" => builder = builder.with_container_name(value), + _ => {} + } + } + + Ok(Arc::new(builder.build().map_err(|e| DeltaTableError::ObjectStore { source: e })?)) + } + + "gs" => { + let mut builder = GoogleCloudStorageBuilder::new(); + + // Apply storage options + for (key, value) in &self.storage_options { + match key.as_str() { + "GOOGLE_SERVICE_ACCOUNT" => builder = builder.with_service_account_path(value), + "GOOGLE_SERVICE_ACCOUNT_KEY" => builder = builder.with_service_account_key(value), + "GOOGLE_BUCKET_NAME" => builder = builder.with_bucket_name(value), + _ => {} + } + } + + let bucket = uri.host_str().ok_or_else(|| 
DeltaTableError::Generic("No bucket in GCS URI".to_string()))?; + + Ok(Arc::new( + builder.with_bucket_name(bucket).build().map_err(|e| DeltaTableError::ObjectStore { source: e })?, + )) + } + + "file" => { + let path = uri.to_file_path().map_err(|_| DeltaTableError::Generic("Invalid file path".to_string()))?; + Ok(Arc::new( + LocalFileSystem::new_with_prefix(path).map_err(|e| DeltaTableError::ObjectStore { source: e })?, + )) + } + + "memory" => Ok(Arc::new(InMemory::new())), + + scheme => Err(DeltaTableError::Generic(format!("Unsupported scheme: {scheme}"))), + } + } +} + + + +/// Convenience functions for common Delta operations with caching +pub struct CachedDeltaOps; + +impl CachedDeltaOps { + /// Create a new Delta table with caching enabled + pub async fn create_table( + table_uri: &str, cache_config: Option, + ) -> Result { + let mut builder = CachedDeltaTableBuilder::new(table_uri); + + if let Some(config) = cache_config { + builder = builder.with_cache_config(config); + } + + let table = builder.build().await?; + + CreateBuilder::new() + .with_log_store(table.log_store()) + .with_table_name(table_uri) + + .await + + + } + + /// Open an existing Delta table with caching + pub async fn open_table( + table_uri: &str, cache_config: Option, storage_options: Option>, + ) -> Result { + let mut builder = CachedDeltaTableBuilder::new(table_uri); + + if let Some(config) = cache_config { + builder = builder.with_cache_config(config); + } + + if let Some(options) = storage_options { + builder = builder.with_storage_options(options); + } + + builder.build().await + } + + /// Write data to a Delta table with caching + pub async fn write_to_table(table: &mut DeltaTable, batches: Vec) -> Result<(), DeltaTableError> { + let m = table.state.clone(); + WriteBuilder::new(table.log_store(), m).with_input_batches(batches); + + // Reload the table to see the new data + + Ok(table.load().await.unwrap()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Example 1: Simple cached Delta table + let cache_config = DeltaCacheConfig { + memory_capacity: 256 * 1024 * 1024, // 256MB + disk_capacity: 1024 * 1024 * 1024, // 1GB + disk_cache_dir: "/tmp/delta_cache".to_string(), + ttl_seconds: 3600, // 1 hour + enable_metrics: true, + enable_cache_warming: true, + ..Default::default() + }; + + let table = CachedDeltaTableBuilder::new("s3://my-bucket/my-table") + .with_cache_config(cache_config) + .with_storage_option("AWS_REGION", "us-west-2") + .with_storage_option("AWS_ACCESS_KEY_ID", "your-access-key") + .with_storage_option("AWS_SECRET_ACCESS_KEY", "your-secret-key") + .build() + .await?; + + println!("Table loaded with {} files", table.get_files_count()); + + + + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::{ + array::{Int32Array, StringArray}, + datatypes::{DataType, Field, Schema}, + }; + use tempfile::TempDir; + + use super::*; + + #[tokio::test] + async fn test_cached_delta_table_creation() { + let temp_dir = TempDir::new().unwrap(); + let table_uri = format!("file://{}", temp_dir.path().to_str().unwrap()); + + // Create schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create cache config + let cache_config = DeltaCacheConfig { + memory_capacity: 64 * 1024 * 1024, // 64MB + disk_capacity: 128 * 1024 * 1024, // 128MB + disk_cache_dir: temp_dir.path().join("cache").to_str().unwrap().to_string(), + ttl_seconds: 300, // 5 minutes + enable_metrics: true, + 
..Default::default() + }; + + // Create table with caching + let table = CachedDeltaOps::create_table(&table_uri, Some(cache_config)).await.unwrap(); + + // Verify the table was created + assert!(table.get_files_count() == 0); // New table, no data files yet + + + } + + #[tokio::test] + async fn test_write_and_read_with_cache() { + let temp_dir = TempDir::new().unwrap(); + let table_uri = format!("file://{}", temp_dir.path().to_str().unwrap()); + + // Create schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create table with cache + let mut table = CachedDeltaOps::create_table(&table_uri, Some(DeltaCacheConfig::default())).await.unwrap(); + + // Create some test data + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))], + ) + .unwrap(); + + // Write data + CachedDeltaOps::write_to_table(&mut table, vec![batch]).await.unwrap(); + + // Read data back (should hit cache on subsequent reads) + let files = table.get_file_uris(); + assert!(files.is_ok()); + + + + // Read again to test cache hit + let _files_again = table.get_file_uris(); + + + } + + #[tokio::test] + async fn test_write_and_read_with_caches3() { + let _s= main(); + } +} diff --git a/src/lib.rs b/src/lib.rs index b4d08dc..f579cf1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ // lib.rs - Export modules for use in tests pub mod batch_queue; pub mod database; +pub mod delta; +pub mod obj_store; pub mod persistent_queue; diff --git a/src/main.rs b/src/main.rs index b04f384..3971aa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,13 +2,14 @@ mod batch_queue; mod database; mod persistent_queue; +use std::{env, sync::Arc}; + use actix_web::{App, HttpResponse, HttpServer, Responder, middleware::Logger, post, web}; use batch_queue::BatchQueue; use database::Database; use dotenv::dotenv; use futures::TryFutureExt; use serde::Deserialize; -use std::{env, sync::Arc}; use tokio::time::{Duration, sleep}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -20,10 +21,10 @@ struct AppInfo {} #[derive(Deserialize)] struct RegisterProjectRequest { project_id: String, - bucket: String, + bucket: String, access_key: String, secret_key: String, - endpoint: Option, + endpoint: Option, } #[post("/register_project")] diff --git a/src/obj_store.rs b/src/obj_store.rs new file mode 100644 index 0000000..8827157 --- /dev/null +++ b/src/obj_store.rs @@ -0,0 +1,657 @@ +use std::{ + collections::HashMap, + fmt::Debug, + ops::Range, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use async_stream::stream; +use async_trait::async_trait; +use bytes::Bytes; +use chrono::DateTime; +use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; +use futures::stream::{BoxStream, StreamExt, TryStreamExt}; +use object_store::{ + GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, + PutResult, Result as ObjectStoreResult, path::Path, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +/// Configuration for the Delta cache +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeltaCacheConfig { + /// Memory cache capacity in bytes + pub memory_capacity: usize, + /// Disk cache capacity in bytes + pub disk_capacity: usize, + /// Disk cache 
directory path + pub disk_cache_dir: String, + /// TTL for cached objects in seconds + pub ttl_seconds: u64, + /// Whether to cache transaction logs + pub cache_transaction_logs: bool, + /// Whether to cache parquet metadata + pub cache_parquet_metadata: bool, + /// Whether to cache checkpoint files + pub cache_checkpoints: bool, + /// Maximum object size to cache (in bytes) + pub max_object_size: usize, + /// Enable metrics collection + pub enable_metrics: bool, + /// Cache warming on startup + pub enable_cache_warming: bool, + /// Compression level for cached data (0-9, 0=no compression) + pub compression_level: u8, +} + +impl Default for DeltaCacheConfig { + fn default() -> Self { + Self { + memory_capacity: 256 * 1024 * 1024, // 256MB + disk_capacity: 1024 * 1024 * 1024, // 1GB + disk_cache_dir: "/tmp/delta_cache".to_string(), + ttl_seconds: 3600, // 1 hour + cache_transaction_logs: true, + cache_parquet_metadata: true, + cache_checkpoints: true, + max_object_size: 10 * 1024 * 1024, // 10MB max + enable_metrics: true, + enable_cache_warming: false, + compression_level: 3, // Light compression by default + } + } +} + +/// Cache metrics for monitoring +#[derive(Debug, Default, Clone)] +pub struct CacheMetrics { + pub hits: u64, + pub misses: u64, + pub evictions: u64, + pub errors: u64, + pub total_requests: u64, + pub cache_size_bytes: u64, +} + +impl CacheMetrics { + pub fn hit_rate(&self) -> f64 { + if self.total_requests == 0 { 0.0 } else { self.hits as f64 / self.total_requests as f64 } + } +} + +/// cached object with metadata and compression +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CachedObject { + data: Vec, + cached_at: u64, + original_size: usize, + is_compressed: bool, + etag: Option, + last_modified: Option, +} + +impl CachedObject { + fn new(data: Vec, meta: &ObjectMeta, compression_level: u8) -> Self { + let original_size = data.len(); + let (final_data, is_compressed) = if compression_level > 0 && data.len() > 1024 { + match Self::compress(&data, compression_level) { + Ok(compressed) if compressed.len() < data.len() => (compressed, true), + _ => (data, false), + } + } else { + (data, false) + }; + + Self { + data: final_data, + cached_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(), + original_size, + is_compressed, + etag: meta.e_tag.clone(), + last_modified: Some(meta.last_modified.timestamp()), + } + } + + fn get_data(&self) -> Result, std::io::Error> { + if self.is_compressed { Self::decompress(&self.data) } else { Ok(self.data.clone()) } + } + + fn compress(data: &[u8], level: u8) -> Result, std::io::Error> { + use std::io::Write; + + use flate2::{Compression, write::GzEncoder}; + + let mut encoder = GzEncoder::new(Vec::new(), Compression::new(level as u32)); + encoder.write_all(data)?; + encoder.finish() + } + + fn decompress(data: &[u8]) -> Result, std::io::Error> { + use std::io::Read; + + use flate2::read::GzDecoder; + + let mut decoder = GzDecoder::new(data); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + Ok(decompressed) + } + + fn is_valid(&self, ttl_seconds: u64) -> bool { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); + (now - self.cached_at) < ttl_seconds + } + + fn matches_meta(&self, meta: &ObjectMeta) -> bool { + // Check ETag if available + if let (Some(cached_etag), Some(meta_etag)) = (&self.etag, &meta.e_tag) { + return cached_etag == meta_etag; + } + + // Fallback to last modified time + if let (Some(cached_modified), 
meta_modified) = (self.last_modified, meta.last_modified) { + return cached_modified == meta_modified.timestamp(); + } + + // If no metadata available, assume valid (will rely on TTL) + true + } +} + +/// Delta-optimized object store cache wrapper +#[derive(Debug)] +pub struct DeltaCachedObjectStore { + inner: Arc, + cache: Arc>, + config: DeltaCacheConfig, + metrics: Arc>, + // Track frequently accessed paths for cache warming + access_patterns: Arc>>, +} + +impl DeltaCachedObjectStore { + /// Create a new cached object store + pub async fn new(inner: Arc, config: DeltaCacheConfig) -> ObjectStoreResult { + // Build the hybrid cache + let cache = HybridCacheBuilder::new() + .memory(config.memory_capacity) + .storage(Engine::Large) + .with_device_options(DirectFsDeviceOptions::new(&config.disk_cache_dir).with_capacity(config.disk_capacity)) + .build() + .await + .map_err(|e| object_store::Error::Generic { + store: "DeltaCache", + source: Box::new(std::io::Error::other(e)), + })?; + + info!( + "Initialized Delta cache: memory={}MB, disk={}GB, dir={}, compression={}", + config.memory_capacity / (1024 * 1024), + config.disk_capacity / (1024 * 1024 * 1024), + config.disk_cache_dir, + if config.compression_level > 0 { "enabled" } else { "disabled" } + ); + + let store = Self { + inner, + cache: Arc::new(cache), + config, + metrics: Arc::new(RwLock::new(CacheMetrics::default())), + access_patterns: Arc::new(RwLock::new(HashMap::new())), + }; + + // Optionally warm the cache + if store.config.enable_cache_warming { + tokio::spawn({ + let store_clone = store.clone(); + async move { + if let Err(e) = store_clone.warm_cache().await { + warn!("Cache warming failed: {}", e); + } + } + }); + } + + Ok(store) + } + + /// Get current cache metrics + pub async fn metrics(&self) -> CacheMetrics { + self.metrics.read().await.clone() + } + + /// Warm the cache by pre-loading frequently accessed files + async fn warm_cache(&self) -> ObjectStoreResult<()> { + info!("Starting cache warming..."); + + // Focus on Delta log directory + let delta_log_prefix = Path::from("_delta_log"); + let mut stream = self.inner.list(Some(&delta_log_prefix)); + let mut warmed_count = 0; + + while let Some(meta_result) = futures::StreamExt::next(&mut stream).await { + let meta = meta_result?; + + // Only warm small, frequently accessed files + if meta.size <= (1024 * 1024) && self.should_cache(&meta.location) { + match self.inner.get(&meta.location).await { + Ok(result) => { + if let Ok(bytes) = result.bytes().await { + let cache_key = self.make_cache_key(&meta.location, None); + let cached_obj = CachedObject::new(bytes.to_vec(), &meta, self.config.compression_level); + + if self.cache.insert(cache_key, cached_obj).get_data().is_ok() { + warmed_count += 1; + } + } + } + Err(e) => debug!("Failed to warm cache for {}: {}", meta.location, e), + } + } + } + + info!("Cache warming completed: {} files preloaded", warmed_count); + Ok(()) + } + + /// Enhanced path caching logic with more granular control + fn should_cache(&self, path: &Path) -> bool { + let path_str = path.as_ref(); + + // Always cache Delta log directory contents + if path_str.contains("_delta_log/") { + // Transaction logs + if path_str.ends_with(".json") && self.config.cache_transaction_logs { + return true; + } + // Checkpoint files + if path_str.ends_with(".checkpoint.parquet") && self.config.cache_checkpoints { + return true; + } + // Other Delta log files (like .crc files) + return true; + } + + // Parquet metadata files + if self.config.cache_parquet_metadata && 
(path_str.ends_with(".parquet") || path_str.contains("_metadata") || path_str.ends_with("_common_metadata")) { + return true; + } + + false + } + + /// Create cache key from path and optional range + fn make_cache_key(&self, path: &Path, range: Option<&GetRange>) -> String { + match range { + Some(GetRange::Bounded(r)) => format!("{}:{}:{}", path.as_ref(), r.start, r.end), + Some(GetRange::Offset(offset)) => format!("{}:{}:", path.as_ref(), offset), + Some(GetRange::Suffix(suffix)) => format!("{}:suffix:{}", path.as_ref(), suffix), + None => path.as_ref().to_string(), + } + } + + /// Update access patterns for analytics + async fn record_access(&self, path: &str) { + let mut patterns = self.access_patterns.write().await; + *patterns.entry(path.to_string()).or_insert(0) += 1; + } + + /// Update metrics + async fn update_metrics(&self, update_fn: F) + where + F: FnOnce(&mut CacheMetrics), + { + if self.config.enable_metrics { + let mut metrics = self.metrics.write().await; + update_fn(&mut metrics); + } + } + + /// Enhanced cache retrieval with metadata validation + async fn get_with_cache(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.update_metrics(|m| m.total_requests += 1).await; + + // Check if we should cache this path + if !self.should_cache(location) { + debug!("Path not cacheable, delegating: {}", location); + return self.inner.get_opts(location, options).await; + } + + // Record access pattern + self.record_access(location.as_ref()).await; + + // For range requests, bypass cache for now (could be enhanced later) + if options.range.is_some() { + debug!("Range request, bypassing cache: {}", location); + return self.inner.get_opts(location, options).await; + } + + let cache_key = self.make_cache_key(location, None); + + // Try to get from cache first + if let Ok(Some(cached_entry)) = self.cache.get(&cache_key).await { + let cached_obj = cached_entry.value(); + + // Check if cache entry is still valid + if cached_obj.is_valid(self.config.ttl_seconds) { + // Get fresh metadata for validation + match self.inner.head(location).await { + Ok(meta) => { + // Validate cache against metadata + if cached_obj.matches_meta(&meta) { + debug!("Cache hit for: {}", location); + self.update_metrics(|m| m.hits += 1).await; + + // Decompress if needed + match cached_obj.get_data() { + Ok(data) => { + let bytes = Bytes::from(data); + return Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::once(async { Ok(bytes) }))), + meta: meta.clone(), + range: 0..meta.size, + attributes: Default::default(), + }); + } + Err(e) => { + error!("Failed to decompress cached data for {}: {}", location, e); + // Fall through to cache miss + } + } + } else { + debug!("Cache invalidated due to metadata mismatch: {}", location); + // Remove stale entry + self.cache.remove(&cache_key); + } + } + Err(e) => { + debug!("Failed to get metadata for cache validation: {} - {}", location, e); + // Use cached data anyway if metadata lookup fails + if let Ok(data) = cached_obj.get_data() { + self.update_metrics(|m| m.hits += 1).await; + let bytes = Bytes::from(data); + return Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::once(async { Ok(bytes) }))), + meta: ObjectMeta { + location: location.clone(), + last_modified: DateTime::::MIN_UTC, + size: cached_obj.original_size , + e_tag: cached_obj.etag.clone(), + version: None, + }, + range: 0..cached_obj.original_size , + attributes: Default::default(), + }); + } + } + } + } else { + debug!("Cache entry 
expired: {}", location); + self.cache.remove(&cache_key); + } + } + + // Cache miss - fetch from underlying store + debug!("Cache miss, fetching: {}", location); + self.update_metrics(|m| m.misses += 1).await; + + let result = self.inner.get_opts(location, options.clone()).await?; + let meta = result.meta.clone(); + + // Only cache if object size is within limits + if meta.size <= self.config.max_object_size { + // Read the entire payload for caching + let bytes = result.bytes().await?; + + // Create cached object with compression + let cached_obj = CachedObject::new(bytes.to_vec(), &meta, self.config.compression_level); + + // Insert into cache asynchronously + let cache_clone = self.cache.clone(); + let key_clone = cache_key.clone(); + tokio::spawn(async move { + if let Err(e) = cache_clone.insert(key_clone, cached_obj).get_data() { + debug!("Failed to insert into cache: {}", e); + } + }); + + debug!("Cached object: {} (size: {} bytes)", location, bytes.len()); + + // Return the data + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::once(async { Ok(bytes) }))), + meta: meta.clone(), + range: 0..meta.size, + attributes: Default::default(), + }) + } else { + warn!("Object too large to cache: {} ({} bytes)", location, meta.size); + Ok(result) + } + } + + /// Get access patterns for analytics + pub async fn get_access_patterns(&self) -> HashMap { + self.access_patterns.read().await.clone() + } + + /// Clear access patterns + pub async fn clear_access_patterns(&self) { + self.access_patterns.write().await.clear(); + } +} + +// Clone implementation for the store +impl Clone for DeltaCachedObjectStore { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + cache: self.cache.clone(), + config: self.config.clone(), + metrics: self.metrics.clone(), + access_patterns: self.access_patterns.clone(), + } + } +} + +#[async_trait] +impl ObjectStore for DeltaCachedObjectStore { + async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult { + let result = self.inner.put(location, payload).await?; + + // Invalidate cache for this path on writes + if self.should_cache(location) { + let cache_key = self.make_cache_key(location, None); + let _ = self.cache.remove(&cache_key); + debug!("Invalidated cache for: {}", location); + } + + Ok(result) + } + + async fn put_opts(&self, location: &Path, payload: PutPayload, opts: PutOptions) -> ObjectStoreResult { + let result = self.inner.put_opts(location, payload, opts).await?; + + // Invalidate cache for this path on writes + if self.should_cache(location) { + let cache_key = self.make_cache_key(location, None); + let _ = self.cache.remove(&cache_key); + debug!("Invalidated cache for: {}", location); + } + + Ok(result) + } + + async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> { + self.inner.put_multipart(location).await + } + + async fn put_multipart_opts(&self, location: &Path, opts: PutMultipartOpts) -> ObjectStoreResult> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get(&self, location: &Path) -> ObjectStoreResult { + self.get_with_cache(location, GetOptions::default()).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.get_with_cache(location, options).await + } + + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { + let options = GetOptions { + range: Some(GetRange::Bounded(range)), + ..Default::default() + }; + let result = self.get_with_cache(location, options).await?; 
+        result.bytes().await
+    }
+
+    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
+        // For metadata requests, always fetch fresh data
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
+        let result = self.inner.delete(location).await;
+
+        // Invalidate cache on delete
+        if self.should_cache(location) {
+            let cache_key = self.make_cache_key(location, None);
+            let _ = self.cache.remove(&cache_key);
+            debug!("Invalidated cache on delete: {}", location);
+        }
+
+        result
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
+        let inner = self.inner.clone();
+        let prefix = prefix.map(|p| p.to_owned());
+        Box::pin(stream! {
+            let mut stream = inner.list(prefix.as_ref());
+            use futures::StreamExt;
+            while let Some(item) = stream.next().await {
+                yield item;
+            }
+        })
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+        let result = self.inner.copy(from, to).await;
+
+        // Invalidate cache for destination
+        if self.should_cache(to) {
+            let cache_key = self.make_cache_key(to, None);
+            let _ = self.cache.remove(&cache_key);
+            debug!("Invalidated cache on copy destination: {}", to);
+        }
+
+        result
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+        let result = self.inner.copy_if_not_exists(from, to).await;
+
+        // Invalidate cache for destination
+        if self.should_cache(to) {
+            let cache_key = self.make_cache_key(to, None);
+            let _ = self.cache.remove(&cache_key);
+            debug!("Invalidated cache on conditional copy: {}", to);
+        }
+
+        result
+    }
+}
+
+impl std::fmt::Display for DeltaCachedObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DeltaCached({})", self.inner)
+    }
+}
+
+/// Enhanced builder with more configuration options
+pub struct DeltaCacheBuilder {
+    config: DeltaCacheConfig,
+}
+
+impl DeltaCacheBuilder {
+    pub fn new() -> Self {
+        Self {
+            config: DeltaCacheConfig::default(),
+        }
+    }
+
+    pub fn with_memory_capacity(mut self, capacity: usize) -> Self {
+        self.config.memory_capacity = capacity;
+        self
+    }
+
+    pub fn with_disk_capacity(mut self, capacity: usize) -> Self {
+        self.config.disk_capacity = capacity;
+        self
+    }
+
+    pub fn with_disk_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
+        self.config.disk_cache_dir = path.into();
+        self
+    }
+
+    pub fn with_ttl(mut self, ttl: Duration) -> Self {
+        self.config.ttl_seconds = ttl.as_secs();
+        self
+    }
+
+    pub fn with_max_object_size(mut self, size: usize) -> Self {
+        self.config.max_object_size = size;
+        self
+    }
+
+    pub fn with_compression(mut self, level: u8) -> Self {
+        self.config.compression_level = level.min(9);
+        self
+    }
+
+    pub fn enable_metrics(mut self, enable: bool) -> Self {
+        self.config.enable_metrics = enable;
+        self
+    }
+
+    pub fn enable_cache_warming(mut self, enable: bool) -> Self {
+        self.config.enable_cache_warming = enable;
+        self
+    }
+
+    pub fn cache_transaction_logs(mut self, enable: bool) -> Self {
+        self.config.cache_transaction_logs = enable;
+        self
+    }
+
+    pub fn cache_parquet_metadata(mut self, enable: bool) -> Self {
+        self.config.cache_parquet_metadata = enable;
+        self
+    }
+
+    pub fn cache_checkpoints(mut self, enable: bool) -> Self {
+        self.config.cache_checkpoints = enable;
+        self
+    }
+
+    pub async fn build(self, inner: Arc<dyn ObjectStore>) -> ObjectStoreResult<Arc<DeltaCachedObjectStore>> {
+        let cached_store = DeltaCachedObjectStore::new(inner, self.config).await?;
+        Ok(Arc::new(cached_store))
+    }
+}
+
+impl Default for DeltaCacheBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/src/persistent_queue.rs b/src/persistent_queue.rs
index 0987b3a..c8d0046 100644
--- a/src/persistent_queue.rs
+++ b/src/persistent_queue.rs
@@ -1,14 +1,10 @@
-use std::str::FromStr;
-use std::sync::Arc;
+use std::{str::FromStr, sync::Arc};
 
-use arrow_schema::{DataType, FieldRef};
-use arrow_schema::{Field, Schema, SchemaRef};
-use delta_kernel::parquet::format::SortingColumn;
-use delta_kernel::schema::StructField;
+use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
+use delta_kernel::{parquet::format::SortingColumn, schema::StructField};
 use log::debug;
 use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
-use serde_arrow::schema::SchemaLike;
-use serde_arrow::schema::TracingOptions;
+use serde_arrow::schema::{SchemaLike, TracingOptions};
 use serde_json::json;
 use serde_with::serde_as;
 
@@ -22,12 +18,12 @@ pub struct OtelLogsAndSpans {
     #[serde(with = "chrono::serde::ts_microseconds_option")]
     pub observed_timestamp: Option>,
 
-    pub id: String,
-    pub parent_id: Option,
-    pub hashes: Vec, // all relevant hashes can be stored here for item identification
-    pub name: Option,
-    pub kind: Option, // logs, span, request
-    pub status_code: Option,
+    pub id: String,
+    pub parent_id: Option,
+    pub hashes: Vec, // all relevant hashes can be stored here for item identification
+    pub name: Option,
+    pub kind: Option, // logs, span, request
+    pub status_code: Option,
     pub status_message: Option,
 
     // Logs specific
@@ -36,7 +32,7 @@ pub struct OtelLogsAndSpans {
     // Severity
     pub severity: Option, // severity as json
-    pub severity___severity_text: Option,
+    pub severity___severity_text: Option,
     pub severity___severity_number: Option,
 
     pub body: Option, // body as json json
 
@@ -46,16 +42,16 @@ pub struct OtelLogsAndSpans {
     #[serde(with = "chrono::serde::ts_microseconds_option")]
     pub start_time: Option>,
     #[serde(with = "chrono::serde::ts_microseconds_option")]
-    pub end_time: Option>,
+    pub end_time: Option>,
 
     // Context
-    pub context: Option, // context as json
+    pub context: Option, // context as json
     //
-    pub context___trace_id: Option,
-    pub context___span_id: Option,
+    pub context___trace_id: Option,
+    pub context___span_id: Option,
     pub context___trace_state: Option,
     pub context___trace_flags: Option,
-    pub context___is_remote: Option,
+    pub context___is_remote: Option,
 
     // Events
     pub events: Option, // events json
@@ -64,96 +60,96 @@ pub struct OtelLogsAndSpans {
     pub links: Option, // links json
 
     // Attributes
-    pub attributes: Option, // attirbutes object as json
+    pub attributes: Option, // attirbutes object as json
 
     // Server and client
     pub attributes___client___address: Option,
-    pub attributes___client___port: Option,
+    pub attributes___client___port: Option,
     pub attributes___server___address: Option,
-    pub attributes___server___port: Option,
+    pub attributes___server___port: Option,
 
     // network https://opentelemetry.io/docs/specs/semconv/attributes-registry/network/
-    pub attributes___network___local__address: Option,
-    pub attributes___network___local__port: Option,
-    pub attributes___network___peer___address: Option,
-    pub attributes___network___peer__port: Option,
-    pub attributes___network___protocol___name: Option,
+    pub attributes___network___local__address: Option,
+    pub attributes___network___local__port: Option,
+    pub attributes___network___peer___address: Option,
+    pub attributes___network___peer__port: Option,
+    pub attributes___network___protocol___name: Option,
     pub attributes___network___protocol___version: Option,
-    pub attributes___network___transport: Option,
-    pub attributes___network___type: Option,
+    pub attributes___network___transport: Option,
+    pub attributes___network___type: Option,
 
     // Source Code Attributes
-    pub attributes___code___number: Option,
-    pub attributes___code___file___path: Option,
+    pub attributes___code___number: Option,
+    pub attributes___code___file___path: Option,
     pub attributes___code___function___name: Option,
-    pub attributes___code___line___number: Option,
-    pub attributes___code___stacktrace: Option,
+    pub attributes___code___line___number: Option,
+    pub attributes___code___stacktrace: Option,
 
     // Log records. https://opentelemetry.io/docs/specs/semconv/general/logs/
     pub attributes___log__record___original: Option,
-    pub attributes___log__record___uid: Option,
+    pub attributes___log__record___uid: Option,
 
     // Exception https://opentelemetry.io/docs/specs/semconv/exceptions/exceptions-logs/
-    pub attributes___error___type: Option,
-    pub attributes___exception___type: Option,
-    pub attributes___exception___message: Option,
+    pub attributes___error___type: Option,
+    pub attributes___exception___type: Option,
+    pub attributes___exception___message: Option,
     pub attributes___exception___stacktrace: Option,
 
     // URL https://opentelemetry.io/docs/specs/semconv/attributes-registry/url/
     pub attributes___url___fragment: Option,
-    pub attributes___url___full: Option,
-    pub attributes___url___path: Option,
-    pub attributes___url___query: Option,
-    pub attributes___url___scheme: Option,
+    pub attributes___url___full: Option,
+    pub attributes___url___path: Option,
+    pub attributes___url___query: Option,
+    pub attributes___url___scheme: Option,
 
     // Useragent https://opentelemetry.io/docs/specs/semconv/attributes-registry/user-agent/
     pub attributes___user_agent___original: Option,
 
     // HTTP https://opentelemetry.io/docs/specs/semconv/http/http-spans/
-    pub attributes___http___request___method: Option,
+    pub attributes___http___request___method: Option,
     pub attributes___http___request___method_original: Option,
-    pub attributes___http___response___status_code: Option,
-    pub attributes___http___request___resend_count: Option,
-    pub attributes___http___request___body___size: Option,
+    pub attributes___http___response___status_code: Option,
+    pub attributes___http___request___resend_count: Option,
+    pub attributes___http___request___body___size: Option,
 
     // Session https://opentelemetry.io/docs/specs/semconv/general/session/
-    pub attributes___session___id: Option,
+    pub attributes___session___id: Option,
     pub attributes___session___previous___id: Option,
 
     // Database https://opentelemetry.io/docs/specs/semconv/database/database-spans/
-    pub attributes___db___system___name: Option,
-    pub attributes___db___collection___name: Option,
-    pub attributes___db___namespace: Option,
-    pub attributes___db___operation___name: Option,
-    pub attributes___db___response___status_code: Option,
+    pub attributes___db___system___name: Option,
+    pub attributes___db___collection___name: Option,
+    pub attributes___db___namespace: Option,
+    pub attributes___db___operation___name: Option,
+    pub attributes___db___response___status_code: Option,
     pub attributes___db___operation___batch___size: Option,
-    pub attributes___db___query___summary: Option,
-    pub attributes___db___query___text: Option,
+    pub attributes___db___query___summary: Option,
+    pub attributes___db___query___text: Option,
 
     // https://opentelemetry.io/docs/specs/semconv/attributes-registry/user/
-    pub attributes___user___id: Option,
-    pub attributes___user___email: Option,
+    pub attributes___user___id: Option,
+    pub attributes___user___email: Option,
     pub attributes___user___full_name: Option,
-    pub attributes___user___name: Option,
-    pub attributes___user___hash: Option,
+    pub attributes___user___name: Option,
+    pub attributes___user___hash: Option,
 
     // Resource
     pub resource: Option, // resource as json
 
     // Resource Attributes (subset) https://opentelemetry.io/docs/specs/semconv/resource/
-    pub resource___service___name: Option,
-    pub resource___service___version: Option,
+    pub resource___service___name: Option,
+    pub resource___service___version: Option,
     pub resource___service___instance___id: Option,
-    pub resource___service___namespace: Option,
+    pub resource___service___namespace: Option,
     pub resource___telemetry___sdk___language: Option,
-    pub resource___telemetry___sdk___name: Option,
-    pub resource___telemetry___sdk___version: Option,
+    pub resource___telemetry___sdk___name: Option,
+    pub resource___telemetry___sdk___version: Option,
     pub resource___user_agent___original: Option,
 
     // Kept at the bottom to make delta-rs happy, so its schema matches datafusion.
     // Seems delta removes the partition ids from the normal schema and moves them to the end.
     // Top-level fields
-    pub project_id: String,
+    pub project_id: String,
 
     #[serde(default)]
     #[serde(deserialize_with = "default_on_empty_string")]
@@ -225,13 +221,13 @@ impl OtelLogsAndSpans {
         // Define sorting columns for the parquet files to improve query performance
         vec![
             SortingColumn {
-                column_idx: 0, // timestamp is likely first in the schema
-                descending: true, // newest first
+                column_idx: 0, // timestamp is likely first in the schema
+                descending: true, // newest first
                 nulls_first: false,
             },
             SortingColumn {
-                column_idx: 3, // id
-                descending: false,
+                column_idx: 3, // id
+                descending: false,
                 nulls_first: false,
             },
         ]
diff --git a/tests/integration_test.rs b/tests/integration_test.rs
index 483865d..01d5551 100644
--- a/tests/integration_test.rs
+++ b/tests/integration_test.rs
@@ -1,13 +1,16 @@
 #[cfg(test)]
 mod integration {
+    use std::{
+        collections::HashSet,
+        sync::{Arc, Mutex},
+        time::{Duration, Instant},
+    };
+
     use anyhow::Result;
     use dotenv::dotenv;
     use rand::Rng;
     use scopeguard;
     use serial_test::serial;
-    use std::collections::HashSet;
-    use std::sync::{Arc, Mutex};
-    use std::time::{Duration, Instant};
     use timefusion::database::Database;
     use tokio::{sync::Notify, time::sleep};
     use tokio_postgres::{Client, NoTls};
diff --git a/tests/sqllogictest.rs b/tests/sqllogictest.rs
index e7832e1..ebade9d 100644
--- a/tests/sqllogictest.rs
+++ b/tests/sqllogictest.rs
@@ -1,15 +1,16 @@
 #[cfg(test)]
 mod sqllogictest_tests {
-    use anyhow::Result;
-    use async_trait::async_trait;
-    use dotenv::dotenv;
-    use serial_test::serial;
-    use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType};
     use std::{
         path::Path,
         sync::Arc,
         time::{Duration, Instant},
     };
+
+    use anyhow::Result;
+    use async_trait::async_trait;
+    use dotenv::dotenv;
+    use serial_test::serial;
+    use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType};
     use timefusion::database::Database;
     use tokio::{sync::Notify, time::sleep};
     use tokio_postgres::{NoTls, Row};
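
Usage sketch (illustrative only, not part of the diff): the snippet below shows one way the DeltaCacheBuilder and DeltaCachedObjectStore introduced above could be wrapped around an existing object store. The builder methods and types come from this diff; the InMemory backing store, the capacity/TTL values, and the helper function name are placeholder assumptions, and crate-local imports are omitted.

use std::{sync::Arc, time::Duration};

use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

// Hypothetical helper: wrap any ObjectStore with the caching layer from this diff.
async fn build_cached_store() -> object_store::Result<()> {
    let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    let cached = DeltaCacheBuilder::new()
        .with_memory_capacity(256 * 1024 * 1024)    // assumed: 256 MiB memory tier
        .with_disk_capacity(4 * 1024 * 1024 * 1024) // assumed: 4 GiB disk tier
        .with_ttl(Duration::from_secs(300))         // assumed: 5-minute TTL for cached objects
        .with_max_object_size(32 * 1024 * 1024)     // assumed: skip caching objects over 32 MiB
        .cache_transaction_logs(true)
        .build(inner)
        .await?;

    // Reads are routed through get_with_cache; puts, deletes and copies
    // invalidate matching cache keys for cacheable paths (see the ObjectStore impl above).
    let path = Path::from("demo/object.bin");
    cached.put(&path, PutPayload::from(vec![1u8, 2, 3])).await?;
    let bytes = cached.get(&path).await?.bytes().await?;
    assert_eq!(bytes.to_vec(), vec![1u8, 2, 3]);
    Ok(())
}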