From 68b89034b2a901a6177d1147345cf75dc9c1085d Mon Sep 17 00:00:00 2001 From: Wassbdr Date: Mon, 27 Apr 2026 13:28:34 +0200 Subject: [PATCH 1/4] fix(otlp): avoid panic in grpc retry without tokio runtime Use runtime-aware tonic retry fallback (Tokio -> NoAsync) and add regression test for no-runtime context.\n\nFixes #3432 --- opentelemetry-otlp/CHANGELOG.md | 4 ++ opentelemetry-otlp/src/exporter/tonic/logs.rs | 6 -- .../src/exporter/tonic/metrics.rs | 6 -- opentelemetry-otlp/src/exporter/tonic/mod.rs | 63 +++++++++++++++++-- .../src/exporter/tonic/trace.rs | 6 -- 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 00d19ad8d0..6f8ada0fd4 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -32,6 +32,10 @@ - Add support for `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable to configure metrics temporality. Accepted values: `cumulative` (default), `delta`, `lowmemory` (case-insensitive). Programmatic `.with_temporality()` overrides the env var. +- Fix panic in gRPC retry path when `experimental-grpc-retry` is enabled and exports run from + a thread without an active Tokio runtime (e.g. thread-based `BatchSpanProcessor`). + The tonic retry wrapper now falls back to `opentelemetry_sdk::runtime::NoAsync` when no + Tokio runtime is present. - Fix `NoHttpClient` error when multiple HTTP client features are enabled by using priority-based selection (`reqwest-client` > `hyper-client` > `reqwest-blocking-client`). [#2994](https://github.com/open-telemetry/opentelemetry-rust/issues/2994) - Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865) - Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 6704a2a2c5..602f6e4ff8 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -14,8 +14,6 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; use crate::retry::RetryPolicy; -#[cfg(feature = "experimental-grpc-retry")] -use opentelemetry_sdk::runtime::Tokio; pub(crate) struct TonicLogsClient { inner: Mutex>, @@ -73,10 +71,6 @@ impl LogExporter for TonicLogsClient { let batch = Arc::new(batch); match super::tonic_retry_with_backoff( - #[cfg(feature = "experimental-grpc-retry")] - Tokio, - #[cfg(not(feature = "experimental-grpc-retry"))] - (), self.retry_policy.clone(), crate::retry_classification::grpc::classify_tonic_status, "TonicLogsClient.Export", diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index c29cc6d460..dc67d78579 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -13,8 +13,6 @@ use super::BoxInterceptor; use crate::metric::MetricsClient; use crate::retry::RetryPolicy; -#[cfg(feature = "experimental-grpc-retry")] -use opentelemetry_sdk::runtime::Tokio; pub(crate) struct TonicMetricsClient { inner: Mutex>, @@ -66,10 +64,6 @@ impl TonicMetricsClient { impl MetricsClient for TonicMetricsClient { async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { match super::tonic_retry_with_backoff( - #[cfg(feature = "experimental-grpc-retry")] - Tokio, - #[cfg(not(feature = "experimental-grpc-retry"))] - (), self.retry_policy.clone(), crate::retry_classification::grpc::classify_tonic_status, "TonicMetricsClient.Export", diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 2c6ae2769d..2b8d80f3a8 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -32,7 +32,7 @@ use crate::retry::RetryPolicy; feature = "experimental-grpc-retry", any(feature = "trace", feature = "metrics", feature = "logs") ))] -use opentelemetry_sdk::runtime::Runtime; +use opentelemetry_sdk::runtime::{NoAsync, Tokio}; #[cfg(all( feature = "grpc-tonic", any(feature = "trace", feature = "metrics", feature = "logs") @@ -413,19 +413,23 @@ impl TonicExporterBuilder { feature = "experimental-grpc-retry", any(feature = "trace", feature = "metrics", feature = "logs") ))] -async fn tonic_retry_with_backoff( - runtime: R, +async fn tonic_retry_with_backoff( policy: RetryPolicy, classify_fn: fn(&tonic::Status) -> crate::retry::RetryErrorType, operation_name: &'static str, operation: F, ) -> Result where - R: Runtime, F: Fn() -> Fut, Fut: Future>, { - retry_with_backoff(runtime, policy, classify_fn, operation_name, operation).await + if tokio::runtime::Handle::try_current().is_ok() { + retry_with_backoff(Tokio, policy, classify_fn, operation_name, operation).await + } else { + // The thread-based BatchSpanProcessor drives exports from a plain thread without + // an active Tokio runtime. In that case, use a sync-compatible delay strategy. + retry_with_backoff(NoAsync, policy, classify_fn, operation_name, operation).await + } } /// Provides a unified call path when experimental-grpc-retry is not enabled - just executes the operation once. @@ -435,7 +439,6 @@ where any(feature = "trace", feature = "metrics", feature = "logs") ))] async fn tonic_retry_with_backoff( - _runtime: (), _policy: RetryPolicy, _classify_fn: fn(&tonic::Status) -> crate::retry::RetryErrorType, _operation_name: &'static str, @@ -1032,6 +1035,54 @@ mod tests { assert!(builder.tonic_config.retry_policy.is_none()); } + #[cfg(feature = "experimental-grpc-retry")] + #[test] + fn test_retry_wrapper_falls_back_without_tokio_runtime() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::task::{Context, Poll}; + + // Run a future to completion without installing any async runtime. + // This is important because the code path under test should detect the + // absence of Tokio and fall back to NoAsync. + fn block_on_without_runtime(future: F) -> F::Output { + let waker = futures_util::task::noop_waker_ref(); + let mut context = Context::from_waker(waker); + let mut future = std::pin::pin!(future); + + loop { + match future.as_mut().poll(&mut context) { + Poll::Ready(value) => return value, + Poll::Pending => std::thread::yield_now(), + } + } + } + + let attempts = AtomicUsize::new(0); + let policy = crate::retry::RetryPolicy { + max_retries: 2, + initial_delay_ms: 0, + max_delay_ms: 0, + jitter_ms: 0, + }; + + let result = block_on_without_runtime(super::tonic_retry_with_backoff( + policy, + crate::retry_classification::grpc::classify_tonic_status, + "test.retry.no_runtime", + || async { + let current = attempts.fetch_add(1, Ordering::SeqCst); + if current == 0 { + Err(tonic::Status::unavailable("transient")) + } else { + Ok(()) + } + }, + )); + + assert!(result.is_ok()); + assert_eq!(attempts.load(Ordering::SeqCst), 2); + } + #[test] #[cfg(not(any( feature = "tls", diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index f29a705b33..0a5ec90aea 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -16,8 +16,6 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use super::BoxInterceptor; use crate::retry::RetryPolicy; -#[cfg(feature = "experimental-grpc-retry")] -use opentelemetry_sdk::runtime::Tokio; pub(crate) struct TonicTracesClient { inner: Mutex>, @@ -75,10 +73,6 @@ impl SpanExporter for TonicTracesClient { let batch = Arc::new(batch); match super::tonic_retry_with_backoff( - #[cfg(feature = "experimental-grpc-retry")] - Tokio, - #[cfg(not(feature = "experimental-grpc-retry"))] - (), self.retry_policy.clone(), crate::retry_classification::grpc::classify_tonic_status, "TonicTracesClient.Export", From dd409ffd45501b9824adaea2d1bc53ba60f99302 Mon Sep 17 00:00:00 2001 From: Wassbdr Date: Mon, 27 Apr 2026 13:52:58 +0200 Subject: [PATCH 2/4] test(otlp): add retry wrapper tests for tokio runtime and no feature case Introduce new tests for the tonic retry wrapper to ensure correct behavior when the Tokio runtime is available and when the retry feature is disabled. These tests validate the retry logic and the number of attempts made during the operation. --- opentelemetry-otlp/src/exporter/tonic/mod.rs | 82 ++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 2b8d80f3a8..8850e43226 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -1070,6 +1070,20 @@ mod tests { crate::retry_classification::grpc::classify_tonic_status, "test.retry.no_runtime", || async { + // Force at least one Pending poll so the local no-runtime poll loop + // exercises both branches. + let mut first_poll = true; + futures_util::future::poll_fn(move |cx| { + if first_poll { + first_poll = false; + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await; + let current = attempts.fetch_add(1, Ordering::SeqCst); if current == 0 { Err(tonic::Status::unavailable("transient")) @@ -1083,6 +1097,74 @@ mod tests { assert_eq!(attempts.load(Ordering::SeqCst), 2); } + #[cfg(feature = "experimental-grpc-retry")] + #[tokio::test] + async fn test_retry_wrapper_uses_tokio_runtime_when_available() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let attempts = AtomicUsize::new(0); + let policy = crate::retry::RetryPolicy { + max_retries: 2, + initial_delay_ms: 0, + max_delay_ms: 0, + jitter_ms: 0, + }; + + let result = super::tonic_retry_with_backoff( + policy, + crate::retry_classification::grpc::classify_tonic_status, + "test.retry.with_tokio_runtime", + || async { + let current = attempts.fetch_add(1, Ordering::SeqCst); + if current == 0 { + Err(tonic::Status::unavailable("transient")) + } else { + Ok(()) + } + }, + ) + .await; + + assert!(result.is_ok()); + assert_eq!(attempts.load(Ordering::SeqCst), 2); + } + + #[cfg(all( + feature = "grpc-tonic", + not(feature = "experimental-grpc-retry"), + any(feature = "trace", feature = "metrics", feature = "logs") + ))] + #[tokio::test] + async fn test_retry_wrapper_executes_operation_once_without_retry_feature() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let attempts = AtomicUsize::new(0); + let policy = crate::retry::RetryPolicy { + max_retries: 5, + initial_delay_ms: 0, + max_delay_ms: 0, + jitter_ms: 0, + }; + + let result = super::tonic_retry_with_backoff( + policy, + crate::retry_classification::grpc::classify_tonic_status, + "test.retry.no_feature", + || async { + attempts.fetch_add(1, Ordering::SeqCst); + Ok::<(), tonic::Status>(()) + }, + ) + .await; + + assert!(result.is_ok()); + assert_eq!( + attempts.load(Ordering::SeqCst), + 1, + "operation should run exactly once when retry feature is disabled" + ); + } + #[test] #[cfg(not(any( feature = "tls", From bed73716c7537eaf6a94ff1c42da2daba70f55d7 Mon Sep 17 00:00:00 2001 From: Wassbdr Date: Mon, 27 Apr 2026 14:06:07 +0200 Subject: [PATCH 3/4] test(metrics): enhance metric tests with retry logic and gRPC connection checks Refactor metric tests to include a retry mechanism for flushing metrics and waiting for the gRPC collector to be reachable. This improves the reliability of the tests by ensuring that metrics are properly emitted and validated under varying conditions. --- .../tests/integration_test/tests/metrics.rs | 98 +++++++++++-------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs index 8ba7b034c2..e2c1d9ae22 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs @@ -11,10 +11,14 @@ use anyhow::{Ok, Result}; use ctor::dtor; use integration_test_runner::test_utils; use opentelemetry::KeyValue; +use opentelemetry::metrics::MeterProvider as _; use opentelemetry_sdk::metrics::SdkMeterProvider; +use std::net::TcpStream; use std::time::Duration; const SLEEP_DURATION: Duration = Duration::from_secs(5); +const FLUSH_RETRY_SLEEP: Duration = Duration::from_millis(250); +const FLUSH_MAX_RETRIES: usize = 60; #[cfg(test)] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] @@ -48,25 +52,15 @@ mod metrictests { async fn metric_helper_tokio() -> Result<()> { let meter_provider = setup_metrics_tokio().await; + wait_for_collector_grpc(); emit_and_validate_metrics(meter_provider) } async fn metric_helper_tokio_current() -> Result<()> { let meter_provider = setup_metrics_tokio().await; - - const METER_NAME: &str = "test_meter"; - const INSTRUMENT_NAME: &str = "test_counter"; - - let meter = opentelemetry::global::meter_provider().meter(METER_NAME); - let expected_uuid = Uuid::new_v4().to_string(); - let counter = meter.u64_counter(INSTRUMENT_NAME).build(); - counter.add( - 10, - &[ - KeyValue::new("mykey1", expected_uuid.clone()), - KeyValue::new("mykey2", "myvalue2"), - ], - ); + wait_for_collector_grpc(); + let expected_uuid = emit_metrics_with_provider(&meter_provider); + force_flush_with_retry(&meter_provider); // In tokio::current_thread flavor, shutdown must be done in a separate thread let shutdown_result = Handle::current() @@ -87,20 +81,9 @@ mod metrictests { fn metric_helper_non_tokio() -> Result<()> { let (meter_provider, _rt) = setup_metrics_non_tokio(true); - const METER_NAME: &str = "test_meter"; - const INSTRUMENT_NAME: &str = "test_counter"; - - // Add data to u64_counter - let meter = opentelemetry::global::meter_provider().meter(METER_NAME); - let expected_uuid = Uuid::new_v4().to_string(); - let counter = meter.u64_counter(INSTRUMENT_NAME).build(); - counter.add( - 10, - &[ - KeyValue::new("mykey1", expected_uuid.clone()), - KeyValue::new("mykey2", "myvalue2"), - ], - ); + wait_for_collector_grpc(); + let expected_uuid = emit_metrics_with_provider(&meter_provider); + force_flush_with_retry(&meter_provider); let shutdown_result = meter_provider.shutdown(); assert!(shutdown_result.is_ok()); @@ -114,20 +97,8 @@ mod metrictests { } fn emit_and_validate_metrics(meter_provider: SdkMeterProvider) -> Result<()> { - const METER_NAME: &str = "test_meter"; - const INSTRUMENT_NAME: &str = "test_counter"; - - // Add data to u64_counter - let meter = opentelemetry::global::meter_provider().meter(METER_NAME); - let expected_uuid = Uuid::new_v4().to_string(); - let counter = meter.u64_counter(INSTRUMENT_NAME).build(); - counter.add( - 10, - &[ - KeyValue::new("mykey1", expected_uuid.clone()), - KeyValue::new("mykey2", "myvalue2"), - ], - ); + let expected_uuid = emit_metrics_with_provider(&meter_provider); + force_flush_with_retry(&meter_provider); let shutdown_result = meter_provider.shutdown(); assert!(shutdown_result.is_ok()); @@ -141,6 +112,49 @@ mod metrictests { Ok(()) } + + fn force_flush_with_retry(meter_provider: &SdkMeterProvider) { + for attempt in 1..=FLUSH_MAX_RETRIES { + if meter_provider.force_flush().is_ok() { + return; + } + if attempt < FLUSH_MAX_RETRIES { + std::thread::sleep(FLUSH_RETRY_SLEEP); + } + } + panic!( + "force_flush failed after {} attempts", + FLUSH_MAX_RETRIES + ); + } + + fn wait_for_collector_grpc() { + for _ in 1..=FLUSH_MAX_RETRIES { + if TcpStream::connect("127.0.0.1:4317").is_ok() { + return; + } + std::thread::sleep(FLUSH_RETRY_SLEEP); + } + panic!("collector gRPC endpoint is not reachable on 127.0.0.1:4317"); + } + + fn emit_metrics_with_provider(meter_provider: &SdkMeterProvider) -> String { + const METER_NAME: &str = "test_meter"; + const INSTRUMENT_NAME: &str = "test_counter"; + + // Use the test-local provider directly to avoid cross-test global races. + let meter = meter_provider.meter(METER_NAME); + let expected_uuid = Uuid::new_v4().to_string(); + let counter = meter.u64_counter(INSTRUMENT_NAME).build(); + counter.add( + 10, + &[ + KeyValue::new("mykey1", expected_uuid.clone()), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + expected_uuid + } } /// From ac1ce143efd4ced85ecb1bc84ab03b42da2974a7 Mon Sep 17 00:00:00 2001 From: Wassbdr Date: Mon, 27 Apr 2026 14:27:05 +0200 Subject: [PATCH 4/4] fix(metrics): simplify panic message in force_flush retry logic Refactor the panic message in the force_flush function to improve readability by removing unnecessary line breaks. This change enhances the clarity of error reporting during metric flushing attempts. --- opentelemetry-otlp/tests/integration_test/tests/metrics.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs index e2c1d9ae22..851fd9289e 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs @@ -10,8 +10,8 @@ use anyhow::{Ok, Result}; use ctor::dtor; use integration_test_runner::test_utils; -use opentelemetry::KeyValue; use opentelemetry::metrics::MeterProvider as _; +use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::SdkMeterProvider; use std::net::TcpStream; use std::time::Duration; @@ -122,10 +122,7 @@ mod metrictests { std::thread::sleep(FLUSH_RETRY_SLEEP); } } - panic!( - "force_flush failed after {} attempts", - FLUSH_MAX_RETRIES - ); + panic!("force_flush failed after {} attempts", FLUSH_MAX_RETRIES); } fn wait_for_collector_grpc() {