Skip to content

Commit

Permalink
Perform HEAD request for HttpStore::head (apache#4837)
Browse files Browse the repository at this point in the history
* Perform HEAD request for HttpStore::head

* Logical merge conflicts

* Review feedback
  • Loading branch information
tustvold authored and Ryan Aston committed Nov 6, 2023
1 parent 18c8b55 commit 389b493
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 91 deletions.
20 changes: 14 additions & 6 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::header::header_meta;
use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, ObjectMeta};
use crate::{GetResultPayload, Result};
Expand All @@ -28,6 +28,12 @@ use reqwest::Response;
pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;

/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};

async fn get_request(
&self,
path: &Path,
Expand All @@ -49,10 +55,12 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
let meta = header_meta(location, response.headers(), Default::default())
.map_err(|e| Error::Generic {
store: T::STORE,
source: Box::new(e),
let meta =
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
Expand All @@ -73,7 +81,7 @@ impl<T: GetClient> GetClientExt for T {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
header_meta(location, response.headers(), Default::default()).map_err(|e| {
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
Expand Down
11 changes: 1 addition & 10 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
/// Configuration for header extraction
pub struct HeaderConfig {
/// Whether to require an ETag header when extracting [`ObjectMeta`] from headers.
Expand All @@ -37,15 +37,6 @@ pub struct HeaderConfig {
pub last_modified_required: bool,
}

impl Default for HeaderConfig {
fn default() -> Self {
Self {
etag_required: true,
last_modified_required: true,
}
}
}

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("ETag Header missing from response"))]
Expand Down
1 change: 0 additions & 1 deletion object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod get;

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
Expand Down
90 changes: 57 additions & 33 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::percent_decode_str;
Expand Down Expand Up @@ -238,39 +241,6 @@ impl Client {
Ok(())
}

pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
let url = self.path_url(location);
let builder = self.client.get(url);
let has_range = options.range.is_some();

let res = builder
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

Ok(res)
}

pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let mut retry = false;
loop {
Expand Down Expand Up @@ -307,6 +277,60 @@ impl Client {
}
}

#[async_trait]
impl GetClient for Client {
const STORE: &'static str = "HTTP";

/// Override the [`HeaderConfig`] to be less strict to support a
/// broader range of HTTP servers (#4831)
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
};

async fn get_request(
&self,
location: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
let url = self.path_url(location);
let method = match head {
true => Method::HEAD,
false => Method::GET,
};
let has_range = options.range.is_some();
let builder = self.client.request(method, url);

let res = builder
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

Ok(res)
}
}

/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
Expand Down
47 changes: 6 additions & 41 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::{header_meta, HeaderConfig};
use crate::client::get::GetClientExt;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result, RetryConfig,
};

mod client;
Expand Down Expand Up @@ -115,46 +115,11 @@ impl ObjectStore for HttpStore {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.client.get(location, options).await?;
let cfg = HeaderConfig {
last_modified_required: false,
etag_required: false,
};
let meta =
header_meta(location, response.headers(), cfg).context(MetadataSnafu)?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Reqwest { source }.into())
.boxed();

Ok(GetResult {
payload: GetResultPayload::Stream(stream),
range: range.unwrap_or(0..meta.size),
meta,
})
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let status = self.client.list(Some(location), "0").await?;
match status.response.len() {
1 => {
let response = status.response.into_iter().next().unwrap();
response.check_ok()?;
match response.is_dir() {
true => Err(crate::Error::NotFound {
path: location.to_string(),
source: "Is directory".to_string().into(),
}),
false => response.object_meta(self.client.base_url()),
}
}
x => Err(crate::Error::NotFound {
path: location.to_string(),
source: format!("Expected 1 result, got {x}").into(),
}),
}
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down

0 comments on commit 389b493

Please sign in to comment.