Skip to content

Commit

Permalink
Add an optional expirationTimestamp field for pre signed urls (#312)
Browse files Browse the repository at this point in the history
* actual url print

* tmp

* fix

* fix

* remove debug

* pick conserative threshold

* fix

* fix

* Option

* fi

* synchronized

* refactor code

* fix

* add comment

* resolve comments

* refresh
  • Loading branch information
linzhou-db authored May 23, 2023
1 parent 7591947 commit 44bab62
Show file tree
Hide file tree
Showing 14 changed files with 570 additions and 142 deletions.
16 changes: 12 additions & 4 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,7 @@ size | Long | The size of this file in bytes. | Required
stats | String | Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. This is a serialized JSON string which can be deserialized to a [Statistics Struct](#per-file-statistics). A client can decide whether to use stats or drop it. | Optional
version | Long | The table version of the file, returned when querying a table data with a version or timestamp parameter. | Optional
timestamp | Long | The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. | Optional
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2426,7 +2427,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"partitionValues": {
"date": "2021-04-28"
},
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}"
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}",
"expirationTimestamp": 1652140800000
}
}
```
Expand All @@ -2443,6 +2445,7 @@ size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
stats | String | Contains statistics (e.g., count, min/max values for columns) about the data in this file. This field may be missing. A file may or may not have stats. This is a serialized JSON string which can be deserialized to a [Statistics Struct](#per-file-statistics). A client can decide whether to use stats or drop it. | Optional
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2457,7 +2460,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
},
"timestamp": 1652140800000,
"version": 1,
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}"
"stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}",
"expirationTimestamp": 1652144400000
}
}
```
Expand All @@ -2471,6 +2475,7 @@ partitionValues | Map<String, String> | A map from partition column to value for
size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2484,7 +2489,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"date": "2021-04-28"
},
"timestamp": 1652140800000,
"version": 1
"version": 1,
"expirationTimestamp": 1652144400000
}
}
```
Expand All @@ -2498,6 +2504,7 @@ partitionValues | Map<String, String> | A map from partition column to value for
size | Long | The size of this file in bytes. | Required
timestamp | Long | The timestamp of the file in milliseconds from epoch. | Required
version | Int32 | The table version of this file. | Required
expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional

Example (for illustration purposes; each JSON object must be a single line in the response):

Expand All @@ -2511,7 +2518,8 @@ Example (for illustration purposes; each JSON object must be a single line in th
"date": "2021-04-28"
},
"timestamp": 1652140800000,
"version": 1
"version": 1,
"expirationTimestamp": 1652144400000
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ import org.apache.hadoop.fs.azurebfs.services.AuthType
import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory
import org.apache.hadoop.util.ReflectionUtils

/**
* @param url The signed url.
* @param expirationTimestamp The expiration timestamp in millis of the signed url, a minimum
* between the timeout of the url and of the token.
*/
case class PreSignedUrl(url: String, expirationTimestamp: Long)

trait CloudFileSigner {
def sign(path: Path): String
def sign(path: Path): PreSignedUrl
}

class S3FileSigner(
Expand All @@ -50,7 +56,7 @@ class S3FileSigner(
private val s3Client = ReflectionUtils.newInstance(classOf[DefaultS3ClientFactory], conf)
.createS3Client(name)

override def sign(path: Path): String = {
override def sign(path: Path): PreSignedUrl = {
val absPath = path.toUri
val bucketName = absPath.getHost
val objectKey = absPath.getPath.stripPrefix("/")
Expand All @@ -60,7 +66,10 @@ class S3FileSigner(
val request = new GeneratePresignedUrlRequest(bucketName, objectKey)
.withMethod(HttpMethod.GET)
.withExpiration(expiration)
s3Client.generatePresignedUrl(request).toString
PreSignedUrl(
s3Client.generatePresignedUrl(request).toString,
System.currentTimeMillis() + SECONDS.toMillis(preSignedUrlTimeoutSeconds)
)
}
}

Expand Down Expand Up @@ -102,7 +111,7 @@ class AzureFileSigner(
sharedAccessPolicy
}

override def sign(path: Path): String = {
override def sign(path: Path): PreSignedUrl = {
val containerRef = blobClient.getContainerReference(container)
val objectKey = objectKeyExtractor(path)
assert(objectKey.nonEmpty, s"cannot get object key from $path")
Expand All @@ -116,7 +125,10 @@ class AzureFileSigner(
SharedAccessProtocols.HTTPS_ONLY
)
val sasTokenCredentials = new StorageCredentialsSharedAccessSignature(sasToken)
sasTokenCredentials.transformUri(blobRef.getUri).toString
PreSignedUrl(
sasTokenCredentials.transformUri(blobRef.getUri).toString,
System.currentTimeMillis() + SECONDS.toMillis(preSignedUrlTimeoutSeconds)
)
}
}

Expand Down Expand Up @@ -202,13 +214,16 @@ class GCSFileSigner(

private val storage = StorageOptions.newBuilder.build.getService

override def sign(path: Path): String = {
override def sign(path: Path): PreSignedUrl = {
val (bucketName, objectName) = GCSFileSigner.getBucketAndObjectNames(path)
assert(objectName.nonEmpty, s"cannot get object key from $path")
val blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build
storage.signUrl(
PreSignedUrl(
storage.signUrl(
blobInfo, preSignedUrlTimeoutSeconds, SECONDS, Storage.SignUrlOption.withV4Signature())
.toString
.toString,
System.currentTimeMillis() + SECONDS.toMillis(preSignedUrlTimeoutSeconds)
)
}
}

Expand Down
4 changes: 4 additions & 0 deletions server/src/main/scala/io/delta/sharing/server/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ case class AddFile(
size: Long,
@JsonRawValue
stats: String = null,
expirationTimestamp: java.lang.Long = null,
timestamp: java.lang.Long = null,
version: java.lang.Long = null) extends Action {

Expand All @@ -90,6 +91,7 @@ case class AddFileForCDF(
@JsonInclude(JsonInclude.Include.ALWAYS)
partitionValues: Map[String, String],
size: Long,
expirationTimestamp: java.lang.Long = null,
version: Long,
timestamp: Long,
@JsonRawValue
Expand All @@ -104,6 +106,7 @@ case class AddCDCFile(
@JsonInclude(JsonInclude.Include.ALWAYS)
partitionValues: Map[String, String],
size: Long,
expirationTimestamp: java.lang.Long = null,
timestamp: Long,
version: Long)
extends Action {
Expand All @@ -117,6 +120,7 @@ case class RemoveFile(
@JsonInclude(JsonInclude.Include.ALWAYS)
partitionValues: Map[String, String],
size: Long,
expirationTimestamp: java.lang.Long = null,
timestamp: Long,
version: Long)
extends Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ class DeltaSharedTable(
filteredFiles.map { addFile =>
val cloudPath = absolutePath(deltaLog.dataPath, addFile.path)
val signedUrl = fileSigner.sign(cloudPath)
val modelAddFile = model.AddFile(url = signedUrl,
val modelAddFile = model.AddFile(
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(addFile.path, UTF_8).toString,
partitionValues = addFile.partitionValues,
size = addFile.size,
Expand Down Expand Up @@ -317,8 +319,10 @@ class DeltaSharedTable(
val ts = timestampsByVersion.get(v).orNull
versionActions.foreach {
case a: AddFile if a.dataChange =>
val signedUrl = fileSigner.sign(absolutePath(deltaLog.dataPath, a.path))
val modelAddFile = model.AddFileForCDF(
url = fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(a.path, UTF_8).toString,
partitionValues = a.partitionValues,
size = a.size,
Expand All @@ -328,8 +332,10 @@ class DeltaSharedTable(
)
actions.append(modelAddFile.wrap)
case r: RemoveFile if r.dataChange =>
val signedUrl = fileSigner.sign(absolutePath(deltaLog.dataPath, r.path))
val modelRemoveFile = model.RemoveFile(
url = fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(r.path, UTF_8).toString,
partitionValues = r.partitionValues,
size = r.size.get,
Expand Down Expand Up @@ -417,7 +423,8 @@ class DeltaSharedTable(
val cloudPath = absolutePath(deltaLog.dataPath, addCDCFile.path)
val signedUrl = fileSigner.sign(cloudPath)
val modelCDCFile = model.AddCDCFile(
url = signedUrl,
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(addCDCFile.path, UTF_8).toString,
partitionValues = addCDCFile.partitionValues,
size = addCDCFile.size,
Expand All @@ -433,7 +440,8 @@ class DeltaSharedTable(
val cloudPath = absolutePath(deltaLog.dataPath, addFile.path)
val signedUrl = fileSigner.sign(cloudPath)
val modelAddFile = model.AddFileForCDF(
url = signedUrl,
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(addFile.path, UTF_8).toString,
partitionValues = addFile.partitionValues,
size = addFile.size,
Expand All @@ -450,7 +458,8 @@ class DeltaSharedTable(
val cloudPath = absolutePath(deltaLog.dataPath, removeFile.path)
val signedUrl = fileSigner.sign(cloudPath)
val modelRemoveFile = model.RemoveFile(
url = signedUrl,
url = signedUrl.url,
expirationTimestamp = signedUrl.expirationTimestamp,
id = Hashing.md5().hashString(removeFile.path, UTF_8).toString,
partitionValues = removeFile.partitionValues,
size = removeFile.size.get,
Expand Down
Loading

0 comments on commit 44bab62

Please sign in to comment.