
[Scheduler] Add metrics for Cluster round trip #2269

Open
DiegoTavares wants to merge 7 commits into AcademySoftwareFoundation:master from DiegoTavares:sched_metric_clusters

Conversation

@DiegoTavares
Collaborator

@DiegoTavares DiegoTavares commented May 5, 2026

Add metrics to measure how long a cluster takes to round trip the scheduler loop.

Fix minor issues and refactor a confusing if let Some construct.

LLM usage disclosure

Claude Opus was used to investigate panic surfaces that might lead to abandoned clusters and to implement the metric collecting logic.

Summary by CodeRabbit

  • New Features

    • Added cluster round-trip latency metrics for improved observability.
  • Bug Fixes

    • Enhanced error handling and unwind-safety in resource accounting processes.
  • Performance Improvements

    • Optimized tag processing configuration with reduced chunk sizes.
    • Improved permit management in layer processing.
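
The round-trip measurement summarized above can be sketched in plain Rust. This is a minimal illustration, not the PR's actual code: the `String` cluster key and the `record_emit` helper are stand-ins for the real `Cluster` type and the producer's emit path, which keeps a `HashMap<Cluster, Instant>` of last-sent timestamps.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Record that a cluster was just emitted; if it was emitted before, the
// elapsed time since then is one full trip around the scheduler loop.
fn record_emit(last_sent: &mut HashMap<String, Instant>, cluster: &str) -> Option<Duration> {
    let now = Instant::now();
    // HashMap::insert returns the previous value for the key, if any.
    last_sent
        .insert(cluster.to_string(), now)
        .map(|prev| now.duration_since(prev))
}

fn main() {
    let mut last_sent = HashMap::new();
    // First emission: no previous timestamp, so nothing to observe yet.
    assert!(record_emit(&mut last_sent, "cluster-a").is_none());
    // Second emission of the same cluster: the round trip can be observed.
    let trip = record_emit(&mut last_sent, "cluster-a");
    assert!(trip.is_some());
    println!("round trip: {:?}", trip.unwrap());
}
```

In the PR itself, the `Some(prev)` branch is where `observe_cluster_round_trip` feeds the histogram metric.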

Scc silently drops inserts where the key already exists.
Also ensure all_sleeping_rounds is reset at the end of each full iteration
@coderabbitai
Contributor

coderabbitai Bot commented May 5, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1029be6d-7bf5-41c0-8d4b-7c270d80d186

📥 Commits

Reviewing files that changed from the base of the PR and between 7f2c512 and b98b86f.

📒 Files selected for processing (2)
  • rust/config/scheduler.yaml
  • rust/crates/scheduler/src/config/mod.rs

📝 Walkthrough

Walkthrough

Adds per-cluster round-trip timing and a histogram metric, refactors cluster feed streaming to use a dedicated control channel and per-cluster timestamps, improves unwind-safety in async resource loops, switches permit grant to upsert behavior, and reduces default tag-chunk sizes in scheduler config.

Changes

Cluster feed + metrics

  • Metrics Definition — rust/crates/scheduler/src/metrics/mod.rs
    Adds the CLUSTER_ROUND_TRIP_SECONDS histogram and an observe_cluster_round_trip(duration: Duration) function.
  • Control Channel + API Wiring — rust/crates/scheduler/src/cluster.rs
    Replaces the previous cancel channel with a dedicated small feed_sender/feed_receiver control channel while continuing to return Sender<FeedMessage> to callers.
  • Per-cluster Timing State — rust/crates/scheduler/src/cluster.rs
    Introduces last_sent_map: Arc<Mutex<HashMap<Cluster, Instant>>> and a producer handle to track emission timestamps.
  • Instrumentation on Emit — rust/crates/scheduler/src/cluster.rs
    On sending a cluster, records now and, if a previous timestamp exists, calls observe_cluster_round_trip(now.duration_since(prev)).
  • Imports — rust/crates/scheduler/src/cluster.rs
    Adds Instant, FutureExt, and metrics::observe_cluster_round_trip to the imports.

Robustness and behavior tweaks

  • Async Unwind Safety — rust/crates/scheduler/src/resource_accounting.rs
    Wraps the resource and subscription recalculation loops with AssertUnwindSafe and .catch_unwind() to prevent unwinding across await points. Adds panic::AssertUnwindSafe and futures::FutureExt imports.
  • Overflow Handling — rust/crates/scheduler/src/resource_accounting.rs
    Replaces the unwrap_or_else fallback on try_into with an explicit match that warns and preserves existing values on overflow.
  • Permit Upsert — rust/crates/scheduler/src/pipeline/layer_permit.rs
    Changes insert_sync → upsert_sync when granting a new permit, enabling update-or-insert semantics for existing layer IDs.
  • Queue Defaults — rust/config/scheduler.yaml, rust/crates/scheduler/src/config/mod.rs
    Reduces manual_tags_chunk_size from 100 → 50 and hostname_tags_chunk_size from 300 → 50 in the defaults.
  • Minor Formatting — rust/crates/scheduler/src/config/mod.rs
    Reformats the show_names initialization to a single-line form (no semantic change).
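
The overflow-handling change in resource_accounting.rs — an explicit match on try_into that warns and keeps the existing value rather than silently substituting a fallback — can be sketched as follows. The function and variable names here are illustrative, not the PR's actual identifiers:

```rust
use std::convert::TryInto;

// Convert a wide counter to the narrower type used by the accounting state,
// keeping the existing value (and warning) if the conversion would overflow.
fn apply_count(existing: u32, incoming: u64) -> u32 {
    match incoming.try_into() {
        Ok(v) => v,
        Err(_) => {
            eprintln!(
                "warning: counter {} overflows u32; keeping existing value {}",
                incoming, existing
            );
            existing
        }
    }
}

fn main() {
    // A value that fits is applied as-is.
    assert_eq!(apply_count(7, 42), 42);
    // u64::MAX does not fit in u32, so the existing value is preserved.
    assert_eq!(apply_count(7, u64::MAX), 7);
    println!("ok");
}
```

The point of the explicit match is that the failure path is visible in logs instead of being absorbed by a silent `unwrap_or_else` default.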

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • lithorus
  • ramonfigueiredo

Poem

🐰 I timed the hops from burrow to tree,
Feeds found a new path, control wings set free,
Panics now loosened, permits softly change,
Chunk sizes trimmed small — the trail's rearranged. 🥕

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly matches the main objective of the PR: adding metrics to measure cluster round trip duration through the scheduler loop.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.


@DiegoTavares DiegoTavares marked this pull request as ready for review May 5, 2026 21:12
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rust/crates/scheduler/src/resource_accounting.rs (1)

120-157: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

These refresh loops still die on the first panic and need unwind handling restructured.

Lines 133 and 155 log panics from catch_unwind(), but because it wraps the entire async block, the task exits immediately afterward. This permanently disables resource recomputation and subscription cache refresh, leaving the scheduler to make decisions from stale accounting data.

The unwind boundary must move inside each loop iteration so the task logs the panic and continues running rather than terminating on the first error.
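
The boundary placement the reviewer is asking for can be illustrated with the synchronous std equivalent. The real fix uses `AssertUnwindSafe(...).catch_unwind().await` from the futures crate inside an async loop, but the principle is identical; the `process_all` helper and the panic condition below are illustrative only:

```rust
use std::panic::{self, AssertUnwindSafe};

// Process each item with an unwind boundary *per iteration*: a panic in one
// item's work is caught, logged, and the loop continues with the next item.
fn process_all(items: &[i32]) -> i32 {
    let mut processed = 0;
    for item in items {
        let result = panic::catch_unwind(AssertUnwindSafe(|| {
            if *item == 2 {
                panic!("bad item"); // simulated per-iteration failure
            }
            *item
        }));
        match result {
            Ok(v) => processed += v,
            Err(_) => eprintln!("iteration panicked on item {}", item),
        }
    }
    processed
}

fn main() {
    // Item 2 panics, but items 1, 3, and 4 are still processed: 1 + 3 + 4 = 8.
    assert_eq!(process_all(&[1, 2, 3, 4]), 8);
    println!("sum of processed items: {}", process_all(&[1, 2, 3, 4]));
}
```

Had the `catch_unwind` wrapped the whole loop instead, the first panic would have ended processing after item 1.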

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rust/crates/scheduler/src/resource_accounting.rs` around lines 120 - 157,
Both background tasks currently wrap the whole async block with catch_unwind so
a single panic stops the task; instead, move the unwind boundary inside the loop
and catch per-iteration panics so the loop continues. Concretely: in the
resource loop wrap each iteration's call to
dao.recompute_all_from_proc(&target_shows_opt).await with
AssertUnwindSafe(...).catch_unwind().await and on Err(e) log the panic (as
currently done) and continue the loop; likewise in the subscription loop wrap
each iteration's recalculate_and_refresh(&cache, &dao, &target_shows).await with
AssertUnwindSafe(...).catch_unwind().await, log on Err(e) and continue; keep the
outer interval/tick logic and CONFIG.queue.* intervals unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rust/crates/scheduler/src/cluster.rs`:
- Around line 379-520: The panic handling currently wraps the whole producer and
receiver async blocks (the async move tasks that contain the main loops in the
cluster feed), so any panic aborts the entire task; change this by moving the
AssertUnwindSafe(...).catch_unwind().await inside each loop iteration: for the
producer loop (the async block that reads self.clusters, sends via sender, and
updates last_sent_map_producer) wrap the per-iteration work in
AssertUnwindSafe(...).catch_unwind().await, log the error (e.g., "Iteration
panicked: {:?}"), and continue the loop so the feed keeps running; do the same
for the receiver loop that matches on FeedMessage (the async block that awaits
feed_receiver.recv(), updates sleep_map and last_sent_map_receiver, and handles
FeedMessage::Stop), so a single iteration panic is logged and skipped without
terminating the whole spawn.

---

Outside diff comments:
In `@rust/crates/scheduler/src/resource_accounting.rs`:
- Around line 120-157: Both background tasks currently wrap the whole async
block with catch_unwind so a single panic stops the task; instead, move the
unwind boundary inside the loop and catch per-iteration panics so the loop
continues. Concretely: in the resource loop wrap each iteration's call to
dao.recompute_all_from_proc(&target_shows_opt).await with
AssertUnwindSafe(...).catch_unwind().await and on Err(e) log the panic (as
currently done) and continue the loop; likewise in the subscription loop wrap
each iteration's recalculate_and_refresh(&cache, &dao, &target_shows).await with
AssertUnwindSafe(...).catch_unwind().await, log on Err(e) and continue; keep the
outer interval/tick logic and CONFIG.queue.* intervals unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: fcbc6f53-5ca3-4275-a8ce-20bb44a4c5dd

📥 Commits

Reviewing files that changed from the base of the PR and between 4d98183 and 7f2c512.

📒 Files selected for processing (4)
  • rust/crates/scheduler/src/cluster.rs
  • rust/crates/scheduler/src/metrics/mod.rs
  • rust/crates/scheduler/src/pipeline/layer_permit.rs
  • rust/crates/scheduler/src/resource_accounting.rs

Comment on lines +379 to +520
let task = AssertUnwindSafe(async move {
    let mut all_sleeping_rounds = 0;
    let feed = self.clusters.clone();
    let current_index_atomic = self.current_index.clone();

    loop {
        // Check stop flag
        if stop_flag.load(Ordering::Relaxed) {
            warn!("Cluster received a stop message. Stopping feed.");
            break;
        }

        let (item, cluster_size, completed_round) = {
            let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner());
            if clusters.is_empty() {
                break;
            }

            let current_index = current_index_atomic.load(Ordering::Relaxed);
            let item = clusters[current_index].clone();
            let next_index = (current_index + 1) % clusters.len();
            let completed_round = next_index == 0; // Detect wrap-around
            current_index_atomic.store(next_index, Ordering::Relaxed);

            (item, clusters.len(), completed_round)
        };

        // Skip cluster if it is marked as sleeping
        let is_sleeping = {
            let mut sleep_map_lock =
                sleep_map.lock().unwrap_or_else(|p| p.into_inner());
            if let Some(wake_up_time) = sleep_map_lock.get(&item) {
                if *wake_up_time > SystemTime::now() {
                    // Still sleeping, skip it
                    true
                } else {
                    // Remove expired entries
                    sleep_map_lock.remove(&item);
                    false
                }
            } else {
                false
            }
        };

        if !is_sleeping {
            if sender.send(item.clone()).await.is_err() {
                warn!("Cluster receiver dropped. Stopping feed.");
                break;
            }
            let now = Instant::now();
            let mut last_sent_lock = last_sent_map_producer
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if let Some(prev) = last_sent_lock.insert(item, now) {
                observe_cluster_round_trip(now.duration_since(prev));
            }
        } else if !completed_round {
            // Skipped a sleeping cluster mid-round; yield so we don't starve the runtime.
            tokio::task::yield_now().await;
        }

        // At end of round, add backoff sleep
        if completed_round {
            CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed);

            // Check if all/most clusters are sleeping
            let sleeping_count = {
                let sleep_map_lock =
                    sleep_map.lock().unwrap_or_else(|p| p.into_inner());
                sleep_map_lock.len()
            };
            if sleeping_count >= cluster_size {
                // Ensure this doesn't loop forever when there's a limit configured
                all_sleeping_rounds += 1;
                if let Some(max_empty_cycles) =
                    CONFIG.queue.empty_job_cycles_before_quiting
                {
                    if all_sleeping_rounds > max_empty_cycles {
                        warn!("All clusters have been sleeping for too long");
                        break;
                    }
                }

                // All clusters sleeping, sleep longer
                tokio::time::sleep(Duration::from_secs(5)).await;
            } else if sleeping_count > 0 {
                // Some clusters sleeping, brief pause
                all_sleeping_rounds = 0;
                tokio::time::sleep(Duration::from_millis(100)).await;
            } else {
                // Active work, minimal pause
                all_sleeping_rounds = 0;
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        }
    }
});
if let Err(e) = task.catch_unwind().await {
    error!("Cluster feed producer task panicked: {:?}", e);
}

// Process messages on the receiving end
let sleep_map = self.sleep_map.clone();
let last_sent_map_receiver = last_sent_map.clone();
tokio::spawn(async move {
    let task = AssertUnwindSafe(async move {
        while let Some(message) = feed_receiver.recv().await {
            match message {
                FeedMessage::Sleep(cluster, duration) => {
                    let requested_wake_up_time = SystemTime::now().checked_add(duration);
                    if let Some(wake_up_time) = requested_wake_up_time {
                        debug!("{:?} put to sleep for {}s", cluster, duration.as_secs());
                        {
                            let mut last_sent_lock = last_sent_map_receiver
                                .lock()
                                .unwrap_or_else(|p| p.into_inner());
                            last_sent_lock.remove(&cluster);
                        }
                        {
                            let mut sleep_map_lock =
                                sleep_map.lock().unwrap_or_else(|p| p.into_inner());
                            sleep_map_lock.insert(cluster, wake_up_time);
                        }
                    } else {
                        warn!(
                            "Sleep request ignored for {:?}. Invalid duration={}s",
                            cluster,
                            duration.as_secs()
                        );
                    }
                }
                FeedMessage::Stop() => {
                    self.stop_flag.store(true, Ordering::Relaxed);
                    break;
                }
            }
        }
    });
    if let Err(e) = task.catch_unwind().await {
        error!("Cluster feed receiver task panicked: {:?}", e);
    }
});

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In futures 0.3, when FutureExt::catch_unwind() wraps an async block that contains a loop, does a panic inside the loop allow later iterations to continue, or does the future terminate and return Err?

💡 Result:

If the panic occurs inside the async block wrapped by futures 0.3's FutureExt::catch_unwind, the wrapped future does not keep running later loop iterations. Instead, the CatchUnwind future resolves with Err on that poll, and the overall future terminates.

In futures 0.3, CatchUnwind is implemented by wrapping the inner future's poll call in std::panic::catch_unwind. If that inner poll panics, the combinator immediately returns Poll::Ready(Err(...)) from its own poll, so the future is completed and is not resumed for another poll or iteration [1][2].

So a panic in the loop body will be caught and converted into Err, and execution will not proceed to subsequent loop iterations of the wrapped async block. Conceptually: the first poll in which the loop body panics makes catch_unwind return Err, and no further polls are performed because the CatchUnwind future has already become Ready [1][2].


Move panic handling inside the loops, not around them.

Both the producer task (line 477) and receiver task (line 519) wrap catch_unwind() around the entire async block containing the loop. When a panic occurs during any loop iteration, catch_unwind() catches it and returns Err, terminating the entire future. No further iterations execute. This means one panic permanently kills the task, leaving the producer unable to emit clusters or the receiver unable to process messages—breaking the scheduler feed.

Wrap panic handling per-iteration inside the loop instead, so it can log and recover without terminating the feed:

loop {
    let result = AssertUnwindSafe(async {
        // iteration work
    }).catch_unwind().await;
    
    if let Err(e) = result {
        error!("Iteration panicked: {:?}", e);
        continue;
    }
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rust/crates/scheduler/src/cluster.rs` around lines 379 - 520, The panic
handling currently wraps the whole producer and receiver async blocks (the async
move tasks that contain the main loops in the cluster feed), so any panic aborts
the entire task; change this by moving the
AssertUnwindSafe(...).catch_unwind().await inside each loop iteration: for the
producer loop (the async block that reads self.clusters, sends via sender, and
updates last_sent_map_producer) wrap the per-iteration work in
AssertUnwindSafe(...).catch_unwind().await, log the error (e.g., "Iteration
panicked: {:?}"), and continue the loop so the feed keeps running; do the same
for the receiver loop that matches on FeedMessage (the async block that awaits
feed_receiver.recv(), updates sleep_map and last_sent_map_receiver, and handles
FeedMessage::Stop), so a single iteration panic is logged and skipped without
terminating the whole spawn.
