diff --git a/.gitignore b/.gitignore index 24b0593d51d6c..f16172cebd1f3 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ node_modules tests/data/wasm/*/target heaptrack.* massif.* +.idea # tilt tilt_modules/ diff --git a/Cargo.lock b/Cargo.lock index dfccefda9e01b..2dc083ffb3e55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10418,7 +10418,7 @@ dependencies = [ [[package]] name = "vector" -version = "0.39.0" +version = "0.39.0-databricks-v1" dependencies = [ "apache-avro", "approx", diff --git a/Cargo.toml b/Cargo.toml index ae9039b6572bd..4645895bf99cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vector" -version = "0.39.0" +version = "0.39.0-databricks-v1" authors = ["Vector Contributors "] edition = "2021" description = "A lightweight and ultra-fast tool for building observability pipelines" diff --git a/README.databricks.md b/README.databricks.md new file mode 100644 index 0000000000000..bf7743bbf05de --- /dev/null +++ b/README.databricks.md @@ -0,0 +1,8 @@ +This lists custom changes merged in Databricks fork of Vector. +1. Fix premature ack when data file is full. https://github.com/databricks/vector/pull/1 +2. Retry s3 request even when observing ConstructionFailure to avoid data loss. https://github.com/databricks/vector/pull/2 +3. Updating/adding INFO logs for when vector sends to cloud storage. https://github.com/databricks/vector/pull/5 +4. Allow retries on all sink exceptions https://github.com/databricks/vector/pull/7 +5. Also allowing retries on AccessDenied exceptions in AWS https://github.com/databricks/vector/pull/12 +6. Updating version to also carry a Databricks version https://github.com/databricks/vector/pull/13 +7. Add a new event for successful upload to cloud storage (+ rework old send) https://github.com/databricks/vector/pull/14 diff --git a/lib/vector-buffers/src/lib.rs b/lib/vector-buffers/src/lib.rs index d8b28c278ac75..94eb63bfc9951 100644 --- a/lib/vector-buffers/src/lib.rs +++ b/lib/vector-buffers/src/lib.rs @@ -104,10 +104,10 @@ impl InMemoryBufferable for T where /// An item that can be buffered. /// /// This supertrait serves as the base trait for any item that can be pushed into a buffer. -pub trait Bufferable: InMemoryBufferable + Encodable {} +pub trait Bufferable: InMemoryBufferable + Encodable + Clone {} // Blanket implementation for anything that is already bufferable. -impl Bufferable for T where T: InMemoryBufferable + Encodable {} +impl Bufferable for T where T: InMemoryBufferable + Encodable + Clone {} pub trait EventCount { fn event_count(&self) -> usize; diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs index 781e30ad5b217..6be2a72d5ea36 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/basic.rs @@ -3,7 +3,8 @@ use std::io::Cursor; use futures::{stream, StreamExt}; use tokio_test::{assert_pending, assert_ready, task::spawn}; use tracing::Instrument; -use vector_common::finalization::Finalizable; +use vector_common::finalization::{AddBatchNotifier, BatchStatus, Finalizable}; +use vector_common::finalization::{BatchNotifier}; use super::{create_default_buffer_v2, read_next, read_next_some}; use crate::{ @@ -12,6 +13,72 @@ use crate::{ variants::disk_v2::{tests::create_default_buffer_v2_with_usage, writer::RecordWriter}, EventCount, }; +use crate::variants::disk_v2::WriterError; + +// Test to validate that the record is not acknowledged even when there is data file full error in write file to disk. +#[tokio::test] +async fn archive_record_not_acknowledge_when_data_file_full() { + let mut record_writer = + RecordWriter::new(Cursor::new(Vec::new()), 99, 16_384, 105, 55); + let mut record = SizedRecord::new(1); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + record.add_batch_notifier(batch); + + match record_writer.archive_record(1, record) { + Ok(_) => { + assert!(false, "Was expecting WriterError::DataFileFull but got Ok") + }, + Err(we) => { + match we { + WriterError::DataFileFull { + record: _old_record, + serialized_len: _, + } => { + match receiver.try_recv() { + Err(_) => { + assert!(true, "Was expecting event to be not delivered"); + } + // Do not expect try_recv to return Ok. + _ => { + assert!(false, "Was expecting receiver's try_recv to not complete"); + } + } + }, + _ => { + assert!(false, "Was expecting WriterError::DataFileFull but got other Error"); + } + } + } + } +} + +// Test to validate that the record is acknowledged written to disk/archived. +#[tokio::test] +async fn archive_record_acknowledge_when_archived() { + let mut record_writer = + RecordWriter::new(Cursor::new(Vec::new()), 0, 16_384, 105, 55); + let mut record = SizedRecord::new(1); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + record.add_batch_notifier(batch); + + match record_writer.archive_record(1, record) { + Ok(_) => { + assert!(true, "Was expecting Ok"); + match receiver.try_recv() { + Err(_) => { + assert!(false, "Was expecting event to be delivered"); + } + // Do expect try_recv to return Ok. + Ok(status) => { + assert!(status == BatchStatus::Delivered, "Was expecting receiver's try_recv to complete"); + } + } + }, + Err(_) => { + assert!(false, "Was expecting archive record to succeed"); + } + } +} #[tokio::test] async fn basic_read_write_loop() { diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs index 0d51d000bb7b8..389a91f7b84ba 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs @@ -700,7 +700,7 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() { } } - #[derive(Debug)] + #[derive(Debug, Clone)] struct ControllableRecord(u8); impl Encodable for ControllableRecord { diff --git a/lib/vector-buffers/src/variants/disk_v2/writer.rs b/lib/vector-buffers/src/variants/disk_v2/writer.rs index fc8f60672e1af..35fe6e92fe815 100644 --- a/lib/vector-buffers/src/variants/disk_v2/writer.rs +++ b/lib/vector-buffers/src/variants/disk_v2/writer.rs @@ -467,6 +467,9 @@ where // the actual encoded size and then check it against the limit. // // C'est la vie. + // Clone record here because encode will consume it and ack it to source. + // We have not yet archived the record, so we should avoid acknowledging the event. + let old_record = record.clone(); let encode_result = { let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size); record.encode(&mut encode_buf) @@ -532,15 +535,8 @@ where "Archived record is too large to fit in remaining free space of current data file." ); - // We have to decode the record back out to actually be able to give it back. If we - // can't decode it for some reason, this is entirely an unrecoverable error, since an - // encoded record should always be decodable within the same process that encoded it. - let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| { - WriterError::InconsistentState { - reason: "failed to decode record immediately after encoding it".to_string(), - } - })?; - + // assign old record rather that decoding again + let record = old_record; return Err(WriterError::DataFileFull { record, serialized_len, diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 2f4ea2678a637..d5592254bfc73 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -52,7 +52,7 @@ static RETRIABLE_CODES: OnceLock = OnceLock::new(); pub fn is_retriable_error(error: &SdkError) -> bool { match error { SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => true, - SdkError::ConstructionFailure(_) => false, + SdkError::ConstructionFailure(_) => true, SdkError::ResponseError(err) => check_response(err.raw()), SdkError::ServiceError(err) => check_response(err.raw()), _ => { @@ -82,7 +82,7 @@ fn check_response(res: &HttpResponse) -> bool { // // Now just look for those when it's a client_error let re = RETRIABLE_CODES.get_or_init(|| { - RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException"]) + RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException", "ExpiredToken", "AccessDenied"]) .expect("invalid regex") }); diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index a303cdabe6175..ae945cb50edf9 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -111,6 +111,8 @@ mod redis; #[cfg(feature = "transforms-impl-reduce")] mod reduce; mod remap; +#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))] +mod retries; mod sample; #[cfg(feature = "sinks-sematext")] mod sematext_metrics; @@ -249,6 +251,8 @@ pub(crate) use self::redis::*; pub(crate) use self::reduce::*; #[cfg(feature = "transforms-remap")] pub(crate) use self::remap::*; +#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))] +pub(crate) use self::retries::*; #[cfg(feature = "transforms-impl-sample")] pub(crate) use self::sample::*; #[cfg(feature = "sinks-sematext")] diff --git a/src/internal_events/retries.rs b/src/internal_events/retries.rs new file mode 100644 index 0000000000000..892d3301cc3b4 --- /dev/null +++ b/src/internal_events/retries.rs @@ -0,0 +1,22 @@ +use metrics::counter; +use vector_lib::internal_event::InternalEvent; + +#[derive(Debug)] +pub struct CheckRetryEvent<'a> { + pub status_code: &'a str, + pub retry: bool, +} + +impl InternalEvent for CheckRetryEvent<'_> { + fn emit(self) { + debug!( + message = "Considering retry on error.", + status_code = self.status_code, + retry = self.retry, + ); + counter!("sink_retries_total", 1, + "status_code" => self.status_code.to_string(), + "retry" => self.retry.to_string(), + ); + } +} diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index c1c8067fb8909..8228de8680ac2 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -18,7 +18,7 @@ use crate::{ self, config::{S3Options, S3RetryLogic}, partitioner::S3KeyPartitioner, - service::S3Service, + service::{S3Service, S3Response}, sink::S3Sink, }, util::{ @@ -204,6 +204,14 @@ impl S3SinkConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, S3RetryLogic) + // Add another layer after retries for emitting our event log message + // Returns back the same result so it continues to work downstream + .map_result(|result: Result| { + if let Ok(ref response) = result { + response.send_event_metadata.emit_upload_event(); + } + result + }) .service(service); let offset = self diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 0443f377a55e6..1c8f6baf2befa 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -18,7 +18,7 @@ use crate::{ }, util::{ metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, - RequestBuilder, + RequestBuilder, vector_event::VectorSendEventMetadata, }, }, }; @@ -64,6 +64,7 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { let metadata = S3Metadata { partition_key, s3_key: s3_key_prefix, + count: events.len(), finalizers, }; @@ -103,8 +104,17 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension); + let body = payload.into_payload(); + + VectorSendEventMetadata { + bytes: body.len(), + events_len: s3metadata.count, + blob: s3metadata.s3_key.clone(), + container: self.bucket.clone(), + }.emit_sending_event(); + S3Request { - body: payload.into_payload(), + body: body, bucket: self.bucket.clone(), metadata: s3metadata, request_metadata, diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index bdf1182f384ee..f63e661410ccf 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use azure_storage_blobs::prelude::*; +use std::result::Result as StdResult; use tower::ServiceBuilder; use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; use vector_lib::configurable::configurable_component; @@ -13,7 +14,7 @@ use crate::{ config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, sinks::{ azure_common::{ - self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink, + self, config::{AzureBlobRetryLogic, AzureBlobResponse}, service::AzureBlobService, sink::AzureBlobSink, }, util::{ partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings, @@ -213,6 +214,14 @@ impl AzureBlobSinkConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, AzureBlobRetryLogic) + // Add another layer after retries for emitting our event log message + // Returns back the same result so it continues to work downstream + .map_result(|result: StdResult| { + if let Ok(ref response) = result { + response.send_event_metadata.emit_upload_event(); + } + result + }) .service(AzureBlobService::new(client)); // Configure our partitioning/batching. diff --git a/src/sinks/azure_blob/request_builder.rs b/src/sinks/azure_blob/request_builder.rs index 507874ab406f1..f9e38275b23de 100644 --- a/src/sinks/azure_blob/request_builder.rs +++ b/src/sinks/azure_blob/request_builder.rs @@ -12,7 +12,7 @@ use crate::{ azure_common::config::{AzureBlobMetadata, AzureBlobRequest}, util::{ metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, - RequestBuilder, + RequestBuilder, vector_event::VectorSendEventMetadata, }, }, }; @@ -50,6 +50,7 @@ impl RequestBuilder<(String, Vec)> for AzureBlobRequestOptions { let finalizers = events.take_finalizers(); let azure_metadata = AzureBlobMetadata { partition_key, + container_name: self.container_name.clone(), count: events.len(), byte_size: events.estimated_json_encoded_size_of(), finalizers, @@ -82,13 +83,12 @@ impl RequestBuilder<(String, Vec)> for AzureBlobRequestOptions { let blob_data = payload.into_payload(); - debug!( - message = "Sending events.", - bytes = ?blob_data.len(), - events_len = ?azure_metadata.count, - blob = ?azure_metadata.partition_key, - container = ?self.container_name, - ); + VectorSendEventMetadata { + bytes: blob_data.len(), + events_len: azure_metadata.count, + blob: azure_metadata.partition_key.clone(), + container: self.container_name.clone(), + }.emit_sending_event(); AzureBlobRequest { blob_data, diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 5e10ea797c305..405385f56648c 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -16,7 +16,8 @@ use vector_lib::{ use crate::{ event::{EventFinalizers, EventStatus, Finalizable}, - sinks::{util::retries::RetryLogic, Healthcheck}, + internal_events::{CheckRetryEvent}, + sinks::{util::{retries::RetryLogic, vector_event::VectorSendEventMetadata}, Healthcheck}, }; #[derive(Debug, Clone)] @@ -47,6 +48,7 @@ impl MetaDescriptive for AzureBlobRequest { #[derive(Clone, Debug)] pub struct AzureBlobMetadata { pub partition_key: String, + pub container_name: String, pub count: usize, pub byte_size: JsonSize, pub finalizers: EventFinalizers, @@ -60,8 +62,22 @@ impl RetryLogic for AzureBlobRetryLogic { type Response = AzureBlobResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { - error.status().is_server_error() - || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::::into(error.status()) + // For now, retry request in all cases + + // error.status().is_server_error() + // || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::::into(error.status()) + let retry = true; + info!( + message = "Considered retry on error.", + error = %error, + retry = retry, + ); + + emit!(CheckRetryEvent { + status_code: error.error_code().unwrap_or(""), + retry: retry, + }); + retry } } @@ -70,6 +86,8 @@ pub struct AzureBlobResponse { pub inner: PutBlockBlobResponse, pub events_byte_size: GroupedCountByteSize, pub byte_size: usize, + // Extending S3 response with additional information relevant for vector send event logs + pub send_event_metadata: VectorSendEventMetadata, } impl DriverResponse for AzureBlobResponse { @@ -191,3 +209,29 @@ pub fn build_client( } Ok(std::sync::Arc::new(client)) } + +#[cfg(test)] +mod tests { + use super::*; + use azure_core::{ + StatusCode, BytesStream, Response, + error::HttpError, + headers::Headers, + }; + + #[tokio::test] + async fn test_retriable() { + // Create dummy response + // For now, specify only status code for testing + // BadRequest (400) should retry + let response = Response::new( + StatusCode::BadRequest, + Headers::new(), + Box::pin(BytesStream::new("test")) + ); + let error = HttpError::new(response).await; + assert!( + AzureBlobRetryLogic.is_retriable_error(&error) + ); + } +} diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index eed5a068c48fa..a7e70ed268153 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -9,7 +9,10 @@ use futures::future::BoxFuture; use tower::Service; use tracing::Instrument; -use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse}; +use crate::sinks::{ + azure_common::config::{AzureBlobRequest, AzureBlobResponse}, + util::vector_event::VectorSendEventMetadata, +}; #[derive(Clone)] pub struct AzureBlobService { @@ -49,6 +52,13 @@ impl Service for AzureBlobService { None => blob, }; + let send_event_metadata = VectorSendEventMetadata { + bytes: byte_size, + events_len: request.metadata.count, + blob: request.metadata.partition_key.clone(), + container: request.metadata.container_name.clone(), + }; + let result = blob .into_future() .instrument(info_span!("request").or_current()) @@ -61,6 +71,7 @@ impl Service for AzureBlobService { .request_metadata .into_events_estimated_json_encoded_byte_size(), byte_size, + send_event_metadata, }) }) } diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index a8256d8654f39..a7ce6a920490f 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -9,6 +9,7 @@ use aws_smithy_runtime_api::{ client::{orchestrator::HttpResponse, result::SdkError}, http::StatusCode, }; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; use futures::FutureExt; use snafu::Snafu; use vector_lib::configurable::configurable_component; @@ -19,6 +20,7 @@ use crate::{ common::s3::S3ClientBuilder, config::ProxyConfig, http::status, + internal_events::{CheckRetryEvent}, sinks::{util::retries::RetryLogic, Healthcheck}, tls::TlsConfig, }; @@ -316,7 +318,18 @@ impl RetryLogic for S3RetryLogic { type Response = S3Response; fn is_retriable_error(&self, error: &Self::Error) -> bool { - is_retriable_error(error) + let retry = is_retriable_error(error); + info!( + message = "Considered retry on error.", + error = %error, + retry = retry, + ); + + emit!(CheckRetryEvent { + status_code: error.code().unwrap_or(""), + retry: retry, + }); + retry } } @@ -377,6 +390,13 @@ mod tests { use super::S3StorageClass; use crate::serde::json::to_string; + use super::*; + use aws_sdk_s3::operation::put_object::PutObjectError; + use aws_smithy_runtime_api::client::{result::SdkError, orchestrator::HttpResponse}; + use aws_smithy_types::body::SdkBody; + + use std::fmt; + #[test] fn storage_class_names() { for &(name, storage_class) in &[ @@ -397,4 +417,34 @@ mod tests { assert_eq!(result, storage_class); } } + + #[test] + fn test_retriable() { + // Handle unhandled + 400 status code case (from expired token code) + // Example response with token/host data removed + let response = "Once(Some(b\"\\nExpiredTokenThe provided token has expired.\"))"; + assert!( + S3RetryLogic.is_retriable_error( + &SdkError::::service_error( + PutObjectError::unhandled(BadError), + HttpResponse::new( + http::StatusCode::from_u16(400).unwrap().into(), + SdkBody::from(response) + ) + ) + ) + ); + } + + + #[derive(Debug)] + struct BadError; + + impl fmt::Display for BadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "error") + } + } + + impl std::error::Error for BadError {} } diff --git a/src/sinks/s3_common/service.rs b/src/sinks/s3_common/service.rs index f651a576c50a5..2d382caf68851 100644 --- a/src/sinks/s3_common/service.rs +++ b/src/sinks/s3_common/service.rs @@ -18,6 +18,8 @@ use vector_lib::stream::DriverResponse; use super::config::S3Options; use super::partitioner::S3PartitionKey; +use crate::sinks::util::vector_event::VectorSendEventMetadata; + #[derive(Debug, Clone)] pub struct S3Request { pub body: Bytes, @@ -48,12 +50,15 @@ impl MetaDescriptive for S3Request { pub struct S3Metadata { pub partition_key: S3PartitionKey, pub s3_key: String, + pub count: usize, pub finalizers: EventFinalizers, } #[derive(Debug)] pub struct S3Response { - events_byte_size: GroupedCountByteSize, + pub events_byte_size: GroupedCountByteSize, + // Extending S3 response with additional information relevant for vector send event logs + pub send_event_metadata: VectorSendEventMetadata, } impl DriverResponse for S3Response { @@ -123,6 +128,13 @@ impl Service for S3Service { .request_metadata .into_events_estimated_json_encoded_byte_size(); + let send_event_metadata = VectorSendEventMetadata { + bytes: request.body.len(), + events_len: request.metadata.count, + blob: request.metadata.s3_key.clone(), + container: request.bucket.clone(), + }; + let client = self.client.clone(); Box::pin(async move { @@ -146,7 +158,10 @@ impl Service for S3Service { let result = request.send().in_current_span().await; - result.map(|_| S3Response { events_byte_size }) + result.map(|_| S3Response { + events_byte_size, + send_event_metadata + }) }) } } diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index c71b49163f0d2..eb25a2e1d9c8c 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -26,6 +26,7 @@ pub mod udp; #[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))] pub mod unix; pub mod uri; +pub mod vector_event; pub mod zstd; use std::borrow::Cow; diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs index e265be25a9089..8b72d534b4971 100644 --- a/src/sinks/util/retries.rs +++ b/src/sinks/util/retries.rs @@ -173,7 +173,9 @@ where error!(message = "Retries exhausted; dropping the request.", %error, internal_log_rate_limit = true); return None; } - + // First attempt to cast the error into something of type L::Error + // L::Error is the error specified in the definition of the RetryLogic (HTTPError for AzureRetryLogic for ex.) + // So it's looking for an error that is compatible with the retry logic type if let Some(expected) = error.downcast_ref::() { if self.logic.is_retriable_error(expected) { warn!(message = "Retrying after error.", error = %expected, internal_log_rate_limit = true); @@ -186,6 +188,7 @@ where ); None } + // Next attempt to cast the error into a timeout error } else if error.downcast_ref::().is_some() { warn!( message = "Request timed out. If this happens often while the events are actually reaching their destination, try decreasing `batch.max_bytes` and/or using `compression` if applicable. Alternatively `request.timeout_secs` can be increased.", @@ -193,12 +196,18 @@ where ); Some(self.build_retry()) } else { + // If we can't cast to either of the above cases, then give up + // However, for Azure, we may end up in this case for transient issues + // For example, a connection reset error that occasionally pops up registers with status code 104 + // This is not a status code defined for the HTTPError and the downcast fails even though we can retry in this case + // As such, we currently retry with all unexpected errors like this (TODO: refine down to more specific issues) error!( - message = "Unexpected error type; dropping the request.", + // message = "Unexpected error type; dropping the request.", + message = "Unexpected error type encountered... Retrying", %error, internal_log_rate_limit = true ); - None + Some(self.build_retry()) } } } @@ -381,6 +390,39 @@ mod tests { assert_ready_err!(fut.poll()); } + #[tokio::test] + async fn service_error_retry_even_with_diff_error() { + trace_init(); + + time::pause(); + + let policy = FibonacciRetryPolicy::new( + 5, + Duration::from_secs(1), + Duration::from_secs(10), + SvcRetryLogic, + JitterMode::None, + ); + + let (mut svc, mut handle) = mock::spawn_layer(RetryLayer::new(policy)); + + assert_ready_ok!(svc.poll_ready()); + + let fut = svc.call("hello"); + let mut fut = task::spawn(fut); + + // Even if you can't re-cast, you should still retry + assert_request_eq!(handle, "hello").send_error(BadError); + + assert_pending!(fut.poll()); + + time::advance(Duration::from_secs(2)).await; + assert_pending!(fut.poll()); + + assert_request_eq!(handle, "hello").send_response("world"); + assert_eq!(fut.await.unwrap(), "world"); + } + #[tokio::test] async fn timeout_error() { trace_init(); @@ -508,4 +550,16 @@ mod tests { } impl std::error::Error for Error {} + + // Dummy base error that isn't of expected type + #[derive(Debug)] + struct BadError; + + impl fmt::Display for BadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "error") + } + } + + impl std::error::Error for BadError {} } diff --git a/src/sinks/util/vector_event.rs b/src/sinks/util/vector_event.rs new file mode 100644 index 0000000000000..940b49ff36057 --- /dev/null +++ b/src/sinks/util/vector_event.rs @@ -0,0 +1,36 @@ +// Structs used for our vector event logs + +// Struct for vector send events (sending, uploaded) +#[derive(Clone, Debug)] +pub struct VectorSendEventMetadata { + pub bytes: usize, + pub events_len: usize, + pub blob: String, + pub container: String, +} + +impl VectorSendEventMetadata { + pub fn emit_upload_event(&self) { + info!( + message = "Uploaded events.", + bytes = self.bytes, + events_len = self.events_len, + blob = self.blob, + container = self.container, + // VECTOR_UPLOADED_MESSAGES_EVENT + vector_event_type = 4 + ); + } + + pub fn emit_sending_event(&self) { + info!( + message = "Sending events.", + bytes = self.bytes, + events_len = self.events_len, + blob = self.blob, + container = self.container, + // VECTOR_SENDING_MESSAGES_EVENT + vector_event_type = 3 + ); + } +}