Skip to content

Commit

Permalink
add support for s3 express (#309)
Browse files Browse the repository at this point in the history
Adds support for s3 express
  • Loading branch information
brayniac authored Nov 7, 2024
1 parent e12df02 commit d25c8a4
Showing 1 changed file with 75 additions and 53 deletions.
128 changes: 75 additions & 53 deletions src/clients/store/s3/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::workload::{ClientWorkItemKind, StoreClientRequest};
use crate::*;
use http::Uri;
use std::fmt::Display;

use async_channel::Receiver;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -69,20 +71,16 @@ async fn task(
})
.clone();

// https://[BUCKET_NAME].s3.[REGION].amazonaws.com"
// https://[BUCKET_NAME].s3express-usw2-az1.[REGION].amazonaws.com

let parts: Vec<&str> = auth.host().split('.').collect();

if parts.len() != 5 {
eprintln!("expected endpoint to be in the form: bucket.region.amazonaws.com");
eprintln!("expected endpoint to be in the form: bucket.zone.region.amazonaws.com");
std::process::exit(1);
};

let bucket = parts[0].to_string();
let region = parts[2].to_string();

let port = auth.port_u16().unwrap_or(443);

let _connect_addr = format!("{auth}:{port}");

let root_store =
rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

Expand Down Expand Up @@ -138,12 +136,8 @@ async fn task(
StoreClientRequest::Get(r) => {
let key = &*r.key;

let request = S3RequestBuilder::get_object(
region.clone(),
bucket.clone(),
key.to_string(),
)
.build(&access_key, &secret_key);
let request = S3RequestBuilder::get_object(uri.clone(), key.to_string())
.build(&access_key, &secret_key);

let start = Instant::now();

Expand Down Expand Up @@ -207,12 +201,13 @@ async fn task(
STORE_RESPONSE_EX.increment();
STORE_GET_EX.increment();

debug!("Error Body:\n{}", String::from_utf8_lossy(&body));
error!("Error Body:\n{}", String::from_utf8_lossy(&body));
}
}
}
Err(e) => {
debug!("error: {e}");
error!("error: {e}");
CONNECT_CURR.decrement();
continue;
}
}
Expand All @@ -221,13 +216,8 @@ async fn task(
let key = &*r.key;
let value = r.value.clone();

let request = S3RequestBuilder::put_object(
region.clone(),
bucket.clone(),
key.to_string(),
value,
)
.build(&access_key, &secret_key);
let request = S3RequestBuilder::put_object(uri.clone(), key.to_string(), value)
.build(&access_key, &secret_key);

let start = Instant::now();

Expand Down Expand Up @@ -274,12 +264,8 @@ async fn task(
StoreClientRequest::Delete(r) => {
let key = &*r.key;

let request = S3RequestBuilder::delete_object(
region.clone(),
bucket.clone(),
key.to_string(),
)
.build(&access_key, &secret_key);
let request = S3RequestBuilder::delete_object(uri.clone(), key.to_string())
.build(&access_key, &secret_key);

let start = Instant::now();

Expand Down Expand Up @@ -348,22 +334,43 @@ pub struct S3RequestBuilder {
}

impl S3RequestBuilder {
fn new(
region: String,
bucket: String,
method: Method,
relative_uri: String,
content: Bytes,
) -> Self {
fn new(endpoint: Uri, method: Method, relative_uri: String, content: Bytes) -> Self {
let now = Utc::now();
// let date = format!("{}", now.format("%Y%m%d"));
let datetime = format!("{}", now.format("%Y%m%dT%H%M%SZ"));
// let rfc2822 = now.to_rfc2822().to_string();

let content_sha256 = sha256_sum(&content);

let mut headers = HeaderMap::new();

// https://[BUCKET_NAME].s3.[REGION].amazonaws.com"
// https://[BUCKET_NAME].s3express-usw2-az1.[REGION].amazonaws.com

let parts: Vec<&str> = endpoint.authority().unwrap().host().split('.').collect();

if parts.len() != 5 {
eprintln!("expected endpoint to be in the form: bucket.zone.region.amazonaws.com");
std::process::exit(1);
};

let bucket = parts[0];
let zone = parts[1];
let region = parts[2].to_string();

let class = if zone == "s3" {
StorageClass::Standard
} else {
StorageClass::Express
};

let uri = match class {
StorageClass::Standard => {
format!("https://{bucket}.s3.amazonaws.com{relative_uri}")
}
StorageClass::Express => {
format!("https://{}{relative_uri}", endpoint.authority().unwrap())
}
};

headers.insert(
"host",
format!("{bucket}.s3.amazonaws.com").parse().unwrap(),
Expand All @@ -374,7 +381,7 @@ impl S3RequestBuilder {
let inner = http::Request::builder()
.version(Version::HTTP_11)
.method(method)
.uri(&format!("https://{bucket}.s3.amazonaws.com{relative_uri}"))
.uri(uri)
.header("host", &format!("{bucket}.s3.amazonaws.com"))
.header("x-amz-content-sha256", &content_sha256)
.header("x-amz-date", datetime);
Expand Down Expand Up @@ -443,34 +450,49 @@ impl S3RequestBuilder {
.unwrap()
}

pub fn delete_object(region: String, bucket: String, key: String) -> Self {
pub fn delete_object(endpoint: Uri, key: String) -> Self {
Self::new(
region,
bucket,
endpoint,
Method::DELETE,
format!("/{key}"),
Vec::new().into(),
)
}

pub fn get_object(region: String, bucket: String, key: String) -> Self {
Self::new(
region,
bucket,
Method::GET,
format!("/{key}"),
Vec::new().into(),
)
pub fn get_object(endpoint: Uri, key: String) -> Self {
Self::new(endpoint, Method::GET, format!("/{key}"), Vec::new().into())
}

pub fn put_object(region: String, bucket: String, key: String, value: Bytes) -> Self {
let mut s = Self::new(region, bucket, Method::PUT, format!("/{key}"), value);
pub fn put_object(endpoint: Uri, key: String, value: Bytes) -> Self {
let parts: Vec<&str> = endpoint.authority().unwrap().host().split('.').collect();

let class = if parts[1] == "s3" {
StorageClass::Standard
} else {
StorageClass::Express
};

let mut s = Self::new(endpoint, Method::PUT, format!("/{key}"), value);

s.inner = s
.inner
.header("date", s.timestamp.to_rfc2822())
.header("x-amz-storage-class", "STANDARD");
.header("x-amz-storage-class", class.to_string());

s
}
}

enum StorageClass {
Standard,
Express,
}

impl Display for StorageClass {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
Self::Standard => write!(f, "STANDARD"),
Self::Express => write!(f, "EXPRESS_ONEZONE"),
}
}
}

0 comments on commit d25c8a4

Please sign in to comment.