diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index a927f37d4..2eb2ab8a3 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -34,6 +34,7 @@ def __init__( rest_client: DataSharingRestClient, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limit: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None, @@ -45,6 +46,7 @@ def __init__( assert isinstance(predicateHints, Sequence) assert all(isinstance(predicateHint, str) for predicateHint in predicateHints) self._predicateHints = predicateHints + self._jsonPredicateHints = jsonPredicateHints if limit is not None: assert isinstance(limit, int) and limit >= 0, "'limit' must be a non-negative int" @@ -59,6 +61,16 @@ def table(self) -> Table: def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaSharingReader": return self._copy( predicateHints=predicateHints, + jsonPredicateHints=self._jsonPredicateHints, + limit=self._limit, + version=self._version, + timestamp=self._timestamp + ) + + def jsonPredicateHints(self, jsonPredicateHints: Optional[str]) -> "DeltaSharingReader": + return self._copy( + predicateHints=self._predicateHints, + jsonPredicateHints=jsonPredicateHints, limit=self._limit, version=self._version, timestamp=self._timestamp @@ -67,6 +79,7 @@ def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaShari def limit(self, limit: Optional[int]) -> "DeltaSharingReader": return self._copy( predicateHints=self._predicateHints, + jsonPredicateHints=self._jsonPredicateHints, limit=limit, version=self._version, timestamp=self._timestamp @@ -76,6 +89,7 @@ def to_pandas(self) -> pd.DataFrame: response = self._rest_client.list_files_in_table( self._table, predicateHints=self._predicateHints, + jsonPredicateHints=self._jsonPredicateHints, limitHint=self._limit, version=self._version, timestamp=self._timestamp @@ -139,6 +153,7 @@ def _copy( self, 
*, predicateHints: Optional[Sequence[str]], + jsonPredicateHints: Optional[str], limit: Optional[int], version: Optional[int], timestamp: Optional[str] diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 796ffd233..ebee9edfe 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -322,6 +322,7 @@ def list_files_in_table( table: Table, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None, @@ -329,6 +330,8 @@ def list_files_in_table( data: Dict = {} if predicateHints is not None: data["predicateHints"] = predicateHints + if jsonPredicateHints is not None: + data["jsonPredicateHints"] = jsonPredicateHints if limitHint is not None: data["limitHint"] = limitHint if version is not None: diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index bf59e52de..8247a6d3a 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -43,6 +43,7 @@ def list_files_in_table( table: Table, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[int] = None, @@ -57,31 +58,53 @@ def list_files_in_table( '],"type":"struct"}' ) ) - add_files = [ - AddFile( - url=str(tmp_path / "pdf1.parquet"), - id="pdf1", - partition_values={}, - size=0, - stats="", - ), - AddFile( - url=str(tmp_path / "pdf2.parquet"), - id="pdf2", - partition_values={}, - size=0, - stats="", - ), - ] + add_file1 = AddFile( + url=str(tmp_path / "pdf1.parquet"), + id="pdf1", + partition_values={}, + size=0, + stats="", + ) + add_file2 = AddFile( + url=str(tmp_path / "pdf2.parquet"), + id="pdf2", + partition_values={}, + size=0, + stats="", + ) + + add_files = [add_file1, add_file2] + + if 
jsonPredicateHints is not None: + add_files = [add_file1] + elif predicateHints is not None: + add_files = [add_file2] + return ListFilesInTableResponse( delta_table_version=1, protocol=None, metadata=metadata, add_files=add_files ) reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() - expected = pd.concat([pdf1, pdf2]).reset_index(drop=True) + pd.testing.assert_frame_equal(pdf, expected) + reader = DeltaSharingReader( + Table("table_name", "share_name", "schema_name"), + RestClientMock(), + jsonPredicateHints="dummy_hints" + ) + pdf = reader.to_pandas() + expected = pd.concat([pdf1]).reset_index(drop=True) + pd.testing.assert_frame_equal(pdf, expected) + + reader = DeltaSharingReader( + Table("table_name", "share_name", "schema_name"), + RestClientMock(), + predicateHints="dummy_hints" + ) + pdf = reader.to_pandas() + expected = pd.concat([pdf2]).reset_index(drop=True) pd.testing.assert_frame_equal(pdf, expected) @@ -98,6 +121,7 @@ def list_files_in_table( table: Table, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[int] = None, @@ -157,6 +181,7 @@ def list_files_in_table( table: Table, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[int] = None, @@ -212,6 +237,7 @@ def list_files_in_table( table: Table, *, predicateHints: Optional[Sequence[str]] = None, + jsonPredicateHints: Optional[str] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[int] = None, diff --git a/python/delta_sharing/tests/test_rest_client.py b/python/delta_sharing/tests/test_rest_client.py index c2022ca05..4d07214dd 100644 --- a/python/delta_sharing/tests/test_rest_client.py +++ b/python/delta_sharing/tests/test_rest_client.py 
@@ -481,6 +481,75 @@ def test_list_files_in_table_version( ] +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_list_files_with_json_predicates( + rest_client: DataSharingRestClient, +): + # This function invokes the client with the specified json predicate hints + # on the test table which is partitioned by date. The function validates + # that the returned files contain the expected dates. + def test_hints(hints, expected_dates): + response = rest_client.list_files_in_table( + Table(name="cdf_table_with_partition", share="share8", schema="default"), + jsonPredicateHints=hints + ) + assert response.protocol == Protocol(min_reader_version=1) + assert response.metadata == Metadata( + id="e21eb083-6976-4159-90f2-ad88d06b7c7f", + format=Format(provider="parquet", options={}), + schema_string=( + '{"type":"struct","fields":[' + '{"name":"name","type":"string","nullable":true,"metadata":{}},' + '{"name":"age","type":"integer","nullable":true,"metadata":{}},' + '{"name":"birthday","type":"date","nullable":true,"metadata":{}}' + "]}" + ), + configuration={"enableChangeDataFeed": "true"}, + partition_columns=["birthday"] + ) + + # validate that we get back all expected files. + assert len(expected_dates) == len(response.add_files) + for date in expected_dates: + found = False + for file in response.add_files: + if date in file.url: + found = True + assert found + + # Without predicates, we should get back all the files. + test_hints(None, ["2020-01-01", "2020-02-02"]) + + # These predicates should return file with date 2020-01-01. 
+ hints1 = ( + '{"op":"and","children":[' + '{"op":"not","children":[' + '{"op":"isNull","children":[' + '{"op":"column","name":"birthday","valueType":"date"}]}]},' + '{"op":"equal","children":[' + '{"op":"column","name":"birthday","valueType":"date"},' + '{"op":"literal","value":"2020-01-01","valueType":"date"}]}' + ']}' + ) + test_hints(hints1, ["2020-01-01"]) + + # These predicates should return file with date 2020-02-02. + hints2 = ( + '{"op":"and","children":[' + '{"op":"not","children":[' + '{"op":"isNull","children":[' + '{"op":"column","name":"birthday","valueType":"date"}]}]},' + '{"op":"greaterThan","children":[' + '{"op":"column","name":"birthday","valueType":"date"},' + '{"op":"literal","value":"2020-01-01","valueType":"date"}]}' + ']}' + ) + test_hints(hints2, ["2020-02-02"]) + + # Invalid predicates should return all the files. + test_hints("bad-predicates", ["2020-01-01", "2020-02-02"]) + + @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) def test_list_files_in_table_version_exception( rest_client: DataSharingRestClient, diff --git a/server/src/test/scala/io/delta/sharing/server/TestResource.scala b/server/src/test/scala/io/delta/sharing/server/TestResource.scala index 3f84e8004..95e16cec8 100644 --- a/server/src/test/scala/io/delta/sharing/server/TestResource.scala +++ b/server/src/test/scala/io/delta/sharing/server/TestResource.scala @@ -304,6 +304,7 @@ object TestResource { serverConfig.setPort(TEST_PORT) serverConfig.setSsl(SSLConfig(selfSigned = true, null, null, null)) serverConfig.setEvaluatePredicateHints(true) + serverConfig.setEvaluateJsonPredicateHints(true) serverConfig.save(testConfigFile.getCanonicalPath) testConfigFile