diff --git a/Cargo.toml b/Cargo.toml index 4f260231..d2d3e79e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ uuid = { version = "1.9", features = ["v4"] } mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] } async-trait = "0.1" futures = "0.3" -object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] } +object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] } parking_lot = "0.12" regex-syntax = "0.8" syn = "2.0.79" diff --git a/python/datafusion/context.py b/python/datafusion/context.py index b08e62d7..957d7e31 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -450,7 +450,9 @@ def __init__( self.ctx = SessionContextInternal(config, runtime) - def register_object_store(self, schema: str, store: Any, host: str | None) -> None: + def register_object_store( + self, schema: str, store: Any, host: str | None = None + ) -> None: """Add a new object store into the session. Args: diff --git a/python/datafusion/object_store.py b/python/datafusion/object_store.py index c927e761..7cc17506 100644 --- a/python/datafusion/object_store.py +++ b/python/datafusion/object_store.py @@ -22,14 +22,6 @@ GoogleCloud = object_store.GoogleCloud LocalFileSystem = object_store.LocalFileSystem MicrosoftAzure = object_store.MicrosoftAzure +Http = object_store.Http -__all__ = [ - "AmazonS3", - "GoogleCloud", - "LocalFileSystem", - "MicrosoftAzure", -] - - -def __getattr__(name): - return getattr(object_store, name) +__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", "Http"] diff --git a/python/tests/test_sql.py b/python/tests/test_sql.py index e39a9f5c..39e5ffe6 100644 --- a/python/tests/test_sql.py +++ b/python/tests/test_sql.py @@ -22,7 +22,7 @@ from pyarrow.csv import write_csv import pyarrow.dataset as ds import pytest -from datafusion.object_store import LocalFileSystem +from datafusion.object_store import Http from datafusion import udf, col @@ -139,6 +139,15 @@ def test_register_csv_list(ctx, tmp_path): assert int_sum == 2 * sum(int_values) +def test_register_http_csv(ctx): + url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv" + ctx.register_object_store("", Http(url)) + ctx.register_csv("remote", url) + assert ctx.table_exist("remote") + res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist() + assert res["total"] > 0 + + def test_register_parquet(ctx, tmp_path): path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data()) ctx.register_parquet("t", path) @@ -494,7 +503,6 @@ def test_register_listing_table( dir_root = f"file://{dir_root}/" if path_to_str else dir_root - ctx.register_object_store("file://local", LocalFileSystem(), None) ctx.register_listing_table( "my_table", dir_root, diff --git a/python/tests/test_store.py b/python/tests/test_store.py index 3ffd9ee4..f85b2831 100644 --- a/python/tests/test_store.py +++ b/python/tests/test_store.py @@ -16,21 +16,15 @@ # under the License. import os + import pytest from datafusion import SessionContext -from datafusion.object_store import LocalFileSystem - - -@pytest.fixture -def local(): - return LocalFileSystem() @pytest.fixture -def ctx(local): +def ctx(): ctx = SessionContext() - ctx.register_object_store("file://local", local, None) return ctx diff --git a/python/tests/test_wrapper_coverage.py b/python/tests/test_wrapper_coverage.py index c53a89c5..86f2d57f 100644 --- a/python/tests/test_wrapper_coverage.py +++ b/python/tests/test_wrapper_coverage.py @@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None: def test_datafusion_missing_exports() -> None: - """Check for any missing pythone exports. + """Check for any missing python exports. This test verifies that every exposed class, attribute, and function in the internal (pyo3) module is also exposed in our python wrappers. diff --git a/src/context.rs b/src/context.rs index 5317a3ed..f445874d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -312,6 +312,7 @@ impl PySessionContext { StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name), StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name), StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()), + StorageContexts::HTTP(http) => (http.store, http.url), }; // let users override the host to match the api signature from upstream diff --git a/src/store.rs b/src/store.rs index 846d96a6..1e5fab47 100644 --- a/src/store.rs +++ b/src/store.rs @@ -22,7 +22,10 @@ use pyo3::prelude::*; use object_store::aws::{AmazonS3, AmazonS3Builder}; use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; +use object_store::http::{HttpBuilder, HttpStore}; use object_store::local::LocalFileSystem; +use pyo3::exceptions::PyValueError; +use url::Url; #[derive(FromPyObject)] pub enum StorageContexts { @@ -30,6 +33,7 @@ pub enum StorageContexts { GoogleCloudStorage(PyGoogleCloudContext), MicrosoftAzure(PyMicrosoftAzureContext), LocalFileSystem(PyLocalFileSystemContext), + HTTP(PyHttpContext), } #[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)] @@ -219,10 +223,37 @@ impl PyAmazonS3Context { } } +#[pyclass(name = "Http", module = "datafusion.store", subclass)] +#[derive(Debug, Clone)] +pub struct PyHttpContext { + pub url: String, + pub store: Arc, +} + +#[pymethods] +impl PyHttpContext { + #[new] + fn new(url: String) -> PyResult { + let store = match Url::parse(url.as_str()) { + Ok(url) => HttpBuilder::new() + .with_url(url.origin().ascii_serialization()) + .build(), + Err(_) => HttpBuilder::new().build(), + } + .map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))?; + + Ok(Self { + url, + store: Arc::new(store), + }) + } +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) }