diff --git a/docs/source/contributor-guide/ffi.rst b/docs/source/contributor-guide/ffi.rst index c1f9806b3..a40af1234 100644 --- a/docs/source/contributor-guide/ffi.rst +++ b/docs/source/contributor-guide/ffi.rst @@ -176,7 +176,7 @@ By convention the ``datafusion-python`` library expects a Python object that has ``TableProvider`` PyCapsule to have this capsule accessible by calling a function named ``__datafusion_table_provider__``. You can see a complete working example of how to share a ``TableProvider`` from one python library to DataFusion Python in the -`repository examples folder `_. +`repository examples folder `_. This section has been written using ``TableProvider`` as an example. It is the first extension that has been written using this approach and the most thoroughly implemented. diff --git a/examples/datafusion-ffi-example/Cargo.lock b/examples/datafusion-ffi-example/Cargo.lock index e5a1ca8d1..1b4ca6bee 100644 --- a/examples/datafusion-ffi-example/Cargo.lock +++ b/examples/datafusion-ffi-example/Cargo.lock @@ -323,6 +323,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b" dependencies = [ "bitflags", + "serde", + "serde_json", ] [[package]] @@ -748,9 +750,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe060b978f74ab446be722adb8a274e052e005bf6dfd171caadc3abaad10080" +checksum = "cc6cb8c2c81eada072059983657d6c9caf3fddefc43b4a65551d243253254a96" dependencies = [ "arrow", "arrow-ipc", @@ -775,7 +777,6 @@ dependencies = [ "datafusion-functions-nested", "datafusion-functions-table", "datafusion-functions-window", - "datafusion-macros", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -790,7 +791,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.9.1", "regex", "sqlparser", "tempfile", @@ -803,9 +804,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61fe34f401bd03724a1f96d12108144f8cd495a3cdda2bf5e091822fb80b7e66" +checksum = "b7be8d1b627843af62e447396db08fe1372d882c0eb8d0ea655fd1fbc33120ee" dependencies = [ "arrow", "async-trait", @@ -829,9 +830,9 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4411b8e3bce5e0fc7521e44f201def2e2d5d1b5f176fb56e8cdc9942c890f00" +checksum = "38ab16c5ae43f65ee525fc493ceffbc41f40dee38b01f643dfcfc12959e92038" dependencies = [ "arrow", "async-trait", @@ -852,9 +853,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0734015d81c8375eb5d4869b7f7ecccc2ee8d6cb81948ef737cd0e7b743bd69c" +checksum = "d3d56b2ac9f476b93ca82e4ef5fb00769c8a3f248d12b4965af7e27635fa7e12" dependencies = [ "ahash", "arrow", @@ -876,9 +877,9 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5167bb1d2ccbb87c6bc36c295274d7a0519b14afcfdaf401d53cbcaa4ef4968b" +checksum = "16015071202d6133bc84d72756176467e3e46029f3ce9ad2cb788f9b1ff139b2" dependencies = [ "futures", "log", @@ -887,9 +888,9 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04e602dcdf2f50c2abf297cc2203c73531e6f48b29516af7695d338cf2a778b1" +checksum = "b77523c95c89d2a7eb99df14ed31390e04ab29b43ff793e562bdc1716b07e17b" dependencies = [ "arrow", "async-compression", @@ -912,7 +913,7 @@ dependencies = [ "log", "object_store", "parquet", - "rand", + "rand 0.9.1", "tempfile", "tokio", "tokio-util", @@ -923,9 +924,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bb2253952dc32296ed5b84077cb2e0257fea4be6373e1c376426e17ead4ef6" +checksum = "40d25c5e2c0ebe8434beeea997b8e88d55b3ccc0d19344293f2373f65bc524fc" dependencies = [ "arrow", "async-trait", @@ -948,9 +949,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8c7f47a5d2fe03bfa521ec9bafdb8a5c82de8377f60967c3663f00c8790352" +checksum = "3dc6959e1155741ab35369e1dc7673ba30fc45ed568fad34c01b7cb1daeb4d4c" dependencies = [ "arrow", "async-trait", @@ -973,9 +974,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27d15868ea39ed2dc266728b554f6304acd473de2142281ecfa1294bb7415923" +checksum = "b7a6afdfe358d70f4237f60eaef26ae5a1ce7cb2c469d02d5fc6c7fd5d84e58b" dependencies = [ "arrow", "async-trait", @@ -998,21 +999,21 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.9.1", "tokio", ] [[package]] name = "datafusion-doc" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91f8c2c5788ef32f48ff56c68e5b545527b744822a284373ac79bba1ba47292" +checksum = "9bcd8a3e3e3d02ea642541be23d44376b5d5c37c2938cce39b3873cdf7186eea" [[package]] name = "datafusion-execution" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06f004d100f49a3658c9da6fb0c3a9b760062d96cd4ad82ccc3b7b69a9fb2f84" +checksum = "670da1d45d045eee4c2319b8c7ea57b26cf48ab77b630aaa50b779e406da476a" dependencies = [ "arrow", "dashmap", @@ -1022,16 +1023,16 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.9.1", "tempfile", "url", ] [[package]] name = "datafusion-expr" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a4e4ce3802609be38eeb607ee72f6fe86c3091460de9dbfae9e18db423b3964" +checksum = "b3a577f64bdb7e2cc4043cd97f8901d8c504711fde2dbcb0887645b00d7c660b" dependencies = [ "arrow", "chrono", @@ -1050,9 +1051,9 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ac9cf3b22bbbae8cdf8ceb33039107fde1b5492693168f13bd566b1bcc839" +checksum = "51b7916806ace3e9f41884f230f7f38ebf0e955dfbd88266da1826f29a0b9a6a" dependencies = [ "arrow", "datafusion-common", @@ -1063,9 +1064,9 @@ dependencies = [ [[package]] name = "datafusion-ffi" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cf3fe9ab492c56daeb7beed526690d33622d388b8870472e0b7b7f55490338c" +checksum = "980cca31de37f5dadf7ea18e4ffc2b6833611f45bed5ef9de0831d2abb50f1ef" dependencies = [ "abi_stable", "arrow", @@ -1073,7 +1074,9 @@ dependencies = [ "async-ffi", "async-trait", "datafusion", + "datafusion-functions-aggregate-common", "datafusion-proto", + "datafusion-proto-common", "futures", "log", "prost", @@ -1081,11 +1084,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-ffi-example" +version = "0.2.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "async-trait", + "datafusion", + "datafusion-ffi", + "pyo3", + "pyo3-build-config", +] + [[package]] name = "datafusion-functions" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ddf0a0a2db5d2918349c978d42d80926c6aa2459cd8a3c533a84ec4bb63479e" +checksum = "7fb31c9dc73d3e0c365063f91139dc273308f8a8e124adda9898db8085d68357" dependencies = [ "arrow", "arrow-buffer", @@ -1103,7 +1120,7 @@ dependencies = [ "itertools", "log", "md-5", - "rand", + "rand 0.9.1", "regex", "sha2", "unicode-segmentation", @@ -1112,9 +1129,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "408a05dafdc70d05a38a29005b8b15e21b0238734dab1e98483fcb58038c5aba" +checksum = "ebb72c6940697eaaba9bd1f746a697a07819de952b817e3fb841fb75331ad5d4" dependencies = [ "ahash", "arrow", @@ -1133,9 +1150,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "756d21da2dd6c9bef97af1504970ff56cbf35d03fbd4ffd62827f02f4d2279d4" +checksum = "d7fdc54656659e5ecd49bf341061f4156ab230052611f4f3609612a0da259696" dependencies = [ "ahash", "arrow", @@ -1146,9 +1163,9 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d8d50f6334b378930d992d801a10ac5b3e93b846b39e4a05085742572844537" +checksum = "fad94598e3374938ca43bca6b675febe557e7a14eb627d617db427d70d65118b" dependencies = [ "arrow", "arrow-ord", @@ -1167,9 +1184,9 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc9a97220736c8fff1446e936be90d57216c06f28969f9ffd3b72ac93c958c8a" +checksum = "de2fc6c2946da5cab8364fb28b5cac3115f0f3a87960b235ed031c3f7e2e639b" dependencies = [ "arrow", "async-trait", @@ -1183,10 +1200,11 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefc2d77646e1aadd1d6a9c40088937aedec04e68c5f0465939912e1291f8193" +checksum = "3e5746548a8544870a119f556543adcd88fe0ba6b93723fe78ad0439e0fbb8b4" dependencies = [ + "arrow", "datafusion-common", "datafusion-doc", "datafusion-expr", @@ -1200,9 +1218,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4aff082c42fa6da99ce0698c85addd5252928c908eb087ca3cfa64ff16b313" +checksum = "dcbe9404382cda257c434f22e13577bee7047031dfdb6216dd5e841b9465e6fe" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1210,9 +1228,9 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df6f88d7ee27daf8b108ba910f9015176b36fbc72902b1ca5c2a5f1d1717e1a1" +checksum = "8dce50e3b637dab0d25d04d2fe79dfdca2b257eabd76790bffd22c7f90d700c8" dependencies = [ "datafusion-expr", "quote", @@ -1221,9 +1239,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084d9f979c4b155346d3c34b18f4256e6904ded508e9554d90fed416415c3515" +checksum = "03cfaacf06445dc3bbc1e901242d2a44f2cae99a744f49f3fefddcee46240058" dependencies = [ "arrow", "chrono", @@ -1240,9 +1258,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c536062b0076f4e30084065d805f389f9fe38af0ca75bcbac86bc5e9fbab65" +checksum = "1908034a89d7b2630898e06863583ae4c00a0dd310c1589ca284195ee3f7f8a6" dependencies = [ "ahash", "arrow", @@ -1262,9 +1280,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8a92b53b3193fac1916a1c5b8e3f4347c526f6822e56b71faa5fb372327a863" +checksum = "47b7a12dd59ea07614b67dbb01d85254fbd93df45bcffa63495e11d3bdf847df" dependencies = [ "ahash", "arrow", @@ -1276,9 +1294,9 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fa0a5ac94c7cf3da97bedabd69d6bbca12aef84b9b37e6e9e8c25286511b5e2" +checksum = "4371cc4ad33978cc2a8be93bd54a232d3f2857b50401a14631c0705f3f910aae" dependencies = [ "arrow", "datafusion-common", @@ -1295,9 +1313,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "690c615db468c2e5fe5085b232d8b1c088299a6c63d87fd960a354a71f7acb55" +checksum = "dc47bc33025757a5c11f2cd094c5b6b5ed87f46fa33c023e6fdfa25fcbfade23" dependencies = [ "ahash", "arrow", @@ -1325,9 +1343,9 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a1afb2bdb05de7ff65be6883ebfd4ec027bd9f1f21c46aa3afd01927160a83" +checksum = "d8f5d9acd7d96e3bf2a7bb04818373cab6e51de0356e3694b94905fee7b4e8b6" dependencies = [ "arrow", "chrono", @@ -1341,9 +1359,9 @@ dependencies = [ [[package]] name = "datafusion-proto-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b7a5876ebd6b564fb9a1fd2c3a2a9686b787071a256b47e4708f0916f9e46f" +checksum = "09ecb5ec152c4353b60f7a5635489834391f7a291d2b39a4820cd469e318b78e" dependencies = [ "arrow", "datafusion-common", @@ -1352,9 +1370,9 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad229a134c7406c057ece00c8743c0c34b97f4e72f78b475fe17b66c5e14fa4f" +checksum = "d7485da32283985d6b45bd7d13a65169dcbe8c869e25d01b2cfbc425254b4b49" dependencies = [ "arrow", "async-trait", @@ -1376,9 +1394,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f6ab28b72b664c21a27b22a2ff815fd390ed224c26e89a93b5a8154a4e8607" +checksum = "a466b15632befddfeac68c125f0260f569ff315c6831538cbb40db754134e0df" dependencies = [ "arrow", "bigdecimal", @@ -1441,20 +1459,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "ffi-table-provider" -version = "0.1.0" -dependencies = [ - "arrow", - "arrow-array", - "arrow-schema", - "async-trait", - "datafusion", - "datafusion-ffi", - "pyo3", - "pyo3-build-config", -] - [[package]] name = "fixedbitset" version = "0.5.7" @@ -1488,6 +1492,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1666,6 +1676,11 @@ name = "hashbrown" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "heck" @@ -2271,12 +2286,14 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "petgraph" -version = "0.7.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +checksum = "54acf3a685220b533e437e264e4d932cfbdc4cc7ec0cd232ed73c08d03b8a7ca" dependencies = [ "fixedbitset", + "hashbrown 0.15.3", "indexmap", + "serde", ] [[package]] @@ -2305,7 +2322,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -2484,19 +2501,27 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ - "libc", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ "rand_chacha", - "rand_core", + "rand_core 0.9.3", ] [[package]] name = "rand_chacha" -version = "0.3.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", ] [[package]] @@ -2504,8 +2529,14 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.3.3", ] [[package]] @@ -3032,9 +3063,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/examples/datafusion-ffi-example/Cargo.toml b/examples/datafusion-ffi-example/Cargo.toml index 319163554..b26ab48e3 100644 --- a/examples/datafusion-ffi-example/Cargo.toml +++ b/examples/datafusion-ffi-example/Cargo.toml @@ -16,13 +16,13 @@ # under the License. [package] -name = "ffi-table-provider" -version = "0.1.0" +name = "datafusion-ffi-example" +version = "0.2.0" edition = "2021" [dependencies] -datafusion = { version = "47.0.0" } -datafusion-ffi = { version = "47.0.0" } +datafusion = { version = "48.0.0" } +datafusion-ffi = { version = "48.0.0" } pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"] } arrow = { version = "55.0.0" } arrow-array = { version = "55.0.0" } diff --git a/examples/datafusion-ffi-example/python/tests/_test_aggregate_udf.py b/examples/datafusion-ffi-example/python/tests/_test_aggregate_udf.py new file mode 100644 index 000000000..7ea6b295c --- /dev/null +++ b/examples/datafusion-ffi-example/python/tests/_test_aggregate_udf.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext, col, udaf +from datafusion_ffi_example import MySumUDF + + +def setup_context_with_table(): + ctx = SessionContext() + + # Pick numbers here so we get the same value in both groups + # since we cannot be certain of the output order of batches + batch = pa.RecordBatch.from_arrays( + [ + pa.array([1, 2, 3, None], type=pa.int64()), + pa.array([1, 1, 2, 2], type=pa.int64()), + ], + names=["a", "b"], + ) + ctx.register_record_batches("test_table", [[batch]]) + return ctx + + +def test_ffi_aggregate_register(): + ctx = setup_context_with_table() + my_udaf = udaf(MySumUDF()) + ctx.register_udaf(my_udaf) + + result = ctx.sql("select my_custom_sum(a) from test_table group by b").collect() + + assert len(result) == 2 + assert result[0].num_columns == 1 + + result = [r.column(0) for r in result] + expected = [ + pa.array([3], type=pa.int64()), + pa.array([3], type=pa.int64()), + ] + + assert result == expected + + +def test_ffi_aggregate_call_directly(): + ctx = setup_context_with_table() + my_udaf = udaf(MySumUDF()) + + result = ( + ctx.table("test_table").aggregate([col("b")], [my_udaf(col("a"))]).collect() + ) + + assert len(result) == 2 + assert result[0].num_columns == 2 + + result = [r.column(1) for r in result] + expected = [ + pa.array([3], type=pa.int64()), + pa.array([3], type=pa.int64()), + ] + + assert result == expected diff --git a/examples/datafusion-ffi-example/python/tests/_test_scalar_udf.py b/examples/datafusion-ffi-example/python/tests/_test_scalar_udf.py new file mode 100644 index 000000000..0c949c34a --- /dev/null +++ b/examples/datafusion-ffi-example/python/tests/_test_scalar_udf.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext, col, udf +from datafusion_ffi_example import IsNullUDF + + +def setup_context_with_table(): + ctx = SessionContext() + + batch = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3, None])], + names=["a"], + ) + ctx.register_record_batches("test_table", [[batch]]) + return ctx + + +def test_ffi_scalar_register(): + ctx = setup_context_with_table() + my_udf = udf(IsNullUDF()) + ctx.register_udf(my_udf) + + result = ctx.sql("select my_custom_is_null(a) from test_table").collect() + + assert len(result) == 1 + assert result[0].num_columns == 1 + print(result) + + result = [r.column(0) for r in result] + expected = [ + pa.array([False, False, False, True], type=pa.bool_()), + ] + + assert result == expected + + +def test_ffi_scalar_call_directly(): + ctx = setup_context_with_table() + my_udf = udf(IsNullUDF()) + + result = ctx.table("test_table").select(my_udf(col("a"))).collect() + + assert len(result) == 1 + assert result[0].num_columns == 1 + print(result) + + result = [r.column(0) for r in result] + expected = [ + pa.array([False, False, False, True], type=pa.bool_()), + ] + + assert result == expected diff --git a/examples/datafusion-ffi-example/python/tests/_test_window_udf.py b/examples/datafusion-ffi-example/python/tests/_test_window_udf.py new file mode 100644 index 000000000..7d96994b9 --- /dev/null +++ b/examples/datafusion-ffi-example/python/tests/_test_window_udf.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext, col, udwf +from datafusion_ffi_example import MyRankUDF + + +def setup_context_with_table(): + ctx = SessionContext() + + # Pick numbers here so we get the same value in both groups + # since we cannot be certain of the output order of batches + batch = pa.RecordBatch.from_arrays( + [ + pa.array([40, 10, 30, 20], type=pa.int64()), + ], + names=["a"], + ) + ctx.register_record_batches("test_table", [[batch]]) + return ctx + + +def test_ffi_window_register(): + ctx = setup_context_with_table() + my_udwf = udwf(MyRankUDF()) + ctx.register_udwf(my_udwf) + + result = ctx.sql( + "select a, my_custom_rank() over (order by a) from test_table" + ).collect() + assert len(result) == 1 + assert result[0].num_columns == 2 + + results = [ + (result[0][0][idx].as_py(), result[0][1][idx].as_py()) for idx in range(4) + ] + results.sort() + + expected = [ + (10, 1), + (20, 2), + (30, 3), + (40, 4), + ] + assert results == expected + + +def test_ffi_window_call_directly(): + ctx = setup_context_with_table() + my_udwf = udwf(MyRankUDF()) + + result = ( + ctx.table("test_table") + .select(col("a"), my_udwf().order_by(col("a")).build()) + .collect() + ) + + assert len(result) == 1 + assert result[0].num_columns == 2 + + results = [ + (result[0][0][idx].as_py(), result[0][1][idx].as_py()) for idx in range(4) + ] + results.sort() + + expected = [ + (10, 1), + (20, 2), + (30, 3), + (40, 4), + ] + assert results == expected diff --git a/examples/datafusion-ffi-example/src/aggregate_udf.rs b/examples/datafusion-ffi-example/src/aggregate_udf.rs new file mode 100644 index 000000000..9481fe9c6 --- /dev/null +++ b/examples/datafusion-ffi-example/src/aggregate_udf.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::DataType; +use datafusion::error::Result as DataFusionResult; +use datafusion::functions_aggregate::sum::Sum; +use datafusion::logical_expr::function::AccumulatorArgs; +use datafusion::logical_expr::{Accumulator, AggregateUDF, AggregateUDFImpl, Signature}; +use datafusion_ffi::udaf::FFI_AggregateUDF; +use pyo3::types::PyCapsule; +use pyo3::{pyclass, pymethods, Bound, PyResult, Python}; +use std::any::Any; +use std::sync::Arc; + +#[pyclass(name = "MySumUDF", module = "datafusion_ffi_example", subclass)] +#[derive(Debug, Clone)] +pub(crate) struct MySumUDF { + inner: Arc, +} + +#[pymethods] +impl MySumUDF { + #[new] + fn new() -> Self { + Self { + inner: Arc::new(Sum::new()), + } + } + + fn __datafusion_aggregate_udf__<'py>( + &self, + py: Python<'py>, + ) -> PyResult> { + let name = cr"datafusion_aggregate_udf".into(); + + let func = Arc::new(AggregateUDF::from(self.clone())); + let provider = FFI_AggregateUDF::from(func); + + PyCapsule::new(py, provider, Some(name)) + } +} + +impl AggregateUDFImpl for MySumUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "my_custom_sum" + } + + fn signature(&self) -> &Signature { + self.inner.signature() + } + + fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult { + self.inner.return_type(arg_types) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> DataFusionResult> { + self.inner.accumulator(acc_args) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> DataFusionResult> { + self.inner.coerce_types(arg_types) + } +} diff --git a/examples/datafusion-ffi-example/src/catalog_provider.rs b/examples/datafusion-ffi-example/src/catalog_provider.rs index 54e61cf3e..cd2616916 100644 --- a/examples/datafusion-ffi-example/src/catalog_provider.rs +++ b/examples/datafusion-ffi-example/src/catalog_provider.rs @@ -24,7 +24,6 @@ use datafusion::{ catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, TableProvider, }, - common::exec_err, datasource::MemTable, error::{DataFusionError, Result}, }; diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs index 3a4cf2247..79af276fd 100644 --- a/examples/datafusion-ffi-example/src/lib.rs +++ b/examples/datafusion-ffi-example/src/lib.rs @@ -16,18 +16,27 @@ // under the License. use crate::catalog_provider::MyCatalogProvider; +use crate::aggregate_udf::MySumUDF; +use crate::scalar_udf::IsNullUDF; use crate::table_function::MyTableFunction; use crate::table_provider::MyTableProvider; +use crate::window_udf::MyRankUDF; use pyo3::prelude::*; pub(crate) mod catalog_provider; +pub(crate) mod aggregate_udf; +pub(crate) mod scalar_udf; pub(crate) mod table_function; pub(crate) mod table_provider; +pub(crate) mod window_udf; #[pymodule] fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/examples/datafusion-ffi-example/src/scalar_udf.rs b/examples/datafusion-ffi-example/src/scalar_udf.rs new file mode 100644 index 000000000..727666638 --- /dev/null +++ b/examples/datafusion-ffi-example/src/scalar_udf.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, BooleanArray}; +use arrow_schema::DataType; +use datafusion::common::ScalarValue; +use datafusion::error::Result as DataFusionResult; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; +use datafusion_ffi::udf::FFI_ScalarUDF; +use pyo3::types::PyCapsule; +use pyo3::{pyclass, pymethods, Bound, PyResult, Python}; +use std::any::Any; +use std::sync::Arc; + +#[pyclass(name = "IsNullUDF", module = "datafusion_ffi_example", subclass)] +#[derive(Debug, Clone)] +pub(crate) struct IsNullUDF { + signature: Signature, +} + +#[pymethods] +impl IsNullUDF { + #[new] + fn new() -> Self { + Self { + signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable), + } + } + + fn __datafusion_scalar_udf__<'py>(&self, py: Python<'py>) -> PyResult> { + let name = cr"datafusion_scalar_udf".into(); + + let func = Arc::new(ScalarUDF::from(self.clone())); + let provider = FFI_ScalarUDF::from(func); + + PyCapsule::new(py, provider, Some(name)) + } +} + +impl ScalarUDFImpl for IsNullUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "my_custom_is_null" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult { + Ok(DataType::Boolean) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult { + let input = &args.args[0]; + + Ok(match input { + ColumnarValue::Array(arr) => match arr.is_nullable() { + true => { + let nulls = arr.nulls().unwrap(); + let nulls = BooleanArray::from_iter(nulls.iter().map(|x| Some(!x))); + ColumnarValue::Array(Arc::new(nulls)) + } + false => ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))), + }, + ColumnarValue::Scalar(sv) => { + ColumnarValue::Scalar(ScalarValue::Boolean(Some(sv == &ScalarValue::Null))) + } + }) + } +} diff --git a/examples/datafusion-ffi-example/src/window_udf.rs b/examples/datafusion-ffi-example/src/window_udf.rs new file mode 100644 index 000000000..e0d397956 --- /dev/null +++ b/examples/datafusion-ffi-example/src/window_udf.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::{DataType, FieldRef}; +use datafusion::error::Result as DataFusionResult; +use datafusion::functions_window::rank::rank_udwf; +use datafusion::logical_expr::function::{PartitionEvaluatorArgs, WindowUDFFieldArgs}; +use datafusion::logical_expr::{PartitionEvaluator, Signature, WindowUDF, WindowUDFImpl}; +use datafusion_ffi::udwf::FFI_WindowUDF; +use pyo3::types::PyCapsule; +use pyo3::{pyclass, pymethods, Bound, PyResult, Python}; +use std::any::Any; +use std::sync::Arc; + +#[pyclass(name = "MyRankUDF", module = "datafusion_ffi_example", subclass)] +#[derive(Debug, Clone)] +pub(crate) struct MyRankUDF { + inner: Arc, +} + +#[pymethods] +impl MyRankUDF { + #[new] + fn new() -> Self { + Self { inner: rank_udwf() } + } + + fn __datafusion_window_udf__<'py>(&self, py: Python<'py>) -> PyResult> { + let name = cr"datafusion_window_udf".into(); + + let func = Arc::new(WindowUDF::from(self.clone())); + let provider = FFI_WindowUDF::from(func); + + PyCapsule::new(py, provider, Some(name)) + } +} + +impl WindowUDFImpl for MyRankUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "my_custom_rank" + } + + fn signature(&self) -> &Signature { + self.inner.signature() + } + + fn partition_evaluator( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> DataFusionResult> { + self.inner + .inner() + .partition_evaluator(partition_evaluator_args) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> DataFusionResult { + self.inner.inner().field(field_args) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> DataFusionResult> { + self.inner.coerce_types(arg_types) + } +} diff --git a/python/datafusion/user_defined.py b/python/datafusion/user_defined.py index dd634c7fb..bd686acbb 100644 --- a/python/datafusion/user_defined.py +++ b/python/datafusion/user_defined.py @@ -22,7 +22,7 @@ import functools from abc import ABCMeta, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar, overload +from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, TypeVar, overload import pyarrow as pa @@ -77,6 +77,12 @@ def __str__(self) -> str: return self.name.lower() +class ScalarUDFExportable(Protocol): + """Type hint for object that has __datafusion_scalar_udf__ PyCapsule.""" + + def __datafusion_scalar_udf__(self) -> object: ... # noqa: D105 + + class ScalarUDF: """Class for performing scalar user-defined functions (UDF). @@ -96,6 +102,9 @@ def __init__( See helper method :py:func:`udf` for argument details. """ + if hasattr(func, "__datafusion_scalar_udf__"): + self._udf = df_internal.ScalarUDF.from_pycapsule(func) + return if isinstance(input_types, pa.DataType): input_types = [input_types] self._udf = df_internal.ScalarUDF( @@ -134,6 +143,10 @@ def udf( name: Optional[str] = None, ) -> ScalarUDF: ... + @overload + @staticmethod + def udf(func: ScalarUDFExportable) -> ScalarUDF: ... + @staticmethod def udf(*args: Any, **kwargs: Any): # noqa: D417 """Create a new User-Defined Function (UDF). @@ -147,7 +160,10 @@ def udf(*args: Any, **kwargs: Any): # noqa: D417 Args: func (Callable, optional): Only needed when calling as a function. - Skip this argument when using ``udf`` as a decorator. + Skip this argument when using `udf` as a decorator. If you have a Rust + backed ScalarUDF within a PyCapsule, you can pass this parameter + and ignore the rest. They will be determined directly from the + underlying function. See the online documentation for more information. input_types (list[pa.DataType]): The data types of the arguments to ``func``. This list must be of the same length as the number of arguments. @@ -215,12 +231,31 @@ def wrapper(*args: Any, **kwargs: Any): return decorator + if hasattr(args[0], "__datafusion_scalar_udf__"): + return ScalarUDF.from_pycapsule(args[0]) + if args and callable(args[0]): # Case 1: Used as a function, require the first parameter to be callable return _function(*args, **kwargs) # Case 2: Used as a decorator with parameters return _decorator(*args, **kwargs) + @staticmethod + def from_pycapsule(func: ScalarUDFExportable) -> ScalarUDF: + """Create a Scalar UDF from ScalarUDF PyCapsule object. + + This function will instantiate a Scalar UDF that uses a DataFusion + ScalarUDF that is exported via the FFI bindings. + """ + name = str(func.__class__) + return ScalarUDF( + name=name, + func=func, + input_types=None, + return_type=None, + volatility=None, + ) + class Accumulator(metaclass=ABCMeta): """Defines how an :py:class:`AggregateUDF` accumulates values.""" @@ -242,6 +277,12 @@ def evaluate(self) -> pa.Scalar: """Return the resultant value.""" +class AggregateUDFExportable(Protocol): + """Type hint for object that has __datafusion_aggregate_udf__ PyCapsule.""" + + def __datafusion_aggregate_udf__(self) -> object: ... # noqa: D105 + + class AggregateUDF: """Class for performing scalar user-defined functions (UDF). @@ -263,6 +304,9 @@ def __init__( See :py:func:`udaf` for a convenience function and argument descriptions. """ + if hasattr(accumulator, "__datafusion_aggregate_udf__"): + self._udaf = df_internal.AggregateUDF.from_pycapsule(accumulator) + return self._udaf = df_internal.AggregateUDF( name, accumulator, @@ -307,7 +351,7 @@ def udaf( ) -> AggregateUDF: ... @staticmethod - def udaf(*args: Any, **kwargs: Any): # noqa: D417 + def udaf(*args: Any, **kwargs: Any): # noqa: D417, C901 """Create a new User-Defined Aggregate Function (UDAF). This class allows you to define an aggregate function that can be used in @@ -364,6 +408,10 @@ def udf4() -> Summarize: Args: accum: The accumulator python function. Only needed when calling as a function. Skip this argument when using ``udaf`` as a decorator. + If you have a Rust backed AggregateUDF within a PyCapsule, you can + pass this parameter and ignore the rest. They will be determined + directly from the underlying function. See the online documentation + for more information. input_types: The data types of the arguments to ``accum``. return_type: The data type of the return value. state_type: The data types of the intermediate accumulation. @@ -422,12 +470,32 @@ def wrapper(*args: Any, **kwargs: Any) -> Expr: return decorator + if hasattr(args[0], "__datafusion_aggregate_udf__"): + return AggregateUDF.from_pycapsule(args[0]) + if args and callable(args[0]): # Case 1: Used as a function, require the first parameter to be callable return _function(*args, **kwargs) # Case 2: Used as a decorator with parameters return _decorator(*args, **kwargs) + @staticmethod + def from_pycapsule(func: AggregateUDFExportable) -> AggregateUDF: + """Create an Aggregate UDF from AggregateUDF PyCapsule object. + + This function will instantiate a Aggregate UDF that uses a DataFusion + AggregateUDF that is exported via the FFI bindings. + """ + name = str(func.__class__) + return AggregateUDF( + name=name, + accumulator=func, + input_types=None, + return_type=None, + state_type=None, + volatility=None, + ) + class WindowEvaluator: """Evaluator class for user-defined window functions (UDWF). @@ -588,6 +656,12 @@ def include_rank(self) -> bool: return False +class WindowUDFExportable(Protocol): + """Type hint for object that has __datafusion_window_udf__ PyCapsule.""" + + def __datafusion_window_udf__(self) -> object: ... # noqa: D105 + + class WindowUDF: """Class for performing window user-defined functions (UDF). @@ -608,6 +682,9 @@ def __init__( See :py:func:`udwf` for a convenience function and argument descriptions. """ + if hasattr(func, "__datafusion_window_udf__"): + self._udwf = df_internal.WindowUDF.from_pycapsule(func) + return self._udwf = df_internal.WindowUDF( name, func, input_types, return_type, str(volatility) ) @@ -683,7 +760,10 @@ def biased_numbers() -> BiasedNumbers: Args: func: Only needed when calling as a function. Skip this argument when - using ``udwf`` as a decorator. + using ``udwf`` as a decorator. If you have a Rust backed WindowUDF + within a PyCapsule, you can pass this parameter and ignore the rest. + They will be determined directly from the underlying function. See + the online documentation for more information. input_types: The data types of the arguments. return_type: The data type of the return value. volatility: See :py:class:`Volatility` for allowed values. @@ -692,6 +772,9 @@ def biased_numbers() -> BiasedNumbers: Returns: A user-defined window function that can be used in window function calls. """ + if hasattr(args[0], "__datafusion_window_udf__"): + return WindowUDF.from_pycapsule(args[0]) + if args and callable(args[0]): # Case 1: Used as a function, require the first parameter to be callable return WindowUDF._create_window_udf(*args, **kwargs) @@ -759,6 +842,22 @@ def wrapper(*args: Any, **kwargs: Any) -> Expr: return decorator + @staticmethod + def from_pycapsule(func: WindowUDFExportable) -> WindowUDF: + """Create a Window UDF from WindowUDF PyCapsule object. + + This function will instantiate a Window UDF that uses a DataFusion + WindowUDF that is exported via the FFI bindings. + """ + name = str(func.__class__) + return WindowUDF( + name=name, + func=func, + input_types=None, + return_type=None, + volatility=None, + ) + class TableFunction: """Class for performing user-defined table functions (UDTF). diff --git a/src/functions.rs b/src/functions.rs index b40500b8b..eeef48385 100644 --- a/src/functions.rs +++ b/src/functions.rs @@ -682,7 +682,7 @@ pub fn approx_percentile_cont_with_weight( add_builder_fns_to_aggregate(agg_fn, None, filter, None, None) } -// We handle first_value explicitly because the signature expects an order_by +// We handle last_value explicitly because the signature expects an order_by // https://github.com/apache/datafusion/issues/12376 #[pyfunction] #[pyo3(signature = (expr, distinct=None, filter=None, order_by=None, null_treatment=None))] diff --git a/src/udaf.rs b/src/udaf.rs index 34a9cd51d..78f4e2b0c 100644 --- a/src/udaf.rs +++ b/src/udaf.rs @@ -19,6 +19,10 @@ use std::sync::Arc; use pyo3::{prelude::*, types::PyTuple}; +use crate::common::data_type::PyScalarValue; +use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult}; +use crate::expr::PyExpr; +use crate::utils::{parse_volatility, validate_pycapsule}; use datafusion::arrow::array::{Array, ArrayRef}; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow}; @@ -27,11 +31,8 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ create_udaf, Accumulator, AccumulatorFactoryFunction, AggregateUDF, }; - -use crate::common::data_type::PyScalarValue; -use crate::errors::to_datafusion_err; -use crate::expr::PyExpr; -use crate::utils::parse_volatility; +use datafusion_ffi::udaf::{FFI_AggregateUDF, ForeignAggregateUDF}; +use pyo3::types::PyCapsule; #[derive(Debug)] struct RustAccumulator { @@ -183,6 +184,26 @@ impl PyAggregateUDF { Ok(Self { function }) } + #[staticmethod] + pub fn from_pycapsule(func: Bound<'_, PyAny>) -> PyDataFusionResult { + if func.hasattr("__datafusion_aggregate_udf__")? { + let capsule = func.getattr("__datafusion_aggregate_udf__")?.call0()?; + let capsule = capsule.downcast::().map_err(py_datafusion_err)?; + validate_pycapsule(capsule, "datafusion_aggregate_udf")?; + + let udaf = unsafe { capsule.reference::() }; + let udaf: ForeignAggregateUDF = udaf.try_into()?; + + Ok(Self { + function: udaf.into(), + }) + } else { + Err(crate::errors::PyDataFusionError::Common( + "__datafusion_aggregate_udf__ does not exist on AggregateUDF object.".to_string(), + )) + } + } + /// creates a new PyExpr with the call of the udf #[pyo3(signature = (*args))] fn __call__(&self, args: Vec) -> PyResult { diff --git a/src/udf.rs b/src/udf.rs index 574c9d7b5..de1e3f18c 100644 --- a/src/udf.rs +++ b/src/udf.rs @@ -17,6 +17,8 @@ use std::sync::Arc; +use datafusion_ffi::udf::{FFI_ScalarUDF, ForeignScalarUDF}; +use pyo3::types::PyCapsule; use pyo3::{prelude::*, types::PyTuple}; use datafusion::arrow::array::{make_array, Array, ArrayData, ArrayRef}; @@ -29,8 +31,9 @@ use datafusion::logical_expr::ScalarUDF; use datafusion::logical_expr::{create_udf, ColumnarValue}; use crate::errors::to_datafusion_err; +use crate::errors::{py_datafusion_err, PyDataFusionResult}; use crate::expr::PyExpr; -use crate::utils::parse_volatility; +use crate::utils::{parse_volatility, validate_pycapsule}; /// Create a Rust callable function from a python function that expects pyarrow arrays fn pyarrow_function_to_rust( @@ -105,6 +108,26 @@ impl PyScalarUDF { Ok(Self { function }) } + #[staticmethod] + pub fn from_pycapsule(func: Bound<'_, PyAny>) -> PyDataFusionResult { + if func.hasattr("__datafusion_scalar_udf__")? { + let capsule = func.getattr("__datafusion_scalar_udf__")?.call0()?; + let capsule = capsule.downcast::().map_err(py_datafusion_err)?; + validate_pycapsule(capsule, "datafusion_scalar_udf")?; + + let udf = unsafe { capsule.reference::() }; + let udf: ForeignScalarUDF = udf.try_into()?; + + Ok(Self { + function: udf.into(), + }) + } else { + Err(crate::errors::PyDataFusionError::Common( + "__datafusion_scalar_udf__ does not exist on ScalarUDF object.".to_string(), + )) + } + } + /// creates a new PyExpr with the call of the udf #[pyo3(signature = (*args))] fn __call__(&self, args: Vec) -> PyResult { diff --git a/src/udwf.rs b/src/udwf.rs index a0c8cc59a..4fb98916b 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -27,16 +27,17 @@ use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use crate::common::data_type::PyScalarValue; -use crate::errors::to_datafusion_err; +use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult}; use crate::expr::PyExpr; -use crate::utils::parse_volatility; +use crate::utils::{parse_volatility, validate_pycapsule}; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl, }; -use pyo3::types::{PyList, PyTuple}; +use datafusion_ffi::udwf::{FFI_WindowUDF, ForeignWindowUDF}; +use pyo3::types::{PyCapsule, PyList, PyTuple}; #[derive(Debug)] struct RustPartitionEvaluator { @@ -245,6 +246,26 @@ impl PyWindowUDF { Ok(self.function.call(args).into()) } + #[staticmethod] + pub fn from_pycapsule(func: Bound<'_, PyAny>) -> PyDataFusionResult { + if func.hasattr("__datafusion_window_udf__")? { + let capsule = func.getattr("__datafusion_window_udf__")?.call0()?; + let capsule = capsule.downcast::().map_err(py_datafusion_err)?; + validate_pycapsule(capsule, "datafusion_window_udf")?; + + let udwf = unsafe { capsule.reference::() }; + let udwf: ForeignWindowUDF = udwf.try_into()?; + + Ok(Self { + function: udwf.into(), + }) + } else { + Err(crate::errors::PyDataFusionError::Common( + "__datafusion_window_udf__ does not exist on WindowUDF object.".to_string(), + )) + } + } + fn __repr__(&self) -> PyResult { Ok(format!("WindowUDF({})", self.function.name())) }