Skip to content

Commit

Permalink
Support json predicates in python/pandas connector. (#396)
Browse files Browse the repository at this point in the history
* Support json predicates in python/pandas connector.

* fix lint.

* fix lint.
  • Loading branch information
chakankardb authored Sep 14, 2023
1 parent d9ef1d8 commit 9b7c7c4
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 17 deletions.
15 changes: 15 additions & 0 deletions python/delta_sharing/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
rest_client: DataSharingRestClient,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limit: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None,
Expand All @@ -45,6 +46,7 @@ def __init__(
assert isinstance(predicateHints, Sequence)
assert all(isinstance(predicateHint, str) for predicateHint in predicateHints)
self._predicateHints = predicateHints
self._jsonPredicateHints = jsonPredicateHints

if limit is not None:
assert isinstance(limit, int) and limit >= 0, "'limit' must be a non-negative int"
Expand All @@ -59,6 +61,16 @@ def table(self) -> Table:
def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaSharingReader":
return self._copy(
predicateHints=predicateHints,
jsonPredicateHints=self._jsonPredicateHints,
limit=self._limit,
version=self._version,
timestamp=self._timestamp
)

def jsonPredicateHints(self, jsonPredicateHints: Optional[str]) -> "DeltaSharingReader":
return self._copy(
predicateHints=self._predicateHints,
jsonPredicateHints=jsonPredicateHints,
limit=self._limit,
version=self._version,
timestamp=self._timestamp
Expand All @@ -67,6 +79,7 @@ def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaShari
def limit(self, limit: Optional[int]) -> "DeltaSharingReader":
return self._copy(
predicateHints=self._predicateHints,
jsonPredicateHints=self._jsonPredicateHints,
limit=limit,
version=self._version,
timestamp=self._timestamp
Expand All @@ -76,6 +89,7 @@ def to_pandas(self) -> pd.DataFrame:
response = self._rest_client.list_files_in_table(
self._table,
predicateHints=self._predicateHints,
jsonPredicateHints=self._jsonPredicateHints,
limitHint=self._limit,
version=self._version,
timestamp=self._timestamp
Expand Down Expand Up @@ -139,6 +153,7 @@ def _copy(
self,
*,
predicateHints: Optional[Sequence[str]],
jsonPredicateHints: Optional[str],
limit: Optional[int],
version: Optional[int],
timestamp: Optional[str]
Expand Down
3 changes: 3 additions & 0 deletions python/delta_sharing/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,16 @@ def list_files_in_table(
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None,
) -> ListFilesInTableResponse:
data: Dict = {}
if predicateHints is not None:
data["predicateHints"] = predicateHints
if jsonPredicateHints is not None:
data["jsonPredicateHints"] = jsonPredicateHints
if limitHint is not None:
data["limitHint"] = limitHint
if version is not None:
Expand Down
60 changes: 43 additions & 17 deletions python/delta_sharing/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def list_files_in_table(
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[int] = None,
Expand All @@ -57,31 +58,53 @@ def list_files_in_table(
'],"type":"struct"}'
)
)
add_files = [
AddFile(
url=str(tmp_path / "pdf1.parquet"),
id="pdf1",
partition_values={},
size=0,
stats="",
),
AddFile(
url=str(tmp_path / "pdf2.parquet"),
id="pdf2",
partition_values={},
size=0,
stats="",
),
]
add_file1 = AddFile(
url=str(tmp_path / "pdf1.parquet"),
id="pdf1",
partition_values={},
size=0,
stats="",
)
add_file2 = AddFile(
url=str(tmp_path / "pdf2.parquet"),
id="pdf2",
partition_values={},
size=0,
stats="",
)

add_files = [add_file1, add_file2]

if jsonPredicateHints is not None:
add_files = [add_file1]
elif predicateHints is not None:
add_files = [add_file2]

return ListFilesInTableResponse(
delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files
)

reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock())
pdf = reader.to_pandas()

expected = pd.concat([pdf1, pdf2]).reset_index(drop=True)
pd.testing.assert_frame_equal(pdf, expected)

reader = DeltaSharingReader(
Table("table_name", "share_name", "schema_name"),
RestClientMock(),
jsonPredicateHints="dummy_hints"
)
pdf = reader.to_pandas()
expected = pd.concat([pdf1]).reset_index(drop=True)
pd.testing.assert_frame_equal(pdf, expected)

reader = DeltaSharingReader(
Table("table_name", "share_name", "schema_name"),
RestClientMock(),
predicateHints="dummy_hints"
)
pdf = reader.to_pandas()
expected = pd.concat([pdf2]).reset_index(drop=True)
pd.testing.assert_frame_equal(pdf, expected)


Expand All @@ -98,6 +121,7 @@ def list_files_in_table(
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[int] = None,
Expand Down Expand Up @@ -157,6 +181,7 @@ def list_files_in_table(
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[int] = None,
Expand Down Expand Up @@ -212,6 +237,7 @@ def list_files_in_table(
table: Table,
*,
predicateHints: Optional[Sequence[str]] = None,
jsonPredicateHints: Optional[str] = None,
limitHint: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[int] = None,
Expand Down
69 changes: 69 additions & 0 deletions python/delta_sharing/tests/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,75 @@ def test_list_files_in_table_version(
]


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_files_with_json_predicates(
rest_client: DataSharingRestClient,
):
# This function invokes the client with the specified json predicate hints
# on the test table which is partitioned by date. The function validates
# that the retuned files contains the expected dates.
def test_hints(hints, expected_dates):
response = rest_client.list_files_in_table(
Table(name="cdf_table_with_partition", share="share8", schema="default"),
jsonPredicateHints=hints
)
assert response.protocol == Protocol(min_reader_version=1)
assert response.metadata == Metadata(
id="e21eb083-6976-4159-90f2-ad88d06b7c7f",
format=Format(provider="parquet", options={}),
schema_string=(
'{"type":"struct","fields":['
'{"name":"name","type":"string","nullable":true,"metadata":{}},'
'{"name":"age","type":"integer","nullable":true,"metadata":{}},'
'{"name":"birthday","type":"date","nullable":true,"metadata":{}}'
"]}"
),
configuration={"enableChangeDataFeed": "true"},
partition_columns=["birthday"]
)

# validate that we get back all expected files.
assert len(expected_dates) == len(response.add_files)
for date in expected_dates:
found = False
for file in response.add_files:
if date in file.url:
found = True
assert found

# Without predicates, we should get back all the files.
test_hints(None, ["2020-01-01", "2020-02-02"])

# These predicates should return file with date 2020-01-01.
hints1 = (
'{"op":"and","children":['
'{"op":"not","children":['
'{"op":"isNull","children":['
'{"op":"column","name":"birthday","valueType":"date"}]}]},'
'{"op":"equal","children":['
'{"op":"column","name":"birthday","valueType":"date"},'
'{"op":"literal","value":"2020-01-01","valueType":"date"}]}'
']}'
)
test_hints(hints1, ["2020-01-01"])

# These predicates should return file with date 2020-02-02.
hints2 = (
'{"op":"and","children":['
'{"op":"not","children":['
'{"op":"isNull","children":['
'{"op":"column","name":"birthday","valueType":"date"}]}]},'
'{"op":"greaterThan","children":['
'{"op":"column","name":"birthday","valueType":"date"},'
'{"op":"literal","value":"2020-01-01","valueType":"date"}]}'
']}'
)
test_hints(hints2, ["2020-02-02"])

# Invalid predicates should return all the files.
test_hints("bad-predicates", ["2020-01-01", "2020-02-02"])


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_files_in_table_version_exception(
rest_client: DataSharingRestClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ object TestResource {
serverConfig.setPort(TEST_PORT)
serverConfig.setSsl(SSLConfig(selfSigned = true, null, null, null))
serverConfig.setEvaluatePredicateHints(true)
serverConfig.setEvaluateJsonPredicateHints(true)

serverConfig.save(testConfigFile.getCanonicalPath)
testConfigFile
Expand Down

0 comments on commit 9b7c7c4

Please sign in to comment.