Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
- Add support for `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable
to configure metrics temporality. Accepted values: `cumulative` (default), `delta`,
`lowmemory` (case-insensitive). Programmatic `.with_temporality()` overrides the env var.
- Fix panic in gRPC retry path when `experimental-grpc-retry` is enabled and exports run from
a thread without an active Tokio runtime (e.g. thread-based `BatchSpanProcessor`).
The tonic retry wrapper now falls back to `opentelemetry_sdk::runtime::NoAsync` when no
Tokio runtime is present.
- Fix `NoHttpClient` error when multiple HTTP client features are enabled by using priority-based selection (`reqwest-client` > `hyper-client` > `reqwest-blocking-client`). [#2994](https://github.com/open-telemetry/opentelemetry-rust/issues/2994)
- Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865)
- Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies [#3192](https://github.com/open-telemetry/opentelemetry-rust/pull/3192)
Expand Down
6 changes: 0 additions & 6 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
use super::BoxInterceptor;

use crate::retry::RetryPolicy;
#[cfg(feature = "experimental-grpc-retry")]
use opentelemetry_sdk::runtime::Tokio;

pub(crate) struct TonicLogsClient {
inner: Mutex<Option<ClientInner>>,
Expand Down Expand Up @@ -73,10 +71,6 @@ impl LogExporter for TonicLogsClient {
let batch = Arc::new(batch);

match super::tonic_retry_with_backoff(
#[cfg(feature = "experimental-grpc-retry")]
Tokio,
#[cfg(not(feature = "experimental-grpc-retry"))]
(),
self.retry_policy.clone(),
crate::retry_classification::grpc::classify_tonic_status,
"TonicLogsClient.Export",
Expand Down
6 changes: 0 additions & 6 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use super::BoxInterceptor;
use crate::metric::MetricsClient;

use crate::retry::RetryPolicy;
#[cfg(feature = "experimental-grpc-retry")]
use opentelemetry_sdk::runtime::Tokio;

pub(crate) struct TonicMetricsClient {
inner: Mutex<Option<ClientInner>>,
Expand Down Expand Up @@ -66,10 +64,6 @@ impl TonicMetricsClient {
impl MetricsClient for TonicMetricsClient {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
match super::tonic_retry_with_backoff(
#[cfg(feature = "experimental-grpc-retry")]
Tokio,
#[cfg(not(feature = "experimental-grpc-retry"))]
(),
self.retry_policy.clone(),
crate::retry_classification::grpc::classify_tonic_status,
"TonicMetricsClient.Export",
Expand Down
145 changes: 139 additions & 6 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::retry::RetryPolicy;
feature = "experimental-grpc-retry",
any(feature = "trace", feature = "metrics", feature = "logs")
))]
use opentelemetry_sdk::runtime::Runtime;
use opentelemetry_sdk::runtime::{NoAsync, Tokio};
#[cfg(all(
feature = "grpc-tonic",
any(feature = "trace", feature = "metrics", feature = "logs")
Expand Down Expand Up @@ -413,19 +413,23 @@ impl TonicExporterBuilder {
feature = "experimental-grpc-retry",
any(feature = "trace", feature = "metrics", feature = "logs")
))]
async fn tonic_retry_with_backoff<R, F, Fut, T>(
runtime: R,
async fn tonic_retry_with_backoff<F, Fut, T>(
policy: RetryPolicy,
classify_fn: fn(&tonic::Status) -> crate::retry::RetryErrorType,
operation_name: &'static str,
operation: F,
) -> Result<T, tonic::Status>
where
R: Runtime,
F: Fn() -> Fut,
Fut: Future<Output = Result<T, tonic::Status>>,
{
retry_with_backoff(runtime, policy, classify_fn, operation_name, operation).await
if tokio::runtime::Handle::try_current().is_ok() {
retry_with_backoff(Tokio, policy, classify_fn, operation_name, operation).await
} else {
// The thread-based BatchSpanProcessor drives exports from a plain thread without
// an active Tokio runtime. In that case, use a sync-compatible delay strategy.
retry_with_backoff(NoAsync, policy, classify_fn, operation_name, operation).await
}
}

/// Provides a unified call path when experimental-grpc-retry is not enabled - just executes the operation once.
Expand All @@ -435,7 +439,6 @@ where
any(feature = "trace", feature = "metrics", feature = "logs")
))]
async fn tonic_retry_with_backoff<F, Fut, T>(
_runtime: (),
_policy: RetryPolicy,
_classify_fn: fn(&tonic::Status) -> crate::retry::RetryErrorType,
_operation_name: &'static str,
Expand Down Expand Up @@ -1032,6 +1035,136 @@ mod tests {
assert!(builder.tonic_config.retry_policy.is_none());
}

#[cfg(feature = "experimental-grpc-retry")]
#[test]
fn test_retry_wrapper_falls_back_without_tokio_runtime() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::{Context, Poll};

    // Run a future to completion without installing any async runtime.
    // This is important because the code path under test should detect the
    // absence of Tokio and fall back to NoAsync.
    //
    // A noop waker is sufficient here: the loop re-polls unconditionally on
    // Pending, so no real wakeup mechanism is required.
    fn block_on_without_runtime<F: std::future::Future>(future: F) -> F::Output {
        let waker = futures_util::task::noop_waker_ref();
        let mut context = Context::from_waker(waker);
        let mut future = std::pin::pin!(future);

        loop {
            match future.as_mut().poll(&mut context) {
                Poll::Ready(value) => return value,
                // Busy-poll with a scheduler hint; acceptable for a short test.
                Poll::Pending => std::thread::yield_now(),
            }
        }
    }

    // Counts how many times the retried operation actually runs.
    let attempts = AtomicUsize::new(0);
    // Zero delays keep the test fast while still driving the retry loop.
    let policy = crate::retry::RetryPolicy {
        max_retries: 2,
        initial_delay_ms: 0,
        max_delay_ms: 0,
        jitter_ms: 0,
    };

    // This is a plain #[test] (not #[tokio::test]), so Handle::try_current()
    // inside tonic_retry_with_backoff must fail and select the NoAsync path.
    let result = block_on_without_runtime(super::tonic_retry_with_backoff(
        policy,
        crate::retry_classification::grpc::classify_tonic_status,
        "test.retry.no_runtime",
        || async {
            // Force at least one Pending poll so the local no-runtime poll loop
            // exercises both branches.
            let mut first_poll = true;
            futures_util::future::poll_fn(move |cx| {
                if first_poll {
                    first_poll = false;
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            })
            .await;

            // First attempt fails with a retryable status; second succeeds.
            let current = attempts.fetch_add(1, Ordering::SeqCst);
            if current == 0 {
                Err(tonic::Status::unavailable("transient"))
            } else {
                Ok(())
            }
        },
    ));

    // One retry after the initial failure: exactly two attempts total.
    assert!(result.is_ok());
    assert_eq!(attempts.load(Ordering::SeqCst), 2);
}

#[cfg(feature = "experimental-grpc-retry")]
#[tokio::test]
async fn test_retry_wrapper_uses_tokio_runtime_when_available() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Running under #[tokio::test] means Handle::try_current() succeeds, so
    // the wrapper should pick the Tokio-backed delay strategy.
    let call_count = AtomicUsize::new(0);

    // Zero delays keep the retry loop fast; two retries are allowed.
    let retry_policy = crate::retry::RetryPolicy {
        max_retries: 2,
        initial_delay_ms: 0,
        max_delay_ms: 0,
        jitter_ms: 0,
    };

    // First invocation returns a retryable UNAVAILABLE status; every later
    // invocation succeeds.
    let operation = || async {
        match call_count.fetch_add(1, Ordering::SeqCst) {
            0 => Err(tonic::Status::unavailable("transient")),
            _ => Ok(()),
        }
    };

    let outcome = super::tonic_retry_with_backoff(
        retry_policy,
        crate::retry_classification::grpc::classify_tonic_status,
        "test.retry.with_tokio_runtime",
        operation,
    )
    .await;

    // The single transient failure should be retried exactly once.
    assert!(outcome.is_ok());
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}

#[cfg(all(
    feature = "grpc-tonic",
    not(feature = "experimental-grpc-retry"),
    any(feature = "trace", feature = "metrics", feature = "logs")
))]
#[tokio::test]
async fn test_retry_wrapper_executes_operation_once_without_retry_feature() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Tracks how often the operation body runs; the non-retry wrapper must
    // invoke it exactly once regardless of the policy's max_retries.
    let invocation_count = AtomicUsize::new(0);

    // A generous policy — it should be ignored entirely when the retry
    // feature is compiled out.
    let ignored_policy = crate::retry::RetryPolicy {
        max_retries: 5,
        initial_delay_ms: 0,
        max_delay_ms: 0,
        jitter_ms: 0,
    };

    let outcome = super::tonic_retry_with_backoff(
        ignored_policy,
        crate::retry_classification::grpc::classify_tonic_status,
        "test.retry.no_feature",
        || async {
            invocation_count.fetch_add(1, Ordering::SeqCst);
            Ok::<(), tonic::Status>(())
        },
    )
    .await;

    assert!(outcome.is_ok());
    assert_eq!(
        invocation_count.load(Ordering::SeqCst),
        1,
        "operation should run exactly once when retry feature is disabled"
    );
}

#[test]
#[cfg(not(any(
feature = "tls",
Expand Down
6 changes: 0 additions & 6 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
use super::BoxInterceptor;

use crate::retry::RetryPolicy;
#[cfg(feature = "experimental-grpc-retry")]
use opentelemetry_sdk::runtime::Tokio;

pub(crate) struct TonicTracesClient {
inner: Mutex<Option<ClientInner>>,
Expand Down Expand Up @@ -75,10 +73,6 @@ impl SpanExporter for TonicTracesClient {
let batch = Arc::new(batch);

match super::tonic_retry_with_backoff(
#[cfg(feature = "experimental-grpc-retry")]
Tokio,
#[cfg(not(feature = "experimental-grpc-retry"))]
(),
self.retry_policy.clone(),
crate::retry_classification::grpc::classify_tonic_status,
"TonicTracesClient.Export",
Expand Down
95 changes: 53 additions & 42 deletions opentelemetry-otlp/tests/integration_test/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@
use anyhow::{Ok, Result};
use ctor::dtor;
use integration_test_runner::test_utils;
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use std::net::TcpStream;
use std::time::Duration;

const SLEEP_DURATION: Duration = Duration::from_secs(5);
const FLUSH_RETRY_SLEEP: Duration = Duration::from_millis(250);
const FLUSH_MAX_RETRIES: usize = 60;

#[cfg(test)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
Expand Down Expand Up @@ -48,25 +52,15 @@ mod metrictests {

async fn metric_helper_tokio() -> Result<()> {
let meter_provider = setup_metrics_tokio().await;
wait_for_collector_grpc();
emit_and_validate_metrics(meter_provider)
}

async fn metric_helper_tokio_current() -> Result<()> {
let meter_provider = setup_metrics_tokio().await;

const METER_NAME: &str = "test_meter";
const INSTRUMENT_NAME: &str = "test_counter";

let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
let expected_uuid = Uuid::new_v4().to_string();
let counter = meter.u64_counter(INSTRUMENT_NAME).build();
counter.add(
10,
&[
KeyValue::new("mykey1", expected_uuid.clone()),
KeyValue::new("mykey2", "myvalue2"),
],
);
wait_for_collector_grpc();
let expected_uuid = emit_metrics_with_provider(&meter_provider);
force_flush_with_retry(&meter_provider);

// In tokio::current_thread flavor, shutdown must be done in a separate thread
let shutdown_result = Handle::current()
Expand All @@ -87,20 +81,9 @@ mod metrictests {

fn metric_helper_non_tokio() -> Result<()> {
let (meter_provider, _rt) = setup_metrics_non_tokio(true);
const METER_NAME: &str = "test_meter";
const INSTRUMENT_NAME: &str = "test_counter";

// Add data to u64_counter
let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
let expected_uuid = Uuid::new_v4().to_string();
let counter = meter.u64_counter(INSTRUMENT_NAME).build();
counter.add(
10,
&[
KeyValue::new("mykey1", expected_uuid.clone()),
KeyValue::new("mykey2", "myvalue2"),
],
);
wait_for_collector_grpc();
let expected_uuid = emit_metrics_with_provider(&meter_provider);
force_flush_with_retry(&meter_provider);

let shutdown_result = meter_provider.shutdown();
assert!(shutdown_result.is_ok());
Expand All @@ -114,20 +97,8 @@ mod metrictests {
}

fn emit_and_validate_metrics(meter_provider: SdkMeterProvider) -> Result<()> {
const METER_NAME: &str = "test_meter";
const INSTRUMENT_NAME: &str = "test_counter";

// Add data to u64_counter
let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
let expected_uuid = Uuid::new_v4().to_string();
let counter = meter.u64_counter(INSTRUMENT_NAME).build();
counter.add(
10,
&[
KeyValue::new("mykey1", expected_uuid.clone()),
KeyValue::new("mykey2", "myvalue2"),
],
);
let expected_uuid = emit_metrics_with_provider(&meter_provider);
force_flush_with_retry(&meter_provider);

let shutdown_result = meter_provider.shutdown();
assert!(shutdown_result.is_ok());
Expand All @@ -141,6 +112,46 @@ mod metrictests {

Ok(())
}

/// Flushes the meter provider, retrying up to `FLUSH_MAX_RETRIES` times.
///
/// Sleeps `FLUSH_RETRY_SLEEP` between attempts (but not after the final
/// one) and panics if every attempt fails.
fn force_flush_with_retry(meter_provider: &SdkMeterProvider) {
    let mut remaining = FLUSH_MAX_RETRIES;
    while remaining > 0 {
        if meter_provider.force_flush().is_ok() {
            return;
        }
        remaining -= 1;
        // Only pause when another attempt is still coming.
        if remaining > 0 {
            std::thread::sleep(FLUSH_RETRY_SLEEP);
        }
    }
    panic!("force_flush failed after {} attempts", FLUSH_MAX_RETRIES);
}

/// Blocks until the collector's gRPC endpoint accepts TCP connections on
/// 127.0.0.1:4317, probing up to `FLUSH_MAX_RETRIES` times.
///
/// # Panics
/// Panics if the endpoint is still unreachable after all attempts.
fn wait_for_collector_grpc() {
    for attempt in 1..=FLUSH_MAX_RETRIES {
        // The probe stream is dropped immediately; we only care that the
        // connection was accepted.
        if TcpStream::connect("127.0.0.1:4317").is_ok() {
            return;
        }
        // Don't sleep after the final failed attempt — panic right away,
        // matching the pacing used by force_flush_with_retry.
        if attempt < FLUSH_MAX_RETRIES {
            std::thread::sleep(FLUSH_RETRY_SLEEP);
        }
    }
    panic!("collector gRPC endpoint is not reachable on 127.0.0.1:4317");
}

/// Records a single counter measurement on the given provider and returns the
/// unique UUID attribute value used, so the caller can locate this exact data
/// point in the collector output.
fn emit_metrics_with_provider(meter_provider: &SdkMeterProvider) -> String {
    const METER_NAME: &str = "test_meter";
    const INSTRUMENT_NAME: &str = "test_counter";

    // Use the test-local provider directly to avoid cross-test global races.
    let meter = meter_provider.meter(METER_NAME);
    let counter = meter.u64_counter(INSTRUMENT_NAME).build();

    // A fresh UUID makes this emission distinguishable from other test runs.
    let marker = Uuid::new_v4().to_string();
    let attributes = [
        KeyValue::new("mykey1", marker.clone()),
        KeyValue::new("mykey2", "myvalue2"),
    ];
    counter.add(10, &attributes);

    marker
}
}

///
Expand Down
Loading