This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Merge pull request #19 from andrewma2/v0.39
Merge custom changes into v0.39
andrewma2 authored Aug 22, 2024
2 parents 2d4c202 + af37f5e commit 3f8b190
Showing 22 changed files with 375 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ node_modules
tests/data/wasm/*/target
heaptrack.*
massif.*
.idea

# tilt
tilt_modules/
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "vector"
version = "0.39.0"
version = "0.39.0-databricks-v1"
authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
description = "A lightweight and ultra-fast tool for building observability pipelines"
8 changes: 8 additions & 0 deletions README.databricks.md
@@ -0,0 +1,8 @@
This lists the custom changes merged into the Databricks fork of Vector.
1. Fix premature ack when the data file is full. https://github.com/databricks/vector/pull/1
2. Retry S3 requests even when observing ConstructionFailure, to avoid data loss. https://github.com/databricks/vector/pull/2
3. Update/add INFO logs for when Vector sends to cloud storage. https://github.com/databricks/vector/pull/5
4. Allow retries on all sink exceptions. https://github.com/databricks/vector/pull/7
5. Also allow retries on AccessDenied exceptions in AWS. https://github.com/databricks/vector/pull/12
6. Update the version to also carry a Databricks version. https://github.com/databricks/vector/pull/13
7. Add a new event for successful uploads to cloud storage (and rework the old send event). https://github.com/databricks/vector/pull/14
4 changes: 2 additions & 2 deletions lib/vector-buffers/src/lib.rs
@@ -104,10 +104,10 @@ impl<T> InMemoryBufferable for T where
/// An item that can be buffered.
///
/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
pub trait Bufferable: InMemoryBufferable + Encodable {}
pub trait Bufferable: InMemoryBufferable + Encodable + Clone {}

// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable + Clone {}

pub trait EventCount {
fn event_count(&self) -> usize;
69 changes: 68 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/basic.rs
@@ -3,7 +3,8 @@ use std::io::Cursor;
use futures::{stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, task::spawn};
use tracing::Instrument;
use vector_common::finalization::Finalizable;
use vector_common::finalization::{AddBatchNotifier, BatchStatus, Finalizable};
use vector_common::finalization::{BatchNotifier};

use super::{create_default_buffer_v2, read_next, read_next_some};
use crate::{
@@ -12,6 +13,72 @@ use crate::{
variants::disk_v2::{tests::create_default_buffer_v2_with_usage, writer::RecordWriter},
EventCount,
};
use crate::variants::disk_v2::WriterError;

// Test to validate that the record is not acknowledged when writing it to disk fails because the data file is full.
#[tokio::test]
async fn archive_record_not_acknowledge_when_data_file_full() {
let mut record_writer =
RecordWriter::new(Cursor::new(Vec::new()), 99, 16_384, 105, 55);
let mut record = SizedRecord::new(1);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
record.add_batch_notifier(batch);

match record_writer.archive_record(1, record) {
Ok(_) => {
assert!(false, "Was expecting WriterError::DataFileFull but got Ok")
},
Err(we) => {
match we {
WriterError::DataFileFull {
record: _old_record,
serialized_len: _,
} => {
match receiver.try_recv() {
Err(_) => {
assert!(true, "Was expecting event to be not delivered");
}
// Do not expect try_recv to return Ok.
_ => {
assert!(false, "Was expecting receiver's try_recv to not complete");
}
}
},
_ => {
assert!(false, "Was expecting WriterError::DataFileFull but got other Error");
}
}
}
}
}

// Test to validate that the record is acknowledged once it has been written to disk (archived).
#[tokio::test]
async fn archive_record_acknowledge_when_archived() {
let mut record_writer =
RecordWriter::new(Cursor::new(Vec::new()), 0, 16_384, 105, 55);
let mut record = SizedRecord::new(1);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
record.add_batch_notifier(batch);

match record_writer.archive_record(1, record) {
Ok(_) => {
assert!(true, "Was expecting Ok");
match receiver.try_recv() {
Err(_) => {
assert!(false, "Was expecting event to be delivered");
}
// Do expect try_recv to return Ok.
Ok(status) => {
assert!(status == BatchStatus::Delivered, "Was expecting receiver's try_recv to complete");
}
}
},
Err(_) => {
assert!(false, "Was expecting archive record to succeed");
}
}
}

#[tokio::test]
async fn basic_read_write_loop() {
@@ -700,7 +700,7 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct ControllableRecord(u8);

impl Encodable for ControllableRecord {
14 changes: 5 additions & 9 deletions lib/vector-buffers/src/variants/disk_v2/writer.rs
@@ -467,6 +467,9 @@ where
// the actual encoded size and then check it against the limit.
//
// C'est la vie.
// Clone the record here because `encode` will consume it and ack it to the source.
// We have not yet archived the record, so we should avoid acknowledging the event.
let old_record = record.clone();
let encode_result = {
let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size);
record.encode(&mut encode_buf)
@@ -532,15 +535,8 @@ where
"Archived record is too large to fit in remaining free space of current data file."
);

// We have to decode the record back out to actually be able to give it back. If we
// can't decode it for some reason, this is entirely an unrecoverable error, since an
// encoded record should always be decodable within the same process that encoded it.
let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| {
WriterError::InconsistentState {
reason: "failed to decode record immediately after encoding it".to_string(),
}
})?;

// Assign the old record rather than decoding it again.
let record = old_record;
return Err(WriterError::DataFileFull {
record,
serialized_len,
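The Clone bound added to Bufferable in lib/vector-buffers/src/lib.rs is what makes this change possible: the record is cloned before the consuming encode call, so the DataFileFull error can hand back the original value instead of decoding it from the freshly encoded bytes. A minimal standalone sketch of that pattern, using hypothetical Record, encode, and WriteError stand-ins rather than Vector's actual types:

// Illustrative sketch only; `Record`, `encode`, and `WriteError` are stand-ins,
// not Vector's real types.
#[derive(Clone)]
struct Record(Vec<u8>);

enum WriteError {
    DataFileFull { record: Record, serialized_len: usize },
}

// Consumes the record, mirroring how the buffer's `encode` takes ownership.
fn encode(record: Record) -> Vec<u8> {
    record.0
}

fn try_store(record: Record, remaining: usize) -> Result<usize, WriteError> {
    let original = record.clone(); // keep a copy for the error path
    let encoded = encode(record);  // `record` is consumed here
    if encoded.len() > remaining {
        // Hand back the original record instead of re-decoding the encoded bytes.
        return Err(WriteError::DataFileFull {
            record: original,
            serialized_len: encoded.len(),
        });
    }
    Ok(encoded.len())
}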
4 changes: 2 additions & 2 deletions src/aws/mod.rs
@@ -52,7 +52,7 @@ static RETRIABLE_CODES: OnceLock<RegexSet> = OnceLock::new();
pub fn is_retriable_error<T>(error: &SdkError<T, HttpResponse>) -> bool {
match error {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => true,
SdkError::ConstructionFailure(_) => false,
SdkError::ConstructionFailure(_) => true,
SdkError::ResponseError(err) => check_response(err.raw()),
SdkError::ServiceError(err) => check_response(err.raw()),
_ => {
@@ -82,7 +82,7 @@ fn check_response(res: &HttpResponse) -> bool {
//
// Now just look for those when it's a client_error
let re = RETRIABLE_CODES.get_or_init(|| {
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException"])
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException", "ExpiredToken", "AccessDenied"])
.expect("invalid regex")
});

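For reference, the RETRIABLE_CODES RegexSet is used to classify client errors as retriable, so adding ExpiredToken and AccessDenied makes those responses eligible for retry as well. A small standalone sketch of the matching behaviour (illustrative only, exercising the regex crate directly; not part of this diff):

use regex::RegexSet;

fn main() {
    // Same pattern list as the change above (illustrative only).
    let retriable = RegexSet::new([
        "RequestTimeout",
        "RequestExpired",
        "ThrottlingException",
        "ExpiredToken",
        "AccessDenied",
    ])
    .expect("invalid regex");

    // With the expanded list, an AccessDenied error body now counts as retriable.
    assert!(retriable.is_match(
        "<Error><Code>AccessDenied</Code><Message>Access Denied</Message></Error>"
    ));

    // Unrelated client errors are still not matched.
    assert!(!retriable.is_match("<Error><Code>NoSuchBucket</Code></Error>"));
}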
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
@@ -111,6 +111,8 @@ mod redis;
#[cfg(feature = "transforms-impl-reduce")]
mod reduce;
mod remap;
#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))]
mod retries;
mod sample;
#[cfg(feature = "sinks-sematext")]
mod sematext_metrics;
@@ -249,6 +251,8 @@ pub(crate) use self::redis::*;
pub(crate) use self::reduce::*;
#[cfg(feature = "transforms-remap")]
pub(crate) use self::remap::*;
#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))]
pub(crate) use self::retries::*;
#[cfg(feature = "transforms-impl-sample")]
pub(crate) use self::sample::*;
#[cfg(feature = "sinks-sematext")]
22 changes: 22 additions & 0 deletions src/internal_events/retries.rs
@@ -0,0 +1,22 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct CheckRetryEvent<'a> {
pub status_code: &'a str,
pub retry: bool,
}

impl InternalEvent for CheckRetryEvent<'_> {
fn emit(self) {
debug!(
message = "Considering retry on error.",
status_code = self.status_code,
retry = self.retry,
);
counter!("sink_retries_total", 1,
"status_code" => self.status_code.to_string(),
"retry" => self.retry.to_string(),
);
}
}
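The call sites for this event live in the retry-logic files, which are not included in this excerpt; a hypothetical emit site would look roughly like the following fragment (assuming Vector's usual `emit!` macro is in scope):

// Hypothetical call site (the real ones are in files not shown in this diff):
// emit the event wherever a retry decision is made for a failed request.
emit!(CheckRetryEvent {
    status_code: "AccessDenied",
    retry: true,
});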
10 changes: 9 additions & 1 deletion src/sinks/aws_s3/config.rs
@@ -18,7 +18,7 @@ use crate::{
self,
config::{S3Options, S3RetryLogic},
partitioner::S3KeyPartitioner,
service::S3Service,
service::{S3Service, S3Response},
sink::S3Sink,
},
util::{
@@ -204,6 +204,14 @@ impl S3SinkConfig {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, S3RetryLogic)
// Add another layer after retries to emit our upload event log message.
// It returns the same result, so downstream processing continues to work unchanged.
.map_result(|result: Result<S3Response, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
}
result
})
.service(service);

let offset = self
14 changes: 12 additions & 2 deletions src/sinks/aws_s3/sink.rs
@@ -18,7 +18,7 @@ use crate::{
},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, vector_event::VectorSendEventMetadata,
},
},
};
@@ -64,6 +64,7 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
let metadata = S3Metadata {
partition_key,
s3_key: s3_key_prefix,
count: events.len(),
finalizers,
};

@@ -103,8 +104,17 @@

s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);

let body = payload.into_payload();

VectorSendEventMetadata {
bytes: body.len(),
events_len: s3metadata.count,
blob: s3metadata.s3_key.clone(),
container: self.bucket.clone(),
}.emit_sending_event();

S3Request {
body: payload.into_payload(),
body: body,
bucket: self.bucket.clone(),
metadata: s3metadata,
request_metadata,
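VectorSendEventMetadata is defined in src/sinks/util/vector_event.rs, which is among the changed files not shown in this excerpt. Judging from the fields and the emit_sending_event / emit_upload_event calls used here and in the sink config layers, its shape is roughly the following; treat this as an assumed sketch, not the actual file:

// Assumed sketch of the helper used above; the real definition lives in
// src/sinks/util/vector_event.rs, which is not part of this excerpt.
// Assumes tracing's `info!` macro is in scope, as elsewhere in Vector.
#[derive(Clone, Debug)]
pub struct VectorSendEventMetadata {
    pub bytes: usize,
    pub events_len: usize,
    pub blob: String,
    pub container: String,
}

impl VectorSendEventMetadata {
    // Logged just before a request is sent to cloud storage.
    pub fn emit_sending_event(&self) {
        info!(
            message = "Sending events.",
            bytes = self.bytes,
            events_len = self.events_len,
            blob = %self.blob,
            container = %self.container,
        );
    }

    // Logged after a successful upload (see the map_result layers in the sink configs).
    pub fn emit_upload_event(&self) {
        info!(
            message = "Uploaded events.",
            bytes = self.bytes,
            events_len = self.events_len,
            blob = %self.blob,
            container = %self.container,
        );
    }
}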
11 changes: 10 additions & 1 deletion src/sinks/azure_blob/config.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use azure_storage_blobs::prelude::*;
use std::result::Result as StdResult;
use tower::ServiceBuilder;
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
use vector_lib::configurable::configurable_component;
@@ -13,7 +14,7 @@ use crate::{
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
azure_common::{
self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
self, config::{AzureBlobRetryLogic, AzureBlobResponse}, service::AzureBlobService, sink::AzureBlobSink,
},
util::{
partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
@@ -213,6 +214,14 @@ impl AzureBlobSinkConfig {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
// Add another layer after retries to emit our upload event log message.
// It returns the same result, so downstream processing continues to work unchanged.
.map_result(|result: StdResult<AzureBlobResponse, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
}
result
})
.service(AzureBlobService::new(client));

// Configure our partitioning/batching.
16 changes: 8 additions & 8 deletions src/sinks/azure_blob/request_builder.rs
@@ -12,7 +12,7 @@ use crate::{
azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, vector_event::VectorSendEventMetadata,
},
},
};
@@ -50,6 +50,7 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
let finalizers = events.take_finalizers();
let azure_metadata = AzureBlobMetadata {
partition_key,
container_name: self.container_name.clone(),
count: events.len(),
byte_size: events.estimated_json_encoded_size_of(),
finalizers,
@@ -82,13 +83,12 @@

let blob_data = payload.into_payload();

debug!(
message = "Sending events.",
bytes = ?blob_data.len(),
events_len = ?azure_metadata.count,
blob = ?azure_metadata.partition_key,
container = ?self.container_name,
);
VectorSendEventMetadata {
bytes: blob_data.len(),
events_len: azure_metadata.count,
blob: azure_metadata.partition_key.clone(),
container: self.container_name.clone(),
}.emit_sending_event();

AzureBlobRequest {
blob_data,