This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Merge custom changes into v0.39 #19

Merged 56 commits on Aug 22, 2024
Changes from all commits
56 commits
a5e48bb
chore(releasing): Prepare v0.36.0 release
jszwedko Feb 5, 2024
0894ee8
chore(vrl stdlib): Fix redact doc URL templating
jszwedko Feb 13, 2024
8c3b2ce
chore(releasing): Fix markdown formatting of v0.36.0 release description
jszwedko Feb 13, 2024
1baa6b2
chore(releasing): Add additional note about new VRL decoder
jszwedko Feb 13, 2024
646c9ba
chore(releasing): Fix markdown formatting of v0.36.0 description
jszwedko Feb 13, 2024
a10a137
chore(releases website): 0.36 changelog fixes (#19875)
hdhoang Feb 16, 2024
3057ccf
docs(remap transform): Fix `drop_on_abort` docs (#19918)
jszwedko Feb 20, 2024
f1f8c1b
chore(website): bump openssl version used for links in docs (#19880)
hhromic Feb 21, 2024
f243f1c
chore: Remove used changelog entries
jszwedko Mar 8, 2024
a467715
fix (aws service): use http client so we can use openssl tls. (#19939)
StephenWakely Feb 28, 2024
9def84e
fix(aws service): determine region using our http client (#19972)
StephenWakely Feb 28, 2024
54bcee7
chore(deps): Update lockfree-object-pool to 0.1.5 (#20001)
jszwedko Mar 6, 2024
e4951cc
chore(deps): Bump whoami to 1.5.0 (#20018)
jszwedko Mar 8, 2024
28760fb
fix(aws provider): Enable `credentials-process` for `aws-config` (#20…
jszwedko Mar 8, 2024
6313331
fix(compression): Fix gzip and zlib performance degradation (#20032)
Hexta Mar 8, 2024
a8cd2a2
chore(deps): Update mio (#20005)
jszwedko Mar 5, 2024
1ea58e4
chore(releasing): Add missing changelog entries (#20041)
jszwedko Mar 8, 2024
2857180
chore(releasing): Prepare v0.36.1 release
jszwedko Mar 8, 2024
b58c864
chore(releasing): Fix formatting of v0.36.1 release
jszwedko Mar 11, 2024
c1da408
chore(releasing): Prepare v0.37.0 release
jszwedko Mar 25, 2024
583cd83
fix premature ack when data file is full
anil-db Feb 15, 2024
9555193
retry s3 request and when observing ConstructionFailure
anil-db Mar 11, 2024
dd984ea
chore(docs): note for 0.37 about incorrect ddtags parsing behavior (#…
neuronull Mar 28, 2024
716160d
chore(docs): Remove package deprecation banner (#20181)
jszwedko Mar 26, 2024
a81f3b3
docs(dnstap source, releasing): Add breaking change note for dnstap s…
jszwedko Mar 28, 2024
c242a4f
Merge pull request #3 from flaviofcruz/v0.36.1-custom
flaviofcruz Mar 29, 2024
3f4859f
docs: fix type cardinality docs (#20209)
saitonakamura Apr 1, 2024
3ea0c18
Update with new send logs
andrewma2 Apr 3, 2024
4a4bfb9
Merge pull request #5 from andrewma2/AddSendInfoLogs
andrewma2 Apr 5, 2024
17fb71c
fix(enrichment_tables): bring back support for `GeoLite2-City` db (#2…
esensar Mar 27, 2024
b7495c2
chore(ci): peg `fakeintake` docker image (#20196)
neuronull Mar 27, 2024
4496b3d
chore(ci): Drop `apt-get upgrade` (#20203)
jszwedko Mar 28, 2024
2cb35f1
chore(ci): Remove pip install of modules (#20204)
jszwedko Mar 28, 2024
1cb0b96
chore(ci): Only use one label for selecting GHA runner (#20210)
jszwedko Mar 29, 2024
53c9a7d
chore(deps): Update h2 (#20236)
jszwedko Apr 5, 2024
cb6635a
chore(releasing): Prepare v0.37.1 release
jszwedko Apr 8, 2024
c1b7dbe
Make all sink errors retriable
andrewma2 Apr 15, 2024
ea1dcbb
README
andrewma2 Apr 15, 2024
bd58e68
Add unit tests
andrewma2 Apr 26, 2024
d53cb04
Cover expired token case
andrewma2 May 2, 2024
dc6d128
Add metrics
andrewma2 May 2, 2024
42e8af8
Add info logs + extra comments
andrewma2 May 3, 2024
150a890
Merge pull request #8 from andrewma2/MakeAllRetriesRetriable
andrewma2 May 7, 2024
f638011
Merge remote-tracking branch 'origin/v0.36.1' into v0.37.1-custom
flaviofcruz May 13, 2024
4ae6f57
Merge pull request #11 from flaviofcruz/v0.37.1-custom
flaviofcruz May 13, 2024
c7fbd95
Retry
andrewma2 May 13, 2024
158c818
Update readme
andrewma2 May 15, 2024
a4f73e3
Merge pull request #12 from andrewma2/FixRetry
andrewma2 May 15, 2024
c815e8b
Set version
andrewma2 May 15, 2024
8d43a8f
Update readme
andrewma2 May 15, 2024
f05cd5d
Merge pull request #13 from andrewma2/SetDatabricksVersion
andrewma2 May 16, 2024
83dc2ec
Implement new event + refactor emitting
andrewma2 Jun 12, 2024
acea5c6
Update changelog
andrewma2 Jun 18, 2024
9c772c1
Merge pull request #14 from andrewma2/AfterSendInfoLog
andrewma2 Jun 20, 2024
c75cf38
Merge remote-tracking branch 'databricks/v0.37' into v0.39
andrewma2 Aug 20, 2024
af37f5e
Update version
andrewma2 Aug 20, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ node_modules
tests/data/wasm/*/target
heaptrack.*
massif.*
.idea

# tilt
tilt_modules/
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "vector"
version = "0.39.0"
version = "0.39.0-databricks-v1"

Check failure on line 3 in Cargo.toml

View workflow job for this annotation

GitHub Actions / Check Spelling

`databricks` is not a recognized word. (unrecognized-spelling)
authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
description = "A lightweight and ultra-fast tool for building observability pipelines"
Expand Down
8 changes: 8 additions & 0 deletions README.databricks.md
@@ -0,0 +1,8 @@
This lists custom changes merged in Databricks fork of Vector.

1. Fix premature ack when data file is full. https://github.com/databricks/vector/pull/1
2. Retry s3 request even when observing ConstructionFailure to avoid data loss. https://github.com/databricks/vector/pull/2
3. Updating/adding INFO logs for when vector sends to cloud storage. https://github.com/databricks/vector/pull/5
4. Allow retries on all sink exceptions https://github.com/databricks/vector/pull/7
5. Also allowing retries on AccessDenied exceptions in AWS https://github.com/databricks/vector/pull/12
6. Updating version to also carry a Databricks version https://github.com/databricks/vector/pull/13
7. Add a new event for successful upload to cloud storage (+ rework old send) https://github.com/databricks/vector/pull/14
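Items 4 and 5 above widen the sinks' retry classification. The actual code for item 4 is not shown in this diff; the sketch below is an illustration only, using a hypothetical `SinkError` enum and `should_retry` function rather than Vector's real retry types.

```rust
// Illustrative sketch of "retry every sink error" (items 4 and 5); the enum and
// function names here are hypothetical and are not Vector's actual retry API.
#[derive(Debug)]
enum SinkError {
    Timeout,
    AccessDenied,
    ExpiredToken,
    Other(String),
}

fn should_retry(error: &SinkError) -> bool {
    // Every failure is treated as retriable, so transient credential problems
    // (e.g. AccessDenied while a token is being refreshed) do not drop data.
    match error {
        SinkError::Timeout | SinkError::AccessDenied | SinkError::ExpiredToken => true,
        SinkError::Other(_) => true,
    }
}

fn main() {
    assert!(should_retry(&SinkError::AccessDenied));
    assert!(should_retry(&SinkError::Other("ServiceUnavailable".into())));
}
```

The obvious trade-off is that permanent failures are also retried until the retry budget or request timeout is exhausted.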
4 changes: 2 additions & 2 deletions lib/vector-buffers/src/lib.rs
@@ -104,10 +104,10 @@ impl<T> InMemoryBufferable for T where
/// An item that can be buffered.
///
/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
pub trait Bufferable: InMemoryBufferable + Encodable {}
pub trait Bufferable: InMemoryBufferable + Encodable + Clone {}

// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable + Clone {}

pub trait EventCount {
fn event_count(&self) -> usize;
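Adding `Clone` to the `Bufferable` supertrait and its blanket impl means every buffered record type must now be cloneable; that is why `ControllableRecord` in the test file further down gains `#[derive(Debug, Clone)]`. A minimal, self-contained sketch of how a blanket impl propagates such a bound, with simplified stand-in traits rather than Vector's real definitions:

```rust
// Simplified stand-ins for the real traits: the point is only that the blanket impl
// now demands Clone, so any record type must derive it to stay Bufferable.
trait Encodable {}
trait Bufferable: Encodable + Clone {}
impl<T> Bufferable for T where T: Encodable + Clone {}

#[derive(Clone)] // removing this derive makes the assert below fail to compile
struct MyRecord(Vec<u8>);
impl Encodable for MyRecord {}

fn assert_bufferable<T: Bufferable>() {}

fn main() {
    assert_bufferable::<MyRecord>();
}
```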
69 changes: 68 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/basic.rs
@@ -3,7 +3,8 @@ use std::io::Cursor;
use futures::{stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, task::spawn};
use tracing::Instrument;
use vector_common::finalization::Finalizable;
use vector_common::finalization::{AddBatchNotifier, BatchStatus, Finalizable};
use vector_common::finalization::{BatchNotifier};

use super::{create_default_buffer_v2, read_next, read_next_some};
use crate::{
@@ -12,6 +13,72 @@ use crate::{
variants::disk_v2::{tests::create_default_buffer_v2_with_usage, writer::RecordWriter},
EventCount,
};
use crate::variants::disk_v2::WriterError;

// Test to validate that the record is not acknowledged when a data file full error occurs while writing the record to disk.
#[tokio::test]
async fn archive_record_not_acknowledge_when_data_file_full() {
let mut record_writer =
RecordWriter::new(Cursor::new(Vec::new()), 99, 16_384, 105, 55);
let mut record = SizedRecord::new(1);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
record.add_batch_notifier(batch);

match record_writer.archive_record(1, record) {
Ok(_) => {
assert!(false, "Was expecting WriterError::DataFileFull but got Ok")
},
Err(we) => {
match we {
WriterError::DataFileFull {
record: _old_record,
serialized_len: _,
} => {
match receiver.try_recv() {
Err(_) => {
assert!(true, "Was expecting event to be not delivered");
}
// Do not expect try_recv to return Ok.
_ => {
assert!(false, "Was expecting receiver's try_recv to not complete");
}
}
},
_ => {
assert!(false, "Was expecting WriterError::DataFileFull but got other Error");
}
}
}
}
}

// Test to validate that the record is acknowledged when it is written to disk/archived.
#[tokio::test]
async fn archive_record_acknowledge_when_archived() {
let mut record_writer =
RecordWriter::new(Cursor::new(Vec::new()), 0, 16_384, 105, 55);
let mut record = SizedRecord::new(1);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
record.add_batch_notifier(batch);

match record_writer.archive_record(1, record) {
Ok(_) => {
assert!(true, "Was expecting Ok");
match receiver.try_recv() {
Err(_) => {
assert!(false, "Was expecting event to be delivered");
}
// Do expect try_recv to return Ok.
Ok(status) => {
assert!(status == BatchStatus::Delivered, "Was expecting receiver's try_recv to complete");
}
}
},
Err(_) => {
assert!(false, "Was expecting archive record to succeed");
}
}
}

#[tokio::test]
async fn basic_read_write_loop() {
@@ -700,7 +700,7 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct ControllableRecord(u8);

impl Encodable for ControllableRecord {
14 changes: 5 additions & 9 deletions lib/vector-buffers/src/variants/disk_v2/writer.rs
@@ -467,6 +467,9 @@
// the actual encoded size and then check it against the limit.
//
// C'est la vie.
// Clone record here because encode will consume it and ack it to source.
// We have not yet archived the record, so we should avoid acknowledging the event.
let old_record = record.clone();
let encode_result = {
let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size);
record.encode(&mut encode_buf)
@@ -532,15 +535,8 @@
"Archived record is too large to fit in remaining free space of current data file."
);

// We have to decode the record back out to actually be able to give it back. If we
// can't decode it for some reason, this is entirely an unrecoverable error, since an
// encoded record should always be decodable within the same process that encoded it.
let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| {
WriterError::InconsistentState {
reason: "failed to decode record immediately after encoding it".to_string(),
}
})?;

// assign old record rather than decoding again
let record = old_record;
return Err(WriterError::DataFileFull {
record,
serialized_len,
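The two hunks above implement the clone-before-encode pattern: encoding consumes the record (and finalization would otherwise acknowledge it), so a clone is taken up front and handed back in `WriterError::DataFileFull`, replacing the old decode-it-back-out path. A simplified, self-contained sketch of the pattern, with stand-in types rather than the real `RecordWriter`:

```rust
// Stand-in types only; the real logic lives in RecordWriter::archive_record above.
#[derive(Clone, Debug)]
struct Record(Vec<u8>);

#[derive(Debug)]
enum WriterError {
    DataFileFull { record: Record, serialized_len: usize },
}

fn archive_record(record: Record, remaining_bytes: usize) -> Result<usize, WriterError> {
    // Keep a copy before the (consuming) encode step so it can be returned intact.
    let old_record = record.clone();
    let serialized = record.0; // stand-in for encoding, which consumes `record`
    if serialized.len() > remaining_bytes {
        // Hand back the clone instead of decoding the freshly encoded bytes again.
        return Err(WriterError::DataFileFull {
            record: old_record,
            serialized_len: serialized.len(),
        });
    }
    Ok(serialized.len())
}

fn main() {
    match archive_record(Record(vec![0u8; 8]), 4) {
        Err(WriterError::DataFileFull { record, serialized_len }) => {
            assert_eq!(record.0.len(), 8);
            assert_eq!(serialized_len, 8);
        }
        other => panic!("expected DataFileFull, got {other:?}"),
    }
}
```

This is also what the two new tests in `basic.rs` above exercise: the batch notifier must not report delivery when `DataFileFull` is returned, and must report `Delivered` once the record is actually archived.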
4 changes: 2 additions & 2 deletions src/aws/mod.rs
@@ -52,7 +52,7 @@ static RETRIABLE_CODES: OnceLock<RegexSet> = OnceLock::new();
pub fn is_retriable_error<T>(error: &SdkError<T, HttpResponse>) -> bool {
match error {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => true,
SdkError::ConstructionFailure(_) => false,
SdkError::ConstructionFailure(_) => true,
SdkError::ResponseError(err) => check_response(err.raw()),
SdkError::ServiceError(err) => check_response(err.raw()),
_ => {
@@ -82,7 +82,7 @@ fn check_response(res: &HttpResponse) -> bool {
//
// Now just look for those when it's a client_error
let re = RETRIABLE_CODES.get_or_init(|| {
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException"])
RegexSet::new(["RequestTimeout", "RequestExpired", "ThrottlingException", "ExpiredToken", "AccessDenied"])
.expect("invalid regex")
});

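For reference, the retriable-code check above matches the response body against a `RegexSet`, and the change extends that set with `ExpiredToken` and `AccessDenied`. A standalone sketch of the same matching behaviour, assuming only the `regex` crate (the real code caches the set in a `OnceLock` and inspects an `HttpResponse` rather than a plain string):

```rust
use regex::RegexSet;

// Same idea as check_response above, reduced to a plain string match; the real
// implementation caches the RegexSet in a OnceLock and reads the HTTP body.
fn is_retriable_body(body: &str) -> bool {
    let set = RegexSet::new([
        "RequestTimeout",
        "RequestExpired",
        "ThrottlingException",
        "ExpiredToken",
        "AccessDenied",
    ])
    .expect("invalid regex");
    set.is_match(body)
}

fn main() {
    assert!(is_retriable_body(r#"<Error><Code>AccessDenied</Code></Error>"#));
    assert!(is_retriable_body(r#"<Error><Code>ExpiredToken</Code></Error>"#));
    assert!(!is_retriable_body(r#"<Error><Code>NoSuchBucket</Code></Error>"#));
}
```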
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
@@ -111,6 +111,8 @@ mod redis;
#[cfg(feature = "transforms-impl-reduce")]
mod reduce;
mod remap;
#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))]
mod retries;
mod sample;
#[cfg(feature = "sinks-sematext")]
mod sematext_metrics;
@@ -249,6 +251,8 @@ pub(crate) use self::redis::*;
pub(crate) use self::reduce::*;
#[cfg(feature = "transforms-remap")]
pub(crate) use self::remap::*;
#[cfg(all(feature = "aws-core", feature = "sinks-azure_blob"))]
pub(crate) use self::retries::*;
#[cfg(feature = "transforms-impl-sample")]
pub(crate) use self::sample::*;
#[cfg(feature = "sinks-sematext")]
22 changes: 22 additions & 0 deletions src/internal_events/retries.rs
@@ -0,0 +1,22 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct CheckRetryEvent<'a> {
pub status_code: &'a str,
pub retry: bool,
}

impl InternalEvent for CheckRetryEvent<'_> {
fn emit(self) {
debug!(
message = "Considering retry on error.",
status_code = self.status_code,
retry = self.retry,
);
counter!("sink_retries_total", 1,
"status_code" => self.status_code.to_string(),
"retry" => self.retry.to_string(),
);
}
}
10 changes: 9 additions & 1 deletion src/sinks/aws_s3/config.rs
@@ -18,7 +18,7 @@ use crate::{
self,
config::{S3Options, S3RetryLogic},
partitioner::S3KeyPartitioner,
service::S3Service,
service::{S3Service, S3Response},
sink::S3Sink,
},
util::{
@@ -204,6 +204,14 @@ impl S3SinkConfig {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, S3RetryLogic)
// Add another layer after retries for emitting our event log message
// Returns back the same result so it continues to work downstream
.map_result(|result: Result<S3Response, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
}
result
})
.service(service);

let offset = self
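The `map_result` layer added above (and mirrored in the Azure Blob sink config further down) wraps the inner upload service: when a call comes back `Ok` it emits the upload event, and it returns the same `Result` unchanged so downstream acknowledgement logic is unaffected. A minimal sketch of the pattern using `tower` directly, with stand-in request/response types instead of `S3Service`/`S3Response` (assumes the `tower` crate with its `util` feature and a tokio runtime):

```rust
use std::convert::Infallible;
use tower::{service_fn, Service, ServiceBuilder, ServiceExt};

#[derive(Debug)]
struct UploadResponse {
    blob: String,
}

impl UploadResponse {
    fn emit_upload_event(&self) {
        // Stand-in for emitting Vector's "uploaded" internal event.
        println!("uploaded blob {}", self.blob);
    }
}

#[tokio::main]
async fn main() {
    // Stand-in for the retrying S3/Azure upload service.
    let inner = service_fn(|blob: String| async move {
        Ok::<_, Infallible>(UploadResponse { blob })
    });

    let mut svc = ServiceBuilder::new()
        // Observe the result after the call completes, then return the same
        // Result so downstream acking sees an unchanged value.
        .map_result(|result: Result<UploadResponse, Infallible>| {
            if let Ok(ref response) = result {
                response.emit_upload_event();
            }
            result
        })
        .service(inner);

    let response = svc
        .ready()
        .await
        .unwrap()
        .call("date=2024-08-22/part-0000.log".to_string())
        .await
        .unwrap();
    assert_eq!(response.blob, "date=2024-08-22/part-0000.log");
}
```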
14 changes: 12 additions & 2 deletions src/sinks/aws_s3/sink.rs
@@ -18,7 +18,7 @@ use crate::{
},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, vector_event::VectorSendEventMetadata,
},
},
};
@@ -64,6 +64,7 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
let metadata = S3Metadata {
partition_key,
s3_key: s3_key_prefix,
count: events.len(),
finalizers,
};

Expand Down Expand Up @@ -103,8 +104,17 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {

s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);

let body = payload.into_payload();

VectorSendEventMetadata {
bytes: body.len(),
events_len: s3metadata.count,
blob: s3metadata.s3_key.clone(),
container: self.bucket.clone(),
}.emit_sending_event();

S3Request {
body: payload.into_payload(),
body: body,
bucket: self.bucket.clone(),
metadata: s3metadata,
request_metadata,
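The `VectorSendEventMetadata` helper used above (and in the Azure Blob request builder below) comes from the `sinks::util::vector_event` module, which is not part of this diff. The sketch below is a rough reconstruction from the call sites only: the field names match the usage above, but the method bodies and exact log output are assumptions.

```rust
// Rough reconstruction from the call sites; not the real sinks::util::vector_event code.
pub struct VectorSendEventMetadata {
    pub bytes: usize,      // payload size in bytes
    pub events_len: usize, // number of events in the batch
    pub blob: String,      // object key / blob name
    pub container: String, // bucket or container name
}

impl VectorSendEventMetadata {
    // Emitted just before the request is handed to the service layer.
    pub fn emit_sending_event(&self) {
        println!(
            "Sending events. bytes={} events_len={} blob={} container={}",
            self.bytes, self.events_len, self.blob, self.container
        );
    }

    // Emitted by the map_result layer once the upload has succeeded.
    pub fn emit_upload_event(&self) {
        println!(
            "Uploaded events. bytes={} events_len={} blob={} container={}",
            self.bytes, self.events_len, self.blob, self.container
        );
    }
}

fn main() {
    VectorSendEventMetadata {
        bytes: 2_048,
        events_len: 5,
        blob: "date=2024-08-22/part-0000.log.gz".to_string(),
        container: "example-bucket".to_string(),
    }
    .emit_sending_event();
}
```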
11 changes: 10 additions & 1 deletion src/sinks/azure_blob/config.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use azure_storage_blobs::prelude::*;
use std::result::Result as StdResult;
use tower::ServiceBuilder;
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
use vector_lib::configurable::configurable_component;
@@ -13,7 +14,7 @@ use crate::{
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
azure_common::{
self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
self, config::{AzureBlobRetryLogic, AzureBlobResponse}, service::AzureBlobService, sink::AzureBlobSink,
},
util::{
partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
@@ -213,6 +214,14 @@ impl AzureBlobSinkConfig {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
// Add another layer after retries for emitting our event log message
// Returns back the same result so it continues to work downstream
.map_result(|result: StdResult<AzureBlobResponse, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
}
result
})
.service(AzureBlobService::new(client));

// Configure our partitioning/batching.
16 changes: 8 additions & 8 deletions src/sinks/azure_blob/request_builder.rs
@@ -12,7 +12,7 @@ use crate::{
azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, vector_event::VectorSendEventMetadata,
},
},
};
@@ -50,6 +50,7 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
let finalizers = events.take_finalizers();
let azure_metadata = AzureBlobMetadata {
partition_key,
container_name: self.container_name.clone(),
count: events.len(),
byte_size: events.estimated_json_encoded_size_of(),
finalizers,
@@ -82,13 +83,12 @@

let blob_data = payload.into_payload();

debug!(
message = "Sending events.",
bytes = ?blob_data.len(),
events_len = ?azure_metadata.count,
blob = ?azure_metadata.partition_key,
container = ?self.container_name,
);
VectorSendEventMetadata {
bytes: blob_data.len(),
events_len: azure_metadata.count,
blob: azure_metadata.partition_key.clone(),
container: self.container_name.clone(),
}.emit_sending_event();

AzureBlobRequest {
blob_data,