Skip to content

Commit 91cb61a

Browse files
committed
fix(metrics): fix empty-attrs bind split and bind/collect eviction race
- Make no_attribute_tracker an Arc<TrackerEntry> so bind(&[]) returns a handle to the same tracker that measure(&[]) uses, preventing duplicate data points for empty attribute sets - Re-check bound_count under write lock in collect_and_reset() to prevent a TOCTOU race where bind() increments bound_count after collect() reads it as 0 under a concurrent read lock - Use Ordering::Release for has_been_updated stores in bound handles, matching the unbound measure() path
1 parent e8cac00 commit 91cb61a

4 files changed

Lines changed: 42 additions & 10 deletions

File tree

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl<T: Number> BoundMeasure<T> for BoundHistogramHandle<T> {
9999
let f = measurement.into_float();
100100
let index = bounds.partition_point(|&x| x < f);
101101
tracker.aggregator.update((measurement, index));
102-
tracker.has_been_updated.store(true, Ordering::Relaxed);
102+
tracker.has_been_updated.store(true, Ordering::Release);
103103
}
104104
BoundHistogramInner::Fallback { measure, attrs } => {
105105
measure.call(measurement, attrs);

opentelemetry-sdk/src/metrics/internal/mod.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ where
9797
/// Number of different attribute set stored in the `trackers` map.
9898
count: AtomicUsize,
9999
/// Tracker for values with no attributes attached.
100-
no_attribute_tracker: TrackerEntry<A>,
100+
no_attribute_tracker: Arc<TrackerEntry<A>>,
101101
/// Configuration for an Aggregator
102102
config: A::InitConfig,
103103
cardinality_limit: usize,
@@ -116,7 +116,7 @@ where
116116
trackers: RwLock::new(HashMap::with_capacity(
117117
1 + min(DEFAULT_CARDINALITY_LIMIT, cardinality_limit),
118118
)),
119-
no_attribute_tracker: TrackerEntry::new(&config),
119+
no_attribute_tracker: Arc::new(TrackerEntry::new(&config)),
120120
count: AtomicUsize::new(0),
121121
config,
122122
cardinality_limit,
@@ -205,8 +205,10 @@ where
205205
#[cfg(feature = "experimental_metrics_bound_instruments")]
206206
fn bind(&self, attributes: &[KeyValue]) -> Option<Arc<TrackerEntry<A>>> {
207207
if attributes.is_empty() {
208-
let sorted_attrs: Vec<KeyValue> = vec![];
209-
return self.bind_sorted(sorted_attrs);
208+
self.no_attribute_tracker
209+
.bound_count
210+
.fetch_add(1, Ordering::Relaxed);
211+
return Some(Arc::clone(&self.no_attribute_tracker));
210212
}
211213

212214
let sorted_attrs = sort_and_dedup(attributes);
@@ -338,10 +340,13 @@ where
338340

339341
if !stale_entries.is_empty() {
340342
if let Ok(mut trackers) = self.trackers.write() {
341-
// Re-check under write lock to avoid TOCTOU race: a measure() call between
342-
// dropping the read lock and acquiring the write lock could have updated
343-
// an entry we marked as stale.
344-
stale_entries.retain(|entry| !entry.has_been_updated.load(Ordering::Acquire));
343+
// Re-check under write lock to avoid TOCTOU race: a measure() or bind() call
344+
// between dropping the read lock and acquiring the write lock could have
345+
// updated an entry or bound a handle to one we marked as stale.
346+
stale_entries.retain(|entry| {
347+
!entry.has_been_updated.load(Ordering::Acquire)
348+
&& entry.bound_count.load(Ordering::Acquire) == 0
349+
});
345350

346351
if !stale_entries.is_empty() {
347352
let stale_pointers: HashSet<*const TrackerEntry<A>> =

opentelemetry-sdk/src/metrics/internal/sum.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl<T: Number> BoundMeasure<T> for BoundSumHandle<T> {
7070
match &self.inner {
7171
BoundSumInner::Direct { tracker } => {
7272
tracker.aggregator.update(measurement);
73-
tracker.has_been_updated.store(true, Ordering::Relaxed);
73+
tracker.has_been_updated.store(true, Ordering::Release);
7474
}
7575
BoundSumInner::Fallback { measure, attrs } => {
7676
measure.call(measurement, attrs);

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5337,4 +5337,31 @@ mod tests {
53375337
assert_eq!(sum.data_points.len(), 1);
53385338
assert_eq!(sum.data_points[0].value, 40);
53395339
}
5340+
5341+
#[cfg(feature = "experimental_metrics_bound_instruments")]
5342+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
5343+
async fn bound_counter_empty_attributes_shares_with_unbound() {
5344+
let mut test_context = TestContext::new(Temporality::Cumulative);
5345+
let counter = test_context.u64_counter("test", "my_counter", None);
5346+
let bound = counter.bind(&[]);
5347+
5348+
// Mix bound and unbound calls with empty attributes — they must share
5349+
// the same data point (both route to no_attribute_tracker).
5350+
counter.add(10, &[]);
5351+
bound.add(20);
5352+
counter.add(30, &[]);
5353+
bound.add(40);
5354+
test_context.flush_metrics();
5355+
5356+
let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
5357+
unreachable!()
5358+
};
5359+
5360+
assert_eq!(
5361+
sum.data_points.len(),
5362+
1,
5363+
"Bound and unbound with empty attributes must share the same data point"
5364+
);
5365+
assert_eq!(sum.data_points[0].value, 100);
5366+
}
53405367
}

0 commit comments

Comments
 (0)