Skip to content

Commit

Permalink
Conflicts resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
ognis1205 committed May 30, 2023
2 parents 45e016c + 44bab62 commit c2f80f2
Show file tree
Hide file tree
Showing 29 changed files with 1,372 additions and 163 deletions.
21 changes: 16 additions & 5 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,10 @@ The request body should be a JSON string containing the following optional field

- **timestamp** (type: String, optional): an optional timestamp string in the [Timestamp Format](#timestamp-format),. If set, will return files as of the table version corresponding to the specified timestamp. This is only supported on tables with history sharing enabled.

- **startingVersion** (type: Long, optional): an optional version number. If set, will return all data change files since startingVersion, including historical metadata if seen in the delta log.
- **startingVersion** (type: Long, optional): an optional version number. If set, will return all data change files since startingVersion, inclusive, including historical metadata if seen in the delta log.

- **endingVersion** (type: Long, optional): an optional version number, only used if startingVersion is set. If set, the server can use it as a hint to avoid returning data change files after `endingVersion`. This is not enforcement. Hence, when sending the `endingVersion` parameter, the client should still handle the case that it may receive files after `endingVersion`.
- The combination of `statingVersion` and `endingVersion` can be used as query window for delta sharing streaming rpcs.

When `predicateHints` and `limitHint` are both present, the server should apply `predicateHints` first then `limitHint`. As these two parameters are hints rather than enforcement, the client must always apply `predicateHints` and `limitHint` on the response returned by the server if it wishes to filter and limit the returned data. An empty JSON object (`{}`) should be provided when these two parameters are missing.

Expand Down Expand Up @@ -2411,6 +2414,7 @@ size | Long | The size of this file in bytes. | Required
stats | String | Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. This is a serialized JSON string which can be deserialized to a [Statistics Struct](#per-file-statistics). A client can decide whether to use stats or drop it. | Optional
version | Long | The table version of the file, returned when querying a table data with a version or timestamp parameter. | Optional
timestamp | Long | The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. | Optional
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2423,7 +2427,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"partitionValues": {
"date": "2021-04-28"
},
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}"
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}",
"expirationTimestamp": 1652140800000
}
}
```
Expand All @@ -2440,6 +2445,7 @@ size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
stats | String | Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. This is a serialized JSON string which can be deserialized to a [Statistics Struct](#per-file-statistics). A client can decide whether to use stats or drop it. | Optional
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2454,7 +2460,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
},
"timestamp": 1652140800000,
"version": 1,
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}"
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}",
"expirationTimestamp": 1652144400000
}
}
```
Expand All @@ -2468,6 +2475,7 @@ partitionValues | Map<String, String> | A map from partition column to value for
size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2481,7 +2489,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"date": "2021-04-28"
},
"timestamp": 1652140800000,
"version": 1
"version": 1,
"expirationTimestamp": 1652144400000
}
}
```
Expand All @@ -2495,6 +2504,7 @@ partitionValues | Map<String, String> | A map from partition column to value for
size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2508,7 +2518,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"date": "2021-04-28"
},
"timestamp": 1652140800000,
"version": 1
"version": 1,
"expirationTimestamp": 1652144400000
}
}
```
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ To use Delta Sharing connector interactively within the Spark’s Scala/Python s
#### PySpark shell

```
pyspark --packages io.delta:delta-sharing-spark_2.12:0.6.2
pyspark --packages io.delta:delta-sharing-spark_2.12:0.6.4
```

#### Scala Shell

```
bin/spark-shell --packages io.delta:delta-sharing-spark_2.12:0.6.2
bin/spark-shell --packages io.delta:delta-sharing-spark_2.12:0.6.4
```

