Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Commit

Permalink
Additional vector granularity
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewma2 committed Sep 11, 2024
1 parent e35950a commit 898be31
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 38 deletions.
1 change: 1 addition & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ mod template;
mod throttle;
mod udp;
mod unix;
pub mod vector_event_logs;
#[cfg(feature = "sinks-websocket")]
mod websocket;

Expand Down
167 changes: 167 additions & 0 deletions src/internal_events/vector_event_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::collections::HashMap;
use std::env;
use once_cell::sync::OnceCell;
use serde_json;

use crate::event::Event;
use crate::event::LogEvent;

use crate::vector_lib::ByteSizeOf;

// Structs used for our vector event logs

pub static EVENT_LOG_METADATA_FIELD: OnceCell<String> = OnceCell::new();
pub static EVENT_LOG_METADATA_KEYS: OnceCell<Vec<String>> = OnceCell::new();

pub fn get_event_log_metadata_field() -> &'static String {
// Initialize the static variable once, or return the value if it's already initialized/computed
EVENT_LOG_METADATA_FIELD.get_or_init(|| {
env::var("EVENT_LOG_METADATA_FIELD").unwrap_or_else(|_| "".to_string())
}
)
}

pub fn get_event_log_metadata_keys() -> &'static Vec<String> {
// Initialize the static variable once, or return the value if it's already initialized/computed
EVENT_LOG_METADATA_KEYS.get_or_init(|| {
let vec_string: String = env::var("EVENT_LOG_METADATA_KEYS").unwrap_or_else(|_| "".to_string());
let keys: Vec<String> = serde_json::from_str(&vec_string).unwrap_or(Vec::new());
keys
}
)
}

#[derive(Clone, Debug)]
pub struct MetadataValuesCount {
pub value_map: HashMap<String, String>,
pub count: usize,
pub size: usize,
}

// impl PartialEq for MetadataValues {
// fn eq(&self, other: &Self) -> bool {
// self.key == other.key
// }
// }
//
// impl Eq for MetadataValues {}
//
// impl std::hash::Hash for MetadataValues {
// fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// self.key.hash(state);
// }
// }
//
// impl MetadataValues {
//
// }

// Struct for vector send events (sending, uploaded)
#[derive(Clone, Debug)]
pub struct VectorEventLogSendMetadata {
pub bytes: usize,
pub events_len: usize,
pub blob: String,
pub container: String,
pub count_map: HashMap<String, MetadataValuesCount>,
}

impl VectorEventLogSendMetadata {
pub fn emit_upload_event(&self) {
info!(
message = "Uploaded events.",
bytes = self.bytes,
events_len = self.events_len,
blob = self.blob,
container = self.container,
// VECTOR_UPLOADED_MESSAGES_EVENT
vector_event_type = 4
);
// Also sending the count break down by specified keys
self.emit_count_map("Uploaded granularity breakdown.", 4)
}

pub fn emit_sending_event(&self) {
info!(
message = "Sending events.",
bytes = self.bytes,
events_len = self.events_len,
blob = self.blob,
container = self.container,
// VECTOR_SENDING_MESSAGES_EVENT
vector_event_type = 3
);
// Also sending the count break down by specified keys
self.emit_count_map("Sending granularity breakdown.", 3)
}

fn emit_count_map(&self, message: &str, event_type: usize) {
for (_, value) in &self.count_map {
info!(
message = message,
keys = serde_json::to_string(&value.value_map).unwrap(),
bytes = value.size,
events_len = value.count,
blob = self.blob,
container = self.container,
vector_event_type = event_type,
);
}
}
}

pub fn build_key(event: &LogEvent) -> String {
let mut key_vals: Vec<String> = Vec::new();
// Get the field that holds the metadata struct itself
let field = get_event_log_metadata_field();
for key_part in get_event_log_metadata_keys() {
if let Ok(value) = event.parse_path_and_get_value(format!("{}.{}", field, key_part)) {
if let Some(val) = value {
// Remove extra quotes from string
key_vals.push(format!("{}={}", key_part, val.to_string().replace("\"", "")));
}
}
}
key_vals.join("/")
}

