Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add predicate hints and json predicate hints parameters #295

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
from itertools import chain
from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union
from typing import Any, BinaryIO, Dict, List, Optional, Sequence, TextIO, Tuple, Union
from pathlib import Path

import pandas as pd
Expand Down Expand Up @@ -53,6 +53,8 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]:

def load_as_pandas(
url: str,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None
Expand All @@ -72,6 +74,8 @@ def load_as_pandas(
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
jsonPredicateHints=jsonPredicateHints,
predicateHints=predicateHints,
limit=limit,
version=version,
timestamp=timestamp
Expand Down
19 changes: 19 additions & 0 deletions python/delta_sharing/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
table: Table,
rest_client: DataSharingRestClient,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
version: Optional[int] = None,
Expand All @@ -40,6 +41,7 @@ def __init__(
self._table = table
self._rest_client = rest_client

self._jsonPredicateHints = jsonPredicateHints
if predicateHints is not None:
assert isinstance(predicateHints, Sequence)
assert all(isinstance(predicateHint, str) for predicateHint in predicateHints)
Expand All @@ -55,8 +57,21 @@ def __init__(
def table(self) -> Table:
return self._table

# Return a new DeltaSharingReader configured with the given jsonPredicateHints.
#
# Only the JSON predicate hints change: the reader's current predicateHints,
# limit, version and timestamp are forwarded to self._copy() unchanged.
# Passing None produces a reader without JSON predicate hints.
# NOTE(review): the expected structure of the dict is not visible here;
# confirm it against the Delta Sharing protocol before relying on it.
def jsonPredicateHints(
self,
jsonPredicateHints: Optional[Dict[str, Any]]
) -> "DeltaSharingReader":
return self._copy(
jsonPredicateHints=jsonPredicateHints,
predicateHints=self._predicateHints,
limit=self._limit,
version=self._version,
timestamp=self._timestamp
)

def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaSharingReader":
return self._copy(
jsonPredicateHints=self._jsonPredicateHints,
predicateHints=predicateHints,
limit=self._limit,
version=self._version,
Expand All @@ -65,6 +80,7 @@ def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaShari

def limit(self, limit: Optional[int]) -> "DeltaSharingReader":
return self._copy(
jsonPredicateHints=self._jsonPredicateHints,
predicateHints=self._predicateHints,
limit=limit,
version=self._version,
Expand All @@ -74,6 +90,7 @@ def limit(self, limit: Optional[int]) -> "DeltaSharingReader":
def to_pandas(self) -> pd.DataFrame:
response = self._rest_client.list_files_in_table(
self._table,
jsonPredicateHints=self._jsonPredicateHints,
predicateHints=self._predicateHints,
limitHint=self._limit,
version=self._version,
Expand Down Expand Up @@ -131,6 +148,7 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame:
def _copy(
self,
*,
jsonPredicateHints: Optional[Dict[str, Any]],
predicateHints: Optional[Sequence[str]],
limit: Optional[int],
version: Optional[int],
Expand All @@ -139,6 +157,7 @@ def _copy(
return DeltaSharingReader(
table=self._table,
rest_client=self._rest_client,
jsonPredicateHints=jsonPredicateHints,
predicateHints=predicateHints,
limit=limit,
version=version,
Expand Down
3 changes: 3 additions & 0 deletions python/delta_sharing/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,15 @@ def list_files_in_table(
self,
table: Table,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None,
) -> ListFilesInTableResponse:
data: Dict = {}
if jsonPredicateHints is not None:
data["jsonPredicateHints"] = jsonPredicateHints
if predicateHints is not None:
data["predicateHints"] = predicateHints
if limitHint is not None:
Expand Down
66 changes: 61 additions & 5 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
from datetime import date, datetime
from typing import Optional, Sequence
from typing import Any, Dict, Optional, Sequence

import pandas as pd
import pytest
Expand Down Expand Up @@ -149,12 +149,14 @@ def list_all_tables(

@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,limit,version,expected",
"fragments,jsonPredicateHints,predicateHints,limit,version,expected",
[
pytest.param(
"share1.default.table1",
None,
None,
None,
None,
pd.DataFrame(
{
"eventTime": [
Expand All @@ -170,6 +172,8 @@ def list_all_tables(
"share2.default.table2",
None,
None,
None,
None,
pd.DataFrame(
{
"eventTime": [
Expand All @@ -185,6 +189,8 @@ def list_all_tables(
"share1.default.table3",
None,
None,
None,
None,
pd.DataFrame(
{
"eventTime": [
Expand All @@ -200,6 +206,8 @@ def list_all_tables(
),
pytest.param(
"share1.default.table3",
None,
None,
0,
None,
pd.DataFrame(
Expand All @@ -213,6 +221,8 @@ def list_all_tables(
),
pytest.param(
"share1.default.table3",
None,
None,
1,
None,
pd.DataFrame(
Expand All @@ -226,6 +236,8 @@ def list_all_tables(
),
pytest.param(
"share1.default.table3",
None,
None,
2,
None,
pd.DataFrame(
Expand All @@ -242,6 +254,8 @@ def list_all_tables(
),
pytest.param(
"share1.default.table3",
None,
None,
3,
None,
pd.DataFrame(
Expand All @@ -259,6 +273,8 @@ def list_all_tables(
),
pytest.param(
"share1.default.table3",
None,
None,
4,
None,
pd.DataFrame(
Expand All @@ -278,6 +294,8 @@ def list_all_tables(
"share8.default.cdf_table_cdf_enabled",
None,
None,
None,
None,
pd.DataFrame(
{
"name": ["1", "2"],
Expand All @@ -290,6 +308,8 @@ def list_all_tables(
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
None,
None,
1,
pd.DataFrame(
{
Expand All @@ -304,6 +324,8 @@ def list_all_tables(
"share3.default.table4",
None,
None,
None,
None,
pd.DataFrame(
{
"type": [None, None],
Expand All @@ -320,13 +342,17 @@ def list_all_tables(
"share4.default.test_gzip",
None,
None,
None,
None,
pd.DataFrame({"a": [True], "b": pd.Series([1], dtype="int32"), "c": ["Hi"]}),
id="table column order is not the same as parquet files",
),
pytest.param(
"share_azure.default.table_wasb",
None,
None,
None,
None,
pd.DataFrame(
{
"c1": ["foo bar"],
Expand All @@ -339,6 +365,8 @@ def list_all_tables(
"share_azure.default.table_abfs",
None,
None,
None,
None,
pd.DataFrame(
{
"c1": ["foo bar"],
Expand All @@ -351,6 +379,8 @@ def list_all_tables(
"share_gcp.default.table_gcs",
None,
None,
None,
None,
pd.DataFrame(
{
"c1": ["foo bar"],
Expand All @@ -364,20 +394,31 @@ def list_all_tables(
def test_load_as_pandas_success(
profile_path: str,
fragments: str,
jsonPredicateHints: Optional[Dict[str, Any]],
predicateHints: Optional[Sequence[str]],
limit: Optional[int],
version: Optional[int],
expected: pd.DataFrame
):
pdf = load_as_pandas(f"{profile_path}#{fragments}", limit, version, None)
pdf = load_as_pandas(
f"{profile_path}#{fragments}",
jsonPredicateHints,
predicateHints,
limit,
version,
None
)
pd.testing.assert_frame_equal(pdf, expected)


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,version,timestamp,error",
"fragments,jsonPredicateHints,predicateHints,version,timestamp,error",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a unit test that specifies these predicate hints and verifies that they are used correctly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chakankardb
Thank you for the comment. Yes, I still need to add a unit test to check that these new parameters are used correctly. I will ping you again once I have added the unit test.

[
pytest.param(
"share1.default.table1",
None,
None,
1,
None,
"Reading table by version or timestamp is not supported",
Expand All @@ -386,12 +427,16 @@ def test_load_as_pandas_success(
pytest.param(
"share1.default.table1",
None,
None,
None,
"random_timestamp",
"Reading table by version or timestamp is not supported",
id="timestamp not supported",
),
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
None,
1,
"random_timestamp",
"Please only provide one of",
Expand All @@ -400,6 +445,8 @@ def test_load_as_pandas_success(
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
None,
None,
"2000-01-01T00:00:00Z",
"Please use a timestamp greater",
id="timestamp too early ",
Expand All @@ -409,12 +456,21 @@ def test_load_as_pandas_success(
def test_load_as_pandas_exception(
profile_path: str,
fragments: str,
jsonPredicateHints: Optional[Dict[str, Any]],
predicateHints: Optional[Sequence[str]],
version: Optional[int],
timestamp: Optional[str],
error: Optional[str]
):
try:
load_as_pandas(f"{profile_path}#{fragments}", None, version, timestamp)
load_as_pandas(
f"{profile_path}#{fragments}",
jsonPredicateHints,
predicateHints,
None,
version,
timestamp
)
assert False
except Exception as e:
assert isinstance(e, HTTPError)
Expand Down
15 changes: 13 additions & 2 deletions python/delta_sharing/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
import pytest

from datetime import date
from typing import Optional, Sequence
from typing import Any, Dict, Optional, Sequence

import pandas as pd

from delta_sharing.protocol import AddFile, AddCdcFile, CdfOptions, Metadata, RemoveFile, Table
from delta_sharing.protocol import (
AddFile,
AddCdcFile,
CdfOptions,
Metadata,
RemoveFile,
Table,
)
from delta_sharing.reader import DeltaSharingReader
from delta_sharing.rest_client import (
ListFilesInTableResponse,
Expand All @@ -42,6 +49,7 @@ def list_files_in_table(
self,
table: Table,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
Expand Down Expand Up @@ -97,6 +105,7 @@ def list_files_in_table(
self,
table: Table,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
Expand Down Expand Up @@ -156,6 +165,7 @@ def list_files_in_table(
self,
table: Table,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
Expand Down Expand Up @@ -211,6 +221,7 @@ def list_files_in_table(
self,
table: Table,
*,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
predicateHints: Optional[Sequence[str]] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
Expand Down