Skip to content

Commit

Permalink
Merge branch 'main' into makekvm-private
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Mar 11, 2025
2 parents ec463d2 + dac8bd5 commit 66f5cee
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 26 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- The `OTEL_EXPORTER_OTLP_TIMEOUT`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` and `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` are changed from seconds to miliseconds.
- Fixed `.with_headers()` in `HttpExporterBuilder` to correctly support multiple key/value pairs. [#2699](https://github.com/open-telemetry/opentelemetry-rust/pull/2699)
- Fixed
[#2770](https://github.com/open-telemetry/opentelemetry-rust/issues/2770)
partially to properly handle `shutdown()` when using `http`. (`tonic` still
does not do proper shutdown)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl LogExporter for OtlpHttpClient {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand Down
30 changes: 16 additions & 14 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -46,23 +46,21 @@ impl TonicLogsClient {
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
Expand All @@ -86,11 +84,15 @@ impl LogExporter for TonicLogsClient {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
fn shutdown(&self) -> OTelSdkResult {
// TODO: Implement actual shutdown
// Due to the use of tokio::sync::Mutex to guard
// the inner client, we need to await the call to lock the mutex
// and that requires async runtime.
// It is possible to fix this by using
// a dedicated thread just to handle shutdown.
// But for now, we just return Ok.
Ok(())
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,13 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}

fn shutdown(&self) -> OTelSdkResult {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.shutdown(),
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.shutdown(),
}
}
}
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
`LogProcessor` and `LogExporter` traits. `SdkLogger` no longer passes its
`scope` name but instead passes the incoming `name` when invoking
`event_enabled` on processors.
- **Breaking** for custom LogExporter authors: `shutdown()` method in
`LogExporter` trait no longer requires a mutable ref to `self`. If the exporter
needs to mutate state, it should rely on interior mutability.
[2764](https://github.com/open-telemetry/opentelemetry-rust/pull/2764)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub trait LogExporter: Send + Sync + Debug {
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
Expand Down
15 changes: 10 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,15 +971,20 @@ mod tests {
.with_scheduled_delay(Duration::from_millis(10))
.with_max_queue_size(10);
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
let batch = batch.with_max_concurrent_exports(10);
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
let batch = batch.with_max_export_timeout(Duration::from_millis(10));
let batch = {
batch
.with_max_concurrent_exports(10)
.with_max_export_timeout(Duration::from_millis(10))
};
let batch = batch.build();
assert_eq!(batch.max_export_batch_size, 10);
assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
assert_eq!(batch.max_concurrent_exports, 10);
assert_eq!(batch.max_queue_size, 10);
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
{
assert_eq!(batch.max_concurrent_exports, 10);
assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
}
}

// Helper function to create a default test span
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
Expand Down

0 comments on commit 66f5cee

Please sign in to comment.