### Set up a standalone project
Expand All @@ -148,7 +148,7 @@ You include Delta Sharing connector in your Maven project by adding it as a depe
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-sharing-spark_2.12</artifactId>
<version>0.6.2</version>
<version>0.6.4</version>
</dependency>
```

Expand All @@ -157,7 +157,7 @@ You include Delta Sharing connector in your Maven project by adding it as a depe
You include Delta Sharing connector in your SBT project by adding the following line to your `build.sbt` file:

```scala
libraryDependencies += "io.delta" %% "delta-sharing-spark" % "0.6.2"
libraryDependencies += "io.delta" %% "delta-sharing-spark" % "0.6.4"
```

## Quick Start
Expand Down Expand Up @@ -489,7 +489,7 @@ You can use the pre-built docker image from https://hub.docker.com/r/deltaio/del
```
docker run -p <host-port>:<container-port> \
--mount type=bind,source=<the-server-config-yaml-file>,target=/config/delta-sharing-server-config.yaml \
deltaio/delta-sharing-server:0.6.2 -- --config /config/delta-sharing-server-config.yaml
deltaio/delta-sharing-server:0.6.4 -- --config /config/delta-sharing-server-config.yaml
```

Note that `<container-port>` should be the same as the port defined inside the config file.
Expand Down
4 changes: 4 additions & 0 deletions python/delta_sharing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

from delta_sharing.delta_sharing import SharingClient, load_as_pandas, load_as_spark
from delta_sharing.delta_sharing import get_table_metadata, get_table_protocol, get_table_version
from delta_sharing.delta_sharing import load_table_changes_as_pandas, load_table_changes_as_spark
from delta_sharing.protocol import Share, Schema, Table
from delta_sharing.version import __version__
Expand All @@ -25,6 +26,9 @@
"Share",
"Schema",
"Table",
"get_table_metadata",
"get_table_protocol",
"get_table_version",
"load_as_pandas",
"load_as_spark",
"load_table_changes_as_pandas",
Expand Down
49 changes: 48 additions & 1 deletion python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pandas as pd

from delta_sharing.protocol import CdfOptions
from delta_sharing.protocol import CdfOptions, Protocol, Metadata

try:
from pyspark.sql import DataFrame as PySparkDataFrame
Expand Down Expand Up @@ -51,6 +51,53 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]:
return (profile, share, schema, table)


def get_table_version(
url: str,
starting_timestamp: Optional[str] = None
) -> int:
"""
Get the shared table version using the given url.
:param url: a url under the format "<profile>#<share>.<schema>.<table>"
:param starting_timestamp: a string in the format of YYYY-MM-DDThh:mm:ssZ. Get the version at or
after the given timestamp. The latest table version will be returned if this is not specified.
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
rest_client = DataSharingRestClient(profile)
response = rest_client.query_table_version(
Table(name=table, share=share, schema=schema),
starting_timestamp
)
return response.delta_table_version


def get_table_protocol(url: str) -> Protocol:
"""
Get the shared table protocol using the given url.
:param url: a url under the format "<profile>#<share>.<schema>.<table>"
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
rest_client = DataSharingRestClient(profile)
response = rest_client.query_table_metadata(Table(name=table, share=share, schema=schema))
return response.protocol


def get_table_metadata(url: str) -> Metadata:
"""
Get the shared table metadata using the given url.
:param url: a url under the format "<profile>#<share>.<schema>.<table>"
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
rest_client = DataSharingRestClient(profile)
response = rest_client.query_table_metadata(Table(name=table, share=share, schema=schema))
return response.metadata


def load_as_pandas(
url: str,
jsonPredicateHints: Optional[Dict[str, Any]] = None,
Expand Down
6 changes: 6 additions & 0 deletions python/delta_sharing/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class Metadata:
schema_string: Optional[str] = None
configuration: Dict[str, str] = field(default_factory=dict)
partition_columns: Sequence[str] = field(default_factory=list)
version: Optional[int] = None
size: Optional[int] = None
num_files: Optional[int] = None

@staticmethod
def from_json(json) -> "Metadata":
Expand All @@ -163,6 +166,9 @@ def from_json(json) -> "Metadata":
schema_string=json["schemaString"],
configuration=configuration,
partition_columns=json["partitionColumns"],
version=json.get("version", None),
size=json.get("size", None),
num_files=json.get("numFiles", None)
)


Expand Down
Loading

0 comments on commit c2f80f2

Please sign in to comment.