Skip to content

Commit

Permalink
domain objects
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Nov 27, 2024
1 parent 296b504 commit 4ba005f
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 67 deletions.
24 changes: 14 additions & 10 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse, MultipartPart,
InitiateMultipartUploadResult, ListResponse,
};
use crate::client::GetOptionsExt;
use crate::multipart::MultipartInfo;
use crate::path::DELIMITER;
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path,
Expand Down Expand Up @@ -451,6 +452,7 @@ impl S3Client {
}
}

#[allow(dead_code)]
pub(crate) fn request_with_config<'a>(
&'a self,
method: Method,
Expand Down Expand Up @@ -627,7 +629,7 @@ impl S3Client {
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
.header("x-amz-checksum-algorithm", "SHA256")
// .header("x-amz-checksum-algorithm", "SHA256") // TODO: restore
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand All @@ -651,16 +653,18 @@ impl S3Client {
upload_id: &MultipartId,
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();
let config = S3Config {
checksum: Some(Checksum::SHA256),
..self.config.clone()
};
// TODO: restore
// let config = S3Config {
// checksum: Some(Checksum::SHA256),
// ..self.config.clone()
// };

let mut request = self
.request_with_config(Method::PUT, path, &config)
.request(Method::PUT, path)
// .request_with_config(Method::PUT, path, &config) // TODO: restore
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);

Expand Down Expand Up @@ -699,7 +703,7 @@ impl S3Client {
response.e_tag
}
};
let part = MultipartPart {
let part = MultipartInfo {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256,
Expand All @@ -721,7 +725,7 @@ impl S3Client {
&self,
location: &Path,
upload_id: &str,
parts: Vec<MultipartPart>,
parts: Vec<MultipartInfo>,
mode: CompleteMultipartMode,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::MultipartStore;
use crate::multipart::{MultipartInfo, MultipartStore};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Expand Down Expand Up @@ -74,7 +74,6 @@ const STORE: &str = "S3";
/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
use crate::client::s3::MultipartPart;
pub use credential::{AwsAuthorizer, AwsCredential};

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
Expand Down Expand Up @@ -440,7 +439,7 @@ impl MultipartStore for AmazonS3 {
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
self.client
.put_part(path, id, part_idx, PutPartPayload::Part(data))
.await
Expand All @@ -450,7 +449,7 @@ impl MultipartStore for AmazonS3 {
&self,
path: &Path,
id: &MultipartId,
parts: Vec<MultipartPart>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
self.client
.complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite)
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use crate::client::get::GetClient;
use crate::client::header::{get_put_result, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::MultipartPart;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::multipart::{MultipartInfo, PartId};
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
Expand Down Expand Up @@ -559,7 +558,7 @@ impl AzureClient {
path: &Path,
part_idx: usize,
payload: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);

Expand All @@ -569,7 +568,7 @@ impl AzureClient {
.send()
.await?;

let part = MultipartPart {
let part = MultipartInfo {
e_tag: content_id,
part_number: part_idx + 1,
checksum_sha256: None,
Expand Down
6 changes: 3 additions & 3 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod credential;
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
use crate::client::s3::MultipartPart;
use crate::multipart::MultipartInfo;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -271,15 +271,15 @@ impl MultipartStore for MicrosoftAzure {
_: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
self.client.put_block(path, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<MultipartPart>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
let parts = parts.into_iter().map(|p| p.into()).collect();
self.client
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/client/parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::s3::MultipartPart;
use crate::multipart::MultipartInfo;
use parking_lot::Mutex;

/// An interior mutable collection of upload parts and their corresponding part index
#[derive(Debug, Default)]
pub(crate) struct Parts(Mutex<Vec<MultipartPart>>);
pub(crate) struct Parts(Mutex<Vec<MultipartInfo>>);

impl Parts {
/// Record the [`PartId`] for a given index
///
/// Note: calling this method multiple times with the same `part_idx`
/// will result in multiple [`PartId`] in the final output
pub(crate) fn put(&self, part: MultipartPart) {
pub(crate) fn put(&self, part: MultipartInfo) {
self.0.lock().push(part)
}

/// Produce the final list of [`PartId`] ordered by `part_idx`
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<MultipartPart>> {
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<MultipartInfo>> {
let mut parts = self.0.lock();
if parts.len() != expected {
return Err(crate::Error::Generic {
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! The list and multipart API used by both GCS and S3

use crate::multipart::PartId;
use crate::multipart::{MultipartInfo, PartId};
use crate::path::Path;
use crate::{ListResult, ObjectMeta, Result};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -121,8 +121,8 @@ impl From<Vec<PartId>> for CompleteMultipartUpload {
}
}

impl From<Vec<MultipartPart>> for CompleteMultipartUpload {
fn from(value: Vec<MultipartPart>) -> Self {
impl From<Vec<MultipartInfo>> for CompleteMultipartUpload {
fn from(value: Vec<MultipartInfo>) -> Self {
let part = value
.into_iter()
.enumerate()
Expand All @@ -137,7 +137,7 @@ impl From<Vec<MultipartPart>> for CompleteMultipartUpload {
}

#[derive(Debug, Serialize)]
pub struct MultipartPart {
pub(crate) struct MultipartPart {
#[serde(rename = "ETag")]
pub e_tag: String,
#[serde(rename = "PartNumber")]
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse, MultipartPart,
ListResponse,
};
use crate::client::GetOptionsExt;
use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::multipart::{MultipartInfo, PartId};
use crate::path::{Path, DELIMITER};
use crate::util::hex_encode;
use crate::{
Expand Down Expand Up @@ -411,7 +411,7 @@ impl GoogleCloudStorageClient {
upload_id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
let query = &[
("partNumber", &format!("{}", part_idx + 1)),
("uploadId", upload_id),
Expand All @@ -424,7 +424,7 @@ impl GoogleCloudStorageClient {
.do_put()
.await?;

let part = MultipartPart {
let part = MultipartInfo {
e_tag: result.e_tag.unwrap(),
part_number: part_idx + 1,
checksum_sha256: None,
Expand Down
7 changes: 3 additions & 4 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ use url::Url;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::parts::Parts;
use crate::client::s3::MultipartPart;
use crate::multipart::MultipartStore;
use crate::multipart::{MultipartInfo, MultipartStore};
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};

Expand Down Expand Up @@ -223,15 +222,15 @@ impl MultipartStore for GoogleCloudStorage {
id: &MultipartId,
part_idx: usize,
payload: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
self.client.put_part(path, id, part_idx, payload).await
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<MultipartPart>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
let parts = parts.into_iter().map(|p| p.into()).collect();
self.client.multipart_complete(path, id, parts).await
Expand Down
12 changes: 0 additions & 12 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1350,19 +1350,7 @@ impl From<Error> for std::io::Error {
#[cfg(test)]
mod tests {
use super::*;
use crate::buffered::BufWriter;
use chrono::TimeZone;
use tokio::io::AsyncWriteExt;

/// Returns early from the enclosing function unless the `TEST_INTEGRATION`
/// environment variable can be read, printing a note to stderr when it bails out.
macro_rules! maybe_skip_integration {
    () => {
        match std::env::var("TEST_INTEGRATION") {
            // Variable is present: fall through and let the test body run.
            Ok(_) => {}
            // Variable is unset (or unreadable): announce the skip and bail out.
            Err(_) => {
                eprintln!("Skipping integration test - set TEST_INTEGRATION");
                return;
            }
        }
    };
}
pub(crate) use maybe_skip_integration;

/// Test that the returned stream does not borrow the lifetime of Path
fn list_store<'a>(
Expand Down
9 changes: 4 additions & 5 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};

use crate::client::s3::MultipartPart;
use crate::multipart::MultipartStore;
use crate::multipart::{MultipartInfo, MultipartStore};
use crate::util::InvalidGetRange;
use crate::{
path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
Expand Down Expand Up @@ -413,14 +412,14 @@ impl MultipartStore for InMemory {
id: &MultipartId,
part_idx: usize,
payload: PutPayload,
) -> Result<MultipartPart> {
) -> Result<MultipartInfo> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
if part_idx <= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(payload.into());
let part = MultipartPart {
let part = MultipartInfo {
e_tag: "".to_string(),
part_number: 0,
checksum_sha256: None,
Expand All @@ -432,7 +431,7 @@ impl MultipartStore for InMemory {
&self,
path: &Path,
id: &MultipartId,
_parts: Vec<MultipartPart>,
_parts: Vec<MultipartInfo>,
) -> Result<PutResult> {
let mut storage = self.storage.write();
let upload = storage.remove_upload(id)?;
Expand Down
27 changes: 18 additions & 9 deletions object_store/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@
//! cloud storage services. It's designed to offer efficient, non-blocking operations,
//! especially useful when dealing with large files or high-throughput systems.

use async_trait::async_trait;

use crate::client::s3::MultipartPart;
use crate::path::Path;
use crate::{MultipartId, PutPayload, PutResult, Result};
use async_trait::async_trait;

/// Represents a part of a file that has been successfully uploaded in a multipart upload process.
/// Identifier (entity tag) of a part of a file that has been successfully uploaded in a
/// multipart upload process.
///
/// Can be built from a [`MultipartInfo`] via `From`, which copies its `e_tag` into
/// [`PartId::content_id`].
#[derive(Debug, Clone)]
pub struct PartId {
/// Id of this part; the `From<MultipartInfo>` conversion fills it with the part's `e_tag`
pub content_id: String,
}

impl From<MultipartPart> for PartId {
fn from(value: MultipartPart) -> Self {
PartId {
/// Metadata for a part of a file that has been successfully uploaded in a multipart
/// upload process.
///
/// Values of this type are returned when a part is uploaded and are handed back,
/// collected in a `Vec`, when the multipart upload is completed.
///
/// Derives `Clone`, `PartialEq` and `Eq` (in addition to `Debug`) so callers can keep,
/// duplicate and compare part metadata, matching the `Clone` already offered by
/// [`PartId`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultipartInfo {
    /// Id (entity tag) of this part
    pub e_tag: String,
    /// Part number: the zero-based index of the part + 1
    pub part_number: usize,
    /// SHA-256 checksum of the part's contents, or `None` when no checksum is available
    // NOTE(review): the textual encoding of the checksum is not visible here — confirm
    // against the store implementation before relying on a particular format.
    pub checksum_sha256: Option<String>,
}

impl From<MultipartInfo> for PartId {
fn from(value: MultipartInfo) -> Self {
Self {
content_id: value.e_tag.clone(),
}
}
Expand Down Expand Up @@ -73,7 +82,7 @@ pub trait MultipartStore: Send + Sync + 'static {
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<MultipartPart>;
) -> Result<MultipartInfo>;

/// Completes a multipart upload
///
Expand All @@ -85,7 +94,7 @@ pub trait MultipartStore: Send + Sync + 'static {
&self,
path: &Path,
id: &MultipartId,
parts: Vec<MultipartPart>,
parts: Vec<MultipartInfo>,
) -> Result<PutResult>;

/// Aborts a multipart upload
Expand Down
Loading

0 comments on commit 4ba005f

Please sign in to comment.