Skip to content

Commit

Permalink
Merge branch 'open-telemetry:main' into feat/add-resource-builder
Browse files Browse the repository at this point in the history
  • Loading branch information
pitoniak32 authored Nov 23, 2024
2 parents 40249d7 + 8c9babb commit f08a237
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 52 deletions.
4 changes: 2 additions & 2 deletions examples/self-diagnostics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// OpenTelemetry uses `tracing` crate for its internal logging. Unless a
// tracing subscriber is set, the logs will be discarded. In this example,
// we configure a `tracing` subscriber to:
// 1. Print logs of level INFO or higher to stdout.
// 1. Print logs of level INFO or higher to stdout using tracing's fmt layer.
// 2. Filter logs from OpenTelemetry's dependencies (like tonic, hyper,
// reqwest etc. which are commonly used by the OTLP exporter) to only print
// ERROR-level logs. This filtering helps reduce repetitive log messages
Expand All @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// https://github.com/open-telemetry/opentelemetry-rust/issues/761 is
// resolved.

// Target name used by OpenTelemetry always start with "opentelemetry".
// Target names used by all OpenTelemetry official crates always start with "opentelemetry".
// Hence, one may use "add_directive("opentelemetry=off".parse().unwrap())"
// to turn off all logs from OpenTelemetry.

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
.with_resource(Resource::empty())
.build();
```
- `logs::LogData` struct is deprecated, and scheduled to be removed from public API in `v0.28.0`.

## 0.27.0

Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use crate::{logs::LogError, logs::LogResult};
use opentelemetry::otel_info;
use opentelemetry::{otel_debug, trace::TraceContextExt, Context, InstrumentationScope};

#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down Expand Up @@ -154,10 +155,15 @@ impl LoggerProviderInner {
impl Drop for LoggerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
otel_info!(
name: "LoggerProvider.Drop",
message = "Last reference of LoggerProvider dropped, initiating shutdown."
);
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "LoggerProvider.Drop.AlreadyShutdown"
name: "LoggerProvider.Drop.AlreadyShutdown",
message = "LoggerProvider was already shut down; drop will not attempt shutdown again."
);
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
pub trait LogProcessor: Send + Sync + Debug {
/// Called when a log record is ready to processed and exported.
///
/// This method receives a mutable reference to `LogData`. If the processor
/// This method receives a mutable reference to `LogRecord`. If the processor
/// needs to handle the export asynchronously, it should clone the data to
/// ensure it can be safely processed without lifetime issues. Any changes
/// made to the log data in this method will be reflected in the next log
/// processor in the chain.
///
/// # Parameters
/// - `record`: A mutable reference to `LogData` representing the log record.
/// - `record`: A mutable reference to `LogRecord` representing the log record.
/// - `instrumentation`: The instrumentation scope associated with the log record.
fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationScope);
/// Force the logs lying in the cache to be exported.
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub use log_processor::{
use opentelemetry::InstrumentationScope;
pub use record::{LogRecord, TraceContext};

#[deprecated(
since = "0.27.1",
note = "The struct is not used anywhere in the SDK and will be removed in the next major release."
)]
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
Expand Down
39 changes: 29 additions & 10 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use opentelemetry::{
metrics::{Meter, MeterProvider},
otel_debug, otel_error, InstrumentationScope,
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::metrics::{MetricError, MetricResult};
Expand Down Expand Up @@ -109,6 +109,10 @@ impl SdkMeterProvider {
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> MetricResult<()> {
otel_info!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
);
self.inner.shutdown()
}
}
Expand Down Expand Up @@ -139,15 +143,25 @@ impl Drop for SdkMeterProviderInner {
// shutdown(), then we don't need to call shutdown again.
if self.is_shutdown.load(Ordering::Relaxed) {
otel_debug!(
name: "MeterProvider.AlreadyShutdown",
message = "Meter provider was already shut down; drop will not attempt shutdown again."
name: "MeterProvider.Drop.AlreadyShutdown",
message = "MeterProvider was already shut down; drop will not attempt shutdown again."
);
} else if let Err(err) = self.shutdown() {
otel_error!(
name: "MeterProvider.ShutdownFailed",
message = "Shutdown attempt failed during drop of MeterProvider.",
reason = format!("{}", err)
} else {
otel_info!(
name: "MeterProvider.Drop",
message = "Last reference of MeterProvider dropped, initiating shutdown."
);
if let Err(err) = self.shutdown() {
otel_error!(
name: "MeterProvider.Drop.ShutdownFailed",
message = "Shutdown attempt failed during drop of MeterProvider.",
reason = format!("{}", err)
);
} else {
otel_info!(
name: "MeterProvider.Drop.ShutdownCompleted",
);
}
}
}
}
Expand Down Expand Up @@ -231,7 +245,7 @@ impl MeterProviderBuilder {
/// Construct a new [MeterProvider] with this configuration.
pub fn build(self) -> SdkMeterProvider {
SdkMeterProvider {
let meter_provider = SdkMeterProvider {
inner: Arc::new(SdkMeterProviderInner {
pipes: Arc::new(Pipelines::new(
self.resource.unwrap_or_default(),
Expand All @@ -241,7 +255,12 @@ impl MeterProviderBuilder {
meters: Default::default(),
is_shutdown: AtomicBool::new(false),
}),
}
};

otel_info!(
name: "MeterProvider.Built",
);
meter_provider
}
}

Expand Down
8 changes: 8 additions & 0 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,18 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
async fn collect_and_export(&mut self) -> MetricResult<()> {
self.reader.collect(&mut self.rm)?;
if self.rm.scope_metrics.is_empty() {
otel_debug!(
name: "PeriodicReaderWorker.NoMetricsToExport",
);
// No metrics to export.
return Ok(());
}

otel_debug!(
name: "PeriodicReaderWorker.InvokeExporter",
message = "Calling exporter's export method with collected metrics.",
count = self.rm.scope_metrics.len(),
);
let export = self.reader.exporter.export(&mut self.rm);
let timeout = self.runtime.delay(self.timeout);
pin_mut!(export);
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex};
/// An in-memory logs exporter that stores logs data in memory..
///
/// This exporter is useful for testing and debugging purposes.
/// It stores logs in a `Vec<LogData>`. Logs can be retrieved using
/// It stores logs in a `Vec<OwnedLogData>`. Logs can be retrieved using
/// `get_emitted_logs` method.
///
/// # Example
Expand Down Expand Up @@ -65,9 +65,9 @@ pub struct OwnedLogData {
pub struct LogDataWithResource {
/// Log record
pub record: LogRecord,
/// Instrumentation details for the emitter who produced this `LogData`.
/// Instrumentation details for the emitter who produced this `LogRecord`.
pub instrumentation: InstrumentationScope,
/// Resource for the emitter who produced this `LogData`.
/// Resource for the emitter who produced this `LogRecord`.
pub resource: Cow<'static, Resource>,
}

Expand Down Expand Up @@ -137,7 +137,7 @@ impl InMemoryLogExporterBuilder {
}

impl InMemoryLogExporter {
/// Returns the logs emitted via Logger as a vector of `LogData`.
/// Returns the logs emitted via Logger as a vector of `LogDataWithResource`.
///
/// # Example
///
Expand Down
9 changes: 4 additions & 5 deletions opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
futures-core = { workspace = true }
futures-sink = "0.3"
once_cell = { workspace = true }
futures-core = { workspace = true, optional = true }
futures-sink = { version = "0.3", optional = true }
pin-project-lite = { workspace = true, optional = true }
thiserror = { workspace = true }
thiserror = { workspace = true, optional = true}
tracing = {workspace = true, optional = true} # optional for opentelemetry internal logging

[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies]
js-sys = "0.3.63"

[features]
default = ["trace", "metrics", "logs"]
trace = ["pin-project-lite"]
trace = ["pin-project-lite", "futures-sink", "futures-core", "thiserror"]
metrics = []
testing = ["trace", "metrics"]
logs = []
Expand Down
12 changes: 9 additions & 3 deletions opentelemetry/src/baggage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@
//!
//! [W3C Baggage]: https://w3c.github.io/baggage
use crate::{Context, Key, KeyValue, Value};
use once_cell::sync::Lazy;
use std::collections::{hash_map, HashMap};
use std::fmt;
use std::sync::OnceLock;

static DEFAULT_BAGGAGE: Lazy<Baggage> = Lazy::new(Baggage::default);
static DEFAULT_BAGGAGE: OnceLock<Baggage> = OnceLock::new();

const MAX_KEY_VALUE_PAIRS: usize = 180;
const MAX_BYTES_FOR_ONE_PAIR: usize = 4096;
const MAX_LEN_OF_ALL_PAIRS: usize = 8192;

/// Returns the default baggage, ensuring it is initialized only once.
#[inline]
fn get_default_baggage() -> &'static Baggage {
DEFAULT_BAGGAGE.get_or_init(Baggage::default)
}

/// A set of name/value pairs describing user-defined properties.
///
/// ### Baggage Names
Expand Down Expand Up @@ -399,7 +405,7 @@ impl BaggageExt for Context {
}

fn baggage(&self) -> &Baggage {
self.get::<Baggage>().unwrap_or(&DEFAULT_BAGGAGE)
self.get::<Baggage>().unwrap_or(get_default_baggage())
}
}

Expand Down
16 changes: 10 additions & 6 deletions opentelemetry/src/global/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use crate::metrics::{self, Meter, MeterProvider};
use crate::InstrumentationScope;
use once_cell::sync::Lazy;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, OnceLock, RwLock};

type GlobalMeterProvider = Arc<dyn MeterProvider + Send + Sync>;

/// The global `MeterProvider` singleton.
static GLOBAL_METER_PROVIDER: Lazy<RwLock<GlobalMeterProvider>> =
Lazy::new(|| RwLock::new(Arc::new(crate::metrics::noop::NoopMeterProvider::new())));
static GLOBAL_METER_PROVIDER: OnceLock<RwLock<GlobalMeterProvider>> = OnceLock::new();

#[inline]
fn global_meter_provider() -> &'static RwLock<GlobalMeterProvider> {
GLOBAL_METER_PROVIDER
.get_or_init(|| RwLock::new(Arc::new(crate::metrics::noop::NoopMeterProvider::new())))
}

/// Sets the given [`MeterProvider`] instance as the current global meter
/// provider.
pub fn set_meter_provider<P>(new_provider: P)
where
P: metrics::MeterProvider + Send + Sync + 'static,
{
let mut global_provider = GLOBAL_METER_PROVIDER
let mut global_provider = global_meter_provider()
.write()
.expect("GLOBAL_METER_PROVIDER RwLock poisoned");
*global_provider = Arc::new(new_provider);
}

/// Returns an instance of the currently configured global [`MeterProvider`].
pub fn meter_provider() -> GlobalMeterProvider {
GLOBAL_METER_PROVIDER
global_meter_provider()
.read()
.expect("GLOBAL_METER_PROVIDER RwLock poisoned")
.clone()
Expand Down
31 changes: 22 additions & 9 deletions opentelemetry/src/global/propagation.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use crate::propagation::TextMapPropagator;
use crate::trace::noop::NoopTextMapPropagator;
use once_cell::sync::Lazy;
use std::sync::RwLock;
use std::sync::{OnceLock, RwLock};

/// The current global `TextMapPropagator` propagator.
static GLOBAL_TEXT_MAP_PROPAGATOR: Lazy<RwLock<Box<dyn TextMapPropagator + Send + Sync>>> =
Lazy::new(|| RwLock::new(Box::new(NoopTextMapPropagator::new())));
static GLOBAL_TEXT_MAP_PROPAGATOR: OnceLock<RwLock<Box<dyn TextMapPropagator + Send + Sync>>> =
OnceLock::new();

/// The global default `TextMapPropagator` propagator.
static DEFAULT_TEXT_MAP_PROPAGATOR: Lazy<NoopTextMapPropagator> =
Lazy::new(NoopTextMapPropagator::new);
static DEFAULT_TEXT_MAP_PROPAGATOR: OnceLock<NoopTextMapPropagator> = OnceLock::new();

/// Ensures the `GLOBAL_TEXT_MAP_PROPAGATOR` is initialized with a `NoopTextMapPropagator`.
#[inline]
fn global_text_map_propagator() -> &'static RwLock<Box<dyn TextMapPropagator + Send + Sync>> {
GLOBAL_TEXT_MAP_PROPAGATOR.get_or_init(|| RwLock::new(Box::new(NoopTextMapPropagator::new())))
}

/// Ensures the `DEFAULT_TEXT_MAP_PROPAGATOR` is initialized.
#[inline]
fn default_text_map_propagator() -> &'static NoopTextMapPropagator {
DEFAULT_TEXT_MAP_PROPAGATOR.get_or_init(NoopTextMapPropagator::new)
}

/// Sets the given [`TextMapPropagator`] propagator as the current global propagator.
pub fn set_text_map_propagator<P: TextMapPropagator + Send + Sync + 'static>(propagator: P) {
let _lock = GLOBAL_TEXT_MAP_PROPAGATOR
let _lock = global_text_map_propagator()
.write()
.map(|mut global_propagator| *global_propagator = Box::new(propagator));
}
Expand All @@ -23,8 +33,11 @@ pub fn get_text_map_propagator<T, F>(mut f: F) -> T
where
F: FnMut(&dyn TextMapPropagator) -> T,
{
GLOBAL_TEXT_MAP_PROPAGATOR
global_text_map_propagator()
.read()
.map(|propagator| f(&**propagator))
.unwrap_or_else(|_| f(&*DEFAULT_TEXT_MAP_PROPAGATOR as &dyn TextMapPropagator))
.unwrap_or_else(|_| {
let default_propagator = default_text_map_propagator();
f(default_propagator as &dyn TextMapPropagator)
})
}
Loading

0 comments on commit f08a237

Please sign in to comment.