Skip to content

Commit

Permalink
feat: expose http object store (#885)
Browse files Browse the repository at this point in the history
* feat: expose HTTP ObjectStore

The objective is to allow the user to register CSV directly from
an HTTP URL, delaying downloading the file until required

* chore: return PyResult
  • Loading branch information
mesejo authored Oct 6, 2024
1 parent ec8246d commit 6c8292f
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ uuid = { version = "1.9", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] }
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
parking_lot = "0.12"
regex-syntax = "0.8"
syn = "2.0.79"
Expand Down
4 changes: 3 additions & 1 deletion python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ def __init__(

self.ctx = SessionContextInternal(config, runtime)

def register_object_store(self, schema: str, store: Any, host: str | None) -> None:
def register_object_store(
self, schema: str, store: Any, host: str | None = None
) -> None:
"""Add a new object store into the session.
Args:
Expand Down
12 changes: 2 additions & 10 deletions python/datafusion/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@
GoogleCloud = object_store.GoogleCloud
LocalFileSystem = object_store.LocalFileSystem
MicrosoftAzure = object_store.MicrosoftAzure
Http = object_store.Http

__all__ = [
"AmazonS3",
"GoogleCloud",
"LocalFileSystem",
"MicrosoftAzure",
]


def __getattr__(name):
return getattr(object_store, name)
__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", "Http"]
12 changes: 10 additions & 2 deletions python/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pyarrow.csv import write_csv
import pyarrow.dataset as ds
import pytest
from datafusion.object_store import LocalFileSystem
from datafusion.object_store import Http

from datafusion import udf, col

Expand Down Expand Up @@ -139,6 +139,15 @@ def test_register_csv_list(ctx, tmp_path):
assert int_sum == 2 * sum(int_values)


def test_register_http_csv(ctx):
url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv"
ctx.register_object_store("", Http(url))
ctx.register_csv("remote", url)
assert ctx.table_exist("remote")
res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist()
assert res["total"] > 0


def test_register_parquet(ctx, tmp_path):
path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data())
ctx.register_parquet("t", path)
Expand Down Expand Up @@ -494,7 +503,6 @@ def test_register_listing_table(

dir_root = f"file://{dir_root}/" if path_to_str else dir_root

ctx.register_object_store("file://local", LocalFileSystem(), None)
ctx.register_listing_table(
"my_table",
dir_root,
Expand Down
10 changes: 2 additions & 8 deletions python/tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,15 @@
# under the License.

import os

import pytest

from datafusion import SessionContext
from datafusion.object_store import LocalFileSystem


@pytest.fixture
def local():
return LocalFileSystem()


@pytest.fixture
def ctx(local):
def ctx():
ctx = SessionContext()
ctx.register_object_store("file://local", local, None)
return ctx


Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_wrapper_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None:


def test_datafusion_missing_exports() -> None:
"""Check for any missing pythone exports.
"""Check for any missing python exports.
This test verifies that every exposed class, attribute, and function in
the internal (pyo3) module is also exposed in our python wrappers.
Expand Down
1 change: 1 addition & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl PySessionContext {
StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
StorageContexts::HTTP(http) => (http.store, http.url),
};

// let users override the host to match the api signature from upstream
Expand Down
31 changes: 31 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@ use pyo3::prelude::*;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::http::{HttpBuilder, HttpStore};
use object_store::local::LocalFileSystem;
use pyo3::exceptions::PyValueError;
use url::Url;

#[derive(FromPyObject)]
pub enum StorageContexts {
AmazonS3(PyAmazonS3Context),
GoogleCloudStorage(PyGoogleCloudContext),
MicrosoftAzure(PyMicrosoftAzureContext),
LocalFileSystem(PyLocalFileSystemContext),
HTTP(PyHttpContext),
}

#[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
Expand Down Expand Up @@ -219,10 +223,37 @@ impl PyAmazonS3Context {
}
}

#[pyclass(name = "Http", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyHttpContext {
pub url: String,
pub store: Arc<HttpStore>,
}

#[pymethods]
impl PyHttpContext {
#[new]
fn new(url: String) -> PyResult<Self> {
let store = match Url::parse(url.as_str()) {
Ok(url) => HttpBuilder::new()
.with_url(url.origin().ascii_serialization())
.build(),
Err(_) => HttpBuilder::new().build(),
}
.map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))?;

Ok(Self {
url,
store: Arc::new(store),
})
}
}

pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyAmazonS3Context>()?;
m.add_class::<PyMicrosoftAzureContext>()?;
m.add_class::<PyGoogleCloudContext>()?;
m.add_class::<PyLocalFileSystemContext>()?;
m.add_class::<PyHttpContext>()?;
Ok(())
}

0 comments on commit 6c8292f

Please sign in to comment.