pub fn build_map(event: &LogEvent) -> HashMap<String, String> {
let mut val_map = HashMap::new();
let field = get_event_log_metadata_field();
for key_part in get_event_log_metadata_keys() {
if let Ok(value) = event.parse_path_and_get_value(format!("{}.{}", field, key_part)) {
if let Some(val) = value {
// Remove extra quotes from string
val_map.insert(key_part.to_string(), val.to_string().replace("\"", ""));
}
}
}
val_map
}

pub fn generate_count_map(events: &Vec<Event>) -> HashMap<String, MetadataValuesCount> {
let mut count_map = HashMap::new();
for event in events {
// Check if it's a log event (see enum defined in lib/vector-core/src/event/mod.rs)
if let Event::Log(log_event) = event {
count_map.entry(build_key(log_event))
.and_modify(
|x: &mut MetadataValuesCount| {
x.count += 1;
// For now, using pre-defined allocated bytes measure for size of event
// This may not be fully consistent with the real size of logs
// But having this a placeholder as consistent size measurement is tricky
x.size += log_event.size_of();
}
)
.or_insert(
MetadataValuesCount {
value_map: build_map(log_event),
count: 1,
size: 0,
}
);
}
}
count_map
}
2 changes: 1 addition & 1 deletion src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl S3SinkConfig {
// Returns back the same result so it continues to work downstream
.map_result(|result: Result<S3Response, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
response.event_log_metadata.emit_upload_event();
}
result
})
Expand Down
27 changes: 20 additions & 7 deletions src/sinks/aws_s3/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use crate::{
},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder, vector_event::VectorSendEventMetadata,
RequestBuilder,
},
},
internal_events::vector_event_logs::{VectorEventLogSendMetadata, generate_count_map},
};

#[derive(Clone)]
Expand Down Expand Up @@ -61,11 +62,25 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
let finalizers = events.take_finalizers();
let s3_key_prefix = partition_key.key_prefix.clone();

// Create event metadata here as this is where the list of events are available pre-encoding
// And we want to access this list to process the raw events to see specific field values
let event_log_metadata = VectorEventLogSendMetadata {
// Events are not encoded here yet, so byte size is not yet known
// Setting as 0 here and updating when it is set in build_request()
bytes: 0,
events_len: events.len(),
// Similarly the exact blob isn't determined here yet
blob: "".to_string(),
container: self.bucket.clone(),
count_map: generate_count_map(&events),
};

let metadata = S3Metadata {
partition_key,
s3_key: s3_key_prefix,
count: events.len(),
finalizers,
event_log_metadata: event_log_metadata,
};

(metadata, builder, events)
Expand Down Expand Up @@ -106,12 +121,10 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {

let body = payload.into_payload();

VectorSendEventMetadata {
bytes: body.len(),
events_len: s3metadata.count,
blob: s3metadata.s3_key.clone(),
container: self.bucket.clone(),
}.emit_sending_event();
// Update some components of the metadata since they've been computed now
s3metadata.event_log_metadata.bytes = body.len();
s3metadata.event_log_metadata.blob = s3metadata.s3_key.clone();
s3metadata.event_log_metadata.emit_sending_event();

S3Request {
body: body,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl AzureBlobSinkConfig {
// Returns back the same result so it continues to work downstream
.map_result(|result: StdResult<AzureBlobResponse, _>| {
if let Ok(ref response) = result {
response.send_event_metadata.emit_upload_event();
response.event_log_metadata.emit_upload_event();
}
result
})
Expand Down
27 changes: 20 additions & 7 deletions src/sinks/azure_blob/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use crate::{
azure_common::config::{AzureBlobMetadata, AzureBlobRequest},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder, vector_event::VectorSendEventMetadata,
RequestBuilder,
},
},
internal_events::vector_event_logs::{VectorEventLogSendMetadata, generate_count_map},
};

#[derive(Clone)]
Expand Down Expand Up @@ -48,12 +49,26 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let (partition_key, mut events) = input;
let finalizers = events.take_finalizers();

// Create event metadata here as this is where the list of events are available pre-encoding
// And we want to access this list to process the raw events to see specific field values
let event_log_metadata = VectorEventLogSendMetadata {
// Events are not encoded here yet, so byte size is not yet known
// Setting as 0 here and updating when it is set in build_request()
bytes: 0,
events_len: events.len(),
// Similarly the exact blob isn't determined here yet
blob: "".to_string(),
container: self.container_name.clone(),
count_map: generate_count_map(&events),
};
let azure_metadata = AzureBlobMetadata {
partition_key,
container_name: self.container_name.clone(),
count: events.len(),
byte_size: events.estimated_json_encoded_size_of(),
finalizers,
event_log_metadata: event_log_metadata,
};

let builder = RequestMetadataBuilder::from_events(&events);
Expand Down Expand Up @@ -83,12 +98,10 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {

let blob_data = payload.into_payload();

VectorSendEventMetadata {
bytes: blob_data.len(),
events_len: azure_metadata.count,
blob: azure_metadata.partition_key.clone(),
container: self.container_name.clone(),
}.emit_sending_event();
// Update some components of the metadata since they've been computed now
azure_metadata.event_log_metadata.bytes = blob_data.len();
azure_metadata.event_log_metadata.blob = azure_metadata.partition_key.clone();
azure_metadata.event_log_metadata.emit_sending_event();

AzureBlobRequest {
blob_data,
Expand Down
9 changes: 5 additions & 4 deletions src/sinks/azure_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use vector_lib::{

use crate::{
event::{EventFinalizers, EventStatus, Finalizable},
internal_events::{CheckRetryEvent},
sinks::{util::{retries::RetryLogic, vector_event::VectorSendEventMetadata}, Healthcheck},
internal_events::{CheckRetryEvent, vector_event_logs::VectorEventLogSendMetadata},
sinks::{util::retries::RetryLogic, Healthcheck},
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -52,6 +52,7 @@ pub struct AzureBlobMetadata {
pub count: usize,
pub byte_size: JsonSize,
pub finalizers: EventFinalizers,
pub event_log_metadata: VectorEventLogSendMetadata,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -86,8 +87,8 @@ pub struct AzureBlobResponse {
pub inner: PutBlockBlobResponse,
pub events_byte_size: GroupedCountByteSize,
pub byte_size: usize,
// Extending S3 response with additional information relevant for vector send event logs
pub send_event_metadata: VectorSendEventMetadata,
// Extending Azure response with additional information relevant for vector send event logs
pub event_log_metadata: VectorEventLogSendMetadata,
}

impl DriverResponse for AzureBlobResponse {
Expand Down
10 changes: 1 addition & 9 deletions src/sinks/azure_common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use tracing::Instrument;

use crate::sinks::{
azure_common::config::{AzureBlobRequest, AzureBlobResponse},
util::vector_event::VectorSendEventMetadata,
};

#[derive(Clone)]
Expand Down Expand Up @@ -52,13 +51,6 @@ impl Service<AzureBlobRequest> for AzureBlobService {
None => blob,
};

let send_event_metadata = VectorSendEventMetadata {
bytes: byte_size,
events_len: request.metadata.count,
blob: request.metadata.partition_key.clone(),
container: request.metadata.container_name.clone(),
};

let result = blob
.into_future()
.instrument(info_span!("request").or_current())
Expand All @@ -71,7 +63,7 @@ impl Service<AzureBlobRequest> for AzureBlobService {
.request_metadata
.into_events_estimated_json_encoded_byte_size(),
byte_size,
send_event_metadata,
event_log_metadata: request.metadata.event_log_metadata,
})
})
}
Expand Down
Loading

0 comments on commit 898be31

Please sign in to comment.