Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}

/// A custom span processor that enriches spans with baggage attributes. Baggage
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/benches/log-attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl LogProcessor for NoopProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

/// Creates a single benchmark for a specific number of attributes
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl LogProcessor for NoopProcessor {
) -> bool {
self.enabled
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

struct NoOpLogLayer {
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

fn create_test_log_data(
Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
between logs and traces continues to work when the "trace" feature is
explicitly enabled.

- *Breaking* default implementation of `LogProcessor::shutdown_with_timeout()` method is removed. Implementors must now provide this to guarantee shutdown is performed properly.

## 0.30.0

Released 2025-May-23
Expand All @@ -35,7 +37,7 @@ also modified to suppress telemetry before invoking exporters.

- **Feature**: Implemented and enabled cardinality capping for Metrics by
default. [#2901](https://github.com/open-telemetry/opentelemetry-rust/pull/2901)
- The default cardinality limit is 2000 and can be customized using Views.
- The default cardinality limit is 2000 and can be customized using Views.
- This feature was previously removed in version 0.28 due to the lack of
configurability but has now been reintroduced with the ability to configure
the limit.
Expand Down Expand Up @@ -176,7 +178,7 @@ Released 2025-Mar-21
```

After:

```rust
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult
```
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl LogProcessor for ExportingProcessorWithFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -104,7 +104,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand All @@ -71,7 +71,7 @@ impl LogProcessor for CloningProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ impl LogProcessor for SendToChannelProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
12 changes: 9 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug {
/// Shuts down the processor.
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot make this change, as this is breaking.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I documented it in the changelog as breaking, am fully aware.

I defend it as a good change because as evidenced here there are existing bugs in production where implementations got that wrong.

The span processor trait does not have this implementation, so removing it improves consistency too.

But if you prefer I can close the PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logs (and Metrics) SDK were declared stable, so we should not make any breaking change.
(trace/span sdk is beta, so we can fix anything there.)

there are existing bugs in production where implementations got that wrong.

Given we can't make breaking change, only option is to better document this so custom processor authors' won't make the mistake!
The existing bug (in this repo) is in an experimental component only right?

But if you prefer I can close the PR.

Let's fix the bug by implementing the right shutdown method in log_processor - that should be a good fix and no breaking change issues.

Agree it's not great. IIRC, we did have some discussions about providing default implementations before declaring stable, and ended up with a less-than-ideal choice :(

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. I'll aim to come back to this PR probably on Monday.

The existing bug (in this repo) is in an experimental component only right?

Unfortunately when running this change I found it also affects SimpleLogProcessor.

https://github.com/open-telemetry/opentelemetry-rust/pull/3133/files#diff-834c43f0e4972291eea6782a3ebf19e9b93f150d0b699dd5991f2ff52dc9c1fcR120

Agree it's not great. IIRC, we did have some discussions about providing default implementations before declaring stable, and ended up with a less-than-ideal choice :(

Ouch.

I guess we can document that shutdown usually should not be implemented and shutdown_with_timeout should be? Can we change the default implementation of shutdown_with_timeout to log a warning? Or is that still too breaking?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding a log is fine. It is not common for users to write processors that require implementing any of these.. Most use cases are around enrichment (adding more attributes), and they don't need to worry about shutdown method at all.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a commit which re-adds this default implementation with an internal warning log.

/// Shuts down the processor with default timeout.
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
Expand Down Expand Up @@ -140,6 +138,10 @@ pub(crate) mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[derive(Debug)]
Expand All @@ -166,6 +168,10 @@ pub(crate) mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.and_then(std::convert::identity)
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
Expand Down Expand Up @@ -628,6 +628,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}

#[derive(Debug)]
Expand All @@ -654,6 +658,10 @@ mod tests {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
#[test]
fn test_log_data_modification_by_multiple_processors() {
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/logs/logger_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ mod tests {
*res = resource.clone();
self.exporter.set_resource(resource);
}

fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
impl TestProcessorForResource {
fn new(exporter: TestExporterForResource) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> crate::error::OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationS
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Mutex;
use std::time::Duration;

/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
/// exports log records as they are emitted. Log records are exported synchronously
Expand Down Expand Up @@ -116,11 +117,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
"SimpleLogProcessor mutex poison at shutdown".into(),
Expand Down
Loading