Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}

/// A custom span processor that enriches spans with baggage attributes. Baggage
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/benches/log-attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl LogProcessor for NoopProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

/// Creates a single benchmark for a specific number of attributes
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl LogProcessor for NoopProcessor {
) -> bool {
self.enabled
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

struct NoOpLogLayer {
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

fn create_test_log_data(
Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
The logs functionality now operates independently, while automatic correlation
between logs and traces continues to work when the "trace" feature is
explicitly enabled.
- **Fix**: Fix shutdown of `SimpleLogProcessor` and async `BatchLogProcessor`.
- Default implementation of `LogProcessor::shutdown_with_timeout()` will now warn to encourage users to implement proper shutdown.

## 0.30.0

Expand All @@ -35,7 +37,7 @@ also modified to suppress telemetry before invoking exporters.

- **Feature**: Implemented and enabled cardinality capping for Metrics by
default. [#2901](https://github.com/open-telemetry/opentelemetry-rust/pull/2901)
- The default cardinality limit is 2000 and can be customized using Views.
- The default cardinality limit is 2000 and can be customized using Views.
- This feature was previously removed in version 0.28 due to the lack of
configurability but has now been reintroduced with the ability to configure
the limit.
Expand Down Expand Up @@ -176,7 +178,7 @@ Released 2025-Mar-21
```

After:

```rust
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult
```
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl LogProcessor for ExportingProcessorWithFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -104,7 +104,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand All @@ -71,7 +71,7 @@ impl LogProcessor for CloningProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ impl LogProcessor for SendToChannelProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
21 changes: 20 additions & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{logs::SdkLogRecord, Resource};

#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::InstrumentationScope;
use opentelemetry::{otel_warn, InstrumentationScope};

use std::fmt::Debug;
use std::time::Duration;
Expand All @@ -57,10 +57,21 @@ pub trait LogProcessor: Send + Sync + Debug {
/// Shuts down the processor.
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
///
/// All implementors should implement this method.
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
// It would have been better to make this method required, but that ship
// sailed when the logs API was declared stable.
otel_warn!(
name: "LogProcessor.DefaultShutdownWithTimeout",
message = format!("LogProcessor::shutdown_with_timeout should be implemented by all LogProcessor types")
);
Ok(())
}
/// Shuts down the processor with default timeout.
///
/// Implementors typically do not need to change this method, and can just
/// implement `shutdown_with_timeout`.
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
Expand Down Expand Up @@ -140,6 +151,10 @@ pub(crate) mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[derive(Debug)]
Expand All @@ -166,6 +181,10 @@ pub(crate) mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.and_then(std::convert::identity)
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
Expand Down Expand Up @@ -628,6 +628,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[derive(Debug)]
Expand All @@ -654,6 +658,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
#[test]
fn test_log_data_modification_by_multiple_processors() {
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/logs/logger_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ mod tests {
*res = resource.clone();
self.exporter.set_resource(resource);
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
impl TestProcessorForResource {
fn new(exporter: TestExporterForResource) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> crate::error::OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationS
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Mutex;
use std::time::Duration;

/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
/// exports log records as they are emitted. Log records are exported synchronously
Expand Down Expand Up @@ -116,11 +117,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
"SimpleLogProcessor mutex poison at shutdown".into(),
Expand Down
Loading