Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Commit

Permalink
[Vector] Disable content encoding for GCP sink
Browse files Browse the repository at this point in the history
  • Loading branch information
hieu-db committed Sep 5, 2024
1 parent e35950a commit a9be3fc
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions src/sinks/gcp/cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ pub struct GcsSinkConfig {
#[serde(flatten)]
encoding: EncodingConfigWithFraming,

/// Whether to set content encoding for the object
#[serde(default = "crate::serde::default_true")]
#[configurable(metadata(docs::advanced))]
set_content_encoding: bool,

#[configurable(derived)]
#[serde(default)]
compression: Compression,
Expand Down Expand Up @@ -194,6 +199,7 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
filename_append_uuid: true,
filename_extension: Default::default(),
encoding,
set_content_encoding: true,
compression: Compression::gzip_default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -377,10 +383,14 @@ impl RequestSettings {
.acl
.map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
let content_encoding = config
.compression
.content_encoding()
.map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
let content_encoding = if config.set_content_encoding {
config
.compression
.content_encoding()
.map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap())
} else {
None
};
let storage_class = config.storage_class.unwrap_or_default();
let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
let metadata = config
Expand Down Expand Up @@ -502,13 +512,19 @@ mod tests {
RequestSettings::new(sink_config, context).expect("Could not create request settings")
}

fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
fn build_request(
extension: Option<&str>,
uuid: bool,
compression: Compression,
set_content_encoding: bool,
) -> GcsRequest {
let context = SinkContext::default();
let sink_config = GcsSinkConfig {
key_prefix: Some("key/".into()),
filename_time_format: "date".into(),
filename_extension: extension.map(Into::into),
filename_append_uuid: uuid,
set_content_encoding: set_content_encoding,
compression,
..default_config(
(
Expand Down Expand Up @@ -539,16 +555,28 @@ mod tests {

#[test]
fn gcs_build_request() {
let req = build_request(Some("ext"), false, Compression::None);
let req = build_request(Some("ext"), false, Compression::None, true);
assert_eq!(req.key, "key/date.ext".to_string());

let req = build_request(None, false, Compression::None);
let req = build_request(None, false, Compression::None, true);
assert_eq!(req.key, "key/date.log".to_string());

let req = build_request(None, false, Compression::gzip_default());
let req = build_request(None, false, Compression::gzip_default(), true);
assert_eq!(req.key, "key/date.log.gz".to_string());

let req = build_request(None, true, Compression::gzip_default());
let req = build_request(None, true, Compression::gzip_default(), true);
assert_ne!(req.key, "key/date.log.gz".to_string());
}

#[test]
fn gcs_build_request_set_content_encoding() {
let request = build_request(None, false, Compression::gzip_default(), true);
assert_eq!(
request.settings.content_encoding,
HeaderValue::from_str(Compression::gzip_default().content_encoding().unwrap()).ok()
);

let request = build_request(None, false, Compression::gzip_default(), false);
assert!(request.settings.content_encoding.is_none());
}
}

0 comments on commit a9be3fc

Please sign in to comment.