[Scheduler] Add metrics for Cluster round trip #2269
DiegoTavares wants to merge 7 commits into AcademySoftwareFoundation:master from
Conversation
Scc silently drops inserts where the key already exists.
Also ensure all_sleeping_rounds is reset at the end of each full iteration.
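For context on the scc note above: scc::HashMap::insert leaves an existing entry untouched and reports an error, which is why the permit grant was switched to upsert behavior. A minimal sketch of the pitfall is below; the key and value types are hypothetical stand-ins, and the actual upsert call used by the fix is not shown.

// Sketch only: why a plain insert is not enough when the key may already exist.
// "layer-a" and the u32 count are stand-ins for the real permit map contents.
use scc::HashMap;

fn main() {
    let permits: HashMap<String, u32> = HashMap::new();

    // First grant is stored.
    assert!(permits.insert("layer-a".into(), 2).is_ok());

    // A second insert for the same key is rejected (Err carries the pair back),
    // so the new grant is silently dropped if the Err is ignored.
    assert!(permits.insert("layer-a".into(), 5).is_err());

    // The map still holds the stale value.
    assert_eq!(permits.read("layer-a", |_, v| *v), Some(2));
}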
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough
Adds per-cluster round-trip timing and a histogram metric, refactors cluster feed streaming to use a dedicated control channel and per-cluster timestamps, improves unwind-safety in async resource loops, switches permit grant to upsert behavior, and reduces default tag-chunk sizes in scheduler config.
Changes
- Cluster feed + metrics
- Robustness and behavior tweaks
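Only the observe_cluster_round_trip helper name appears in the cluster.rs diff later in this review. As a rough sketch of what the metrics side could look like with the prometheus crate — the metric name, help text, buckets, and registry wiring here are assumptions, not taken from this PR:

// Hedged sketch of a round-trip histogram using the `prometheus` and `once_cell` crates.
// Everything named here except `observe_cluster_round_trip` is an assumption.
use once_cell::sync::Lazy;
use prometheus::{exponential_buckets, Histogram, HistogramOpts};
use std::time::Duration;

static CLUSTER_ROUND_TRIP: Lazy<Histogram> = Lazy::new(|| {
    let opts = HistogramOpts::new(
        "scheduler_cluster_round_trip_seconds", // assumed metric name
        "Time for a cluster to complete one full trip through the scheduler loop",
    )
    // Assumed buckets: 10ms, doubling up to roughly 40s.
    .buckets(exponential_buckets(0.01, 2.0, 12).unwrap());
    let histogram = Histogram::with_opts(opts).unwrap();
    prometheus::default_registry()
        .register(Box::new(histogram.clone()))
        .unwrap();
    histogram
});

// Record one observed round trip; this mirrors the helper called in the diff.
pub fn observe_cluster_round_trip(elapsed: Duration) {
    CLUSTER_ROUND_TRIP.observe(elapsed.as_secs_f64());
}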
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/crates/scheduler/src/resource_accounting.rs (1)
120-157: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
These refresh loops still die on the first panic and need their unwind handling restructured.
Lines 133 and 155 log panics from catch_unwind(), but because it wraps the entire async block, the task exits immediately afterward. This permanently disables resource recomputation and subscription cache refresh, leaving the scheduler to make decisions from stale accounting data. The unwind boundary must move inside each loop iteration so the task logs the panic and continues running rather than terminating on the first error.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rust/crates/scheduler/src/resource_accounting.rs` around lines 120 - 157, Both background tasks currently wrap the whole async block with catch_unwind so a single panic stops the task; instead, move the unwind boundary inside the loop and catch per-iteration panics so the loop continues. Concretely: in the resource loop wrap each iteration's call to dao.recompute_all_from_proc(&target_shows_opt).await with AssertUnwindSafe(...).catch_unwind().await and on Err(e) log the panic (as currently done) and continue the loop; likewise in the subscription loop wrap each iteration's recalculate_and_refresh(&cache, &dao, &target_shows).await with AssertUnwindSafe(...).catch_unwind().await, log on Err(e) and continue; keep the outer interval/tick logic and CONFIG.queue.* intervals unchanged.
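A minimal, self-contained sketch of the restructuring this comment asks for — an unwind boundary per iteration instead of around the whole task. The interval, the stand-in recompute function, and the tracing-based logging are placeholders; the real code wraps dao.recompute_all_from_proc / recalculate_and_refresh as described in the prompt above.

// Sketch only: catch panics per iteration so the background task keeps running.
use futures::FutureExt;
use std::panic::AssertUnwindSafe;
use std::time::Duration;
use tracing::error;

// Stand-in for the real call (dao.recompute_all_from_proc in the review).
async fn recompute_all_from_proc() {
    // ... work that may panic ...
}

pub fn spawn_resource_loop() {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(30)); // placeholder interval
        loop {
            ticker.tick().await;
            // Unwind boundary inside the loop: a panic is logged and the next tick retries.
            if let Err(e) = AssertUnwindSafe(recompute_all_from_proc())
                .catch_unwind()
                .await
            {
                error!("Resource recompute iteration panicked: {:?}", e);
            }
        }
    });
}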
📒 Files selected for processing (4)
- rust/crates/scheduler/src/cluster.rs
- rust/crates/scheduler/src/metrics/mod.rs
- rust/crates/scheduler/src/pipeline/layer_permit.rs
- rust/crates/scheduler/src/resource_accounting.rs
Diff under review in rust/crates/scheduler/src/cluster.rs — the updated cluster feed producer and receiver tasks:

let task = AssertUnwindSafe(async move {
    let mut all_sleeping_rounds = 0;
    let feed = self.clusters.clone();
    let current_index_atomic = self.current_index.clone();

    loop {
        // Check stop flag
        if stop_flag.load(Ordering::Relaxed) {
            warn!("Cluster received a stop message. Stopping feed.");
            break;
        }

        let (item, cluster_size, completed_round) = {
            let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner());
            if clusters.is_empty() {
                break;
            }

            let current_index = current_index_atomic.load(Ordering::Relaxed);
            let item = clusters[current_index].clone();
            let next_index = (current_index + 1) % clusters.len();
            let completed_round = next_index == 0; // Detect wrap-around
            current_index_atomic.store(next_index, Ordering::Relaxed);

            (item, clusters.len(), completed_round)
        };

        // Skip cluster if it is marked as sleeping
        let is_sleeping = {
            let mut sleep_map_lock =
                sleep_map.lock().unwrap_or_else(|p| p.into_inner());
            if let Some(wake_up_time) = sleep_map_lock.get(&item) {
                if *wake_up_time > SystemTime::now() {
                    // Still sleeping, skip it
                    true
                } else {
                    // Remove expired entries
                    sleep_map_lock.remove(&item);
                    false
                }
            } else {
                false
            }
        };

        if !is_sleeping {
            if sender.send(item.clone()).await.is_err() {
                warn!("Cluster receiver dropped. Stopping feed.");
                break;
            }
            let now = Instant::now();
            let mut last_sent_lock = last_sent_map_producer
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if let Some(prev) = last_sent_lock.insert(item, now) {
                observe_cluster_round_trip(now.duration_since(prev));
            }
        } else if !completed_round {
            // Skipped a sleeping cluster mid-round; yield so we don't starve the runtime.
            tokio::task::yield_now().await;
        }

        // At end of round, add backoff sleep
        if completed_round {
            CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed);

            // Check if all/most clusters are sleeping
            let sleeping_count = {
                let sleep_map_lock =
                    sleep_map.lock().unwrap_or_else(|p| p.into_inner());
                sleep_map_lock.len()
            };
            if sleeping_count >= cluster_size {
                // Ensure this doesn't loop forever when there's a limit configured
                all_sleeping_rounds += 1;
                if let Some(max_empty_cycles) =
                    CONFIG.queue.empty_job_cycles_before_quiting
                {
                    if all_sleeping_rounds > max_empty_cycles {
                        warn!("All clusters have been sleeping for too long");
                        break;
                    }
                }

                // All clusters sleeping, sleep longer
                tokio::time::sleep(Duration::from_secs(5)).await;
            } else if sleeping_count > 0 {
                // Some clusters sleeping, brief pause
                all_sleeping_rounds = 0;
                tokio::time::sleep(Duration::from_millis(100)).await;
            } else {
                // Active work, minimal pause
                all_sleeping_rounds = 0;
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        }
    }
});
if let Err(e) = task.catch_unwind().await {
    error!("Cluster feed producer task panicked: {:?}", e);
}
});

// Process messages on the receiving end
let sleep_map = self.sleep_map.clone();
let last_sent_map_receiver = last_sent_map.clone();
tokio::spawn(async move {
    let task = AssertUnwindSafe(async move {
        while let Some(message) = feed_receiver.recv().await {
            match message {
                FeedMessage::Sleep(cluster, duration) => {
                    let requested_wake_up_time = SystemTime::now().checked_add(duration);
                    if let Some(wake_up_time) = requested_wake_up_time {
                        debug!("{:?} put to sleep for {}s", cluster, duration.as_secs());
                        {
                            let mut last_sent_lock = last_sent_map_receiver
                                .lock()
                                .unwrap_or_else(|p| p.into_inner());
                            last_sent_lock.remove(&cluster);
                        }
                        {
                            let mut sleep_map_lock =
                                sleep_map.lock().unwrap_or_else(|p| p.into_inner());
                            sleep_map_lock.insert(cluster, wake_up_time);
                        }
                    } else {
                        warn!(
                            "Sleep request ignored for {:?}. Invalid duration={}s",
                            cluster,
                            duration.as_secs()
                        );
                    }
                }
                FeedMessage::Stop() => {
                    self.stop_flag.store(true, Ordering::Relaxed);
                    break;
                }
            }
        }
    });
    if let Err(e) = task.catch_unwind().await {
        error!("Cluster feed receiver task panicked: {:?}", e);
🧩 Analysis chain
🌐 Web query:
In futures 0.3, when FutureExt::catch_unwind() wraps an async block that contains a loop, does a panic inside the loop allow later iterations to continue, or does the future terminate and return Err?
💡 Result:
If the panic occurs inside the async block being wrapped by futures 0.3's FutureExt::catch_unwind, then the wrapped future does not keep running later loop iterations. Instead, the CatchUnwind future resolves with Err on that poll, and the overall future terminates.

In futures 0.3, CatchUnwind is implemented by wrapping the inner future's poll call in std::panic::catch_unwind. If that inner poll panics, the combinator immediately returns Poll::Ready(Err(...)) from its own poll, so the future is completed and is not resumed for another poll/iteration [1][2]. So a panic in the loop body will be caught and converted into Err, and execution will not proceed to subsequent loop iterations of the wrapped async block [1][2].

Conceptually:
- First poll where the loop body panics → catch_unwind returns Err.
- No further polls/iterations are performed by the CatchUnwind future, because it has already become Ready [1][2].
Citations:
- 1: https://docs.rs/futures/0.3.14/futures/future/struct.CatchUnwind.html
- 2: https://docs.rs/futures/latest/futures/future/struct.CatchUnwind.html
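A tiny runnable illustration of the behavior described in the result above (not taken from the PR): the wrapped future resolves to Err on the first panic and the inner loop never resumes.

// Demonstrates that catch_unwind around an async block ends the future on the
// first panic; later iterations of the inner loop never run.
use futures::FutureExt;
use std::panic::AssertUnwindSafe;

#[tokio::main]
async fn main() {
    let wrapped = AssertUnwindSafe(async {
        for i in 0..5 {
            if i == 2 {
                panic!("boom at iteration {i}");
            }
            println!("iteration {i}");
        }
    })
    .catch_unwind();

    // Prints iterations 0 and 1, then resolves to Err; iterations 3 and 4 never run.
    assert!(wrapped.await.is_err());
    println!("future finished with Err; the loop did not resume");
}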
Move panic handling inside the loops, not around them.
Both the producer task (line 477) and receiver task (line 519) wrap catch_unwind() around the entire async block containing the loop. When a panic occurs during any loop iteration, catch_unwind() catches it and returns Err, terminating the entire future. No further iterations execute. This means one panic permanently kills the task, leaving the producer unable to emit clusters or the receiver unable to process messages—breaking the scheduler feed.
Wrap panic handling per-iteration inside the loop instead, so it can log and recover without terminating the feed:
loop {
    let result = AssertUnwindSafe(async {
        // iteration work
    }).catch_unwind().await;
    if let Err(e) = result {
        error!("Iteration panicked: {:?}", e);
        continue;
    }
}

🤖 Prompt for AI Agents
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust/crates/scheduler/src/cluster.rs` around lines 379 - 520, The panic
handling currently wraps the whole producer and receiver async blocks (the async
move tasks that contain the main loops in the cluster feed), so any panic aborts
the entire task; change this by moving the
AssertUnwindSafe(...).catch_unwind().await inside each loop iteration: for the
producer loop (the async block that reads self.clusters, sends via sender, and
updates last_sent_map_producer) wrap the per-iteration work in
AssertUnwindSafe(...).catch_unwind().await, log the error (e.g., "Iteration
panicked: {:?}"), and continue the loop so the feed keeps running; do the same
for the receiver loop that matches on FeedMessage (the async block that awaits
feed_receiver.recv(), updates sleep_map and last_sent_map_receiver, and handles
FeedMessage::Stop), so a single iteration panic is logged and skipped without
terminating the whole spawn.
Add metrics to measure how long a cluster takes to round trip the scheduler loop.
Fix minor issues and refactor a confusing if let Some construct.
LLM usage disclosure
Claude Opus was used to investigate panic surfaces that might lead to abandoned clusters and to implement the metric collecting logic.
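To make the timing idea concrete: the diff keeps a map of when each cluster was last handed to the pipeline, and the next send of the same cluster yields one round-trip duration. A simplified, single-threaded sketch is below; ClusterId and the println-based observer are stand-ins for the real types and the histogram.

// Simplified sketch of the round-trip measurement pattern used in the diff.
use std::collections::HashMap;
use std::time::{Duration, Instant};

type ClusterId = String; // stand-in for the real cluster type

fn observe_cluster_round_trip(elapsed: Duration) {
    // The real code feeds a histogram here; printing keeps the sketch self-contained.
    println!("cluster round trip: {elapsed:?}");
}

fn record_send(last_sent: &mut HashMap<ClusterId, Instant>, cluster: ClusterId) {
    let now = Instant::now();
    // `insert` returns the previous timestamp for this cluster, if any; the gap
    // between the two sends is one full trip around the scheduler loop.
    if let Some(prev) = last_sent.insert(cluster, now) {
        observe_cluster_round_trip(now.duration_since(prev));
    }
}

fn main() {
    let mut last_sent = HashMap::new();
    record_send(&mut last_sent, "cluster-a".to_string()); // first send: nothing to observe
    std::thread::sleep(Duration::from_millis(20));
    record_send(&mut last_sent, "cluster-a".to_string()); // second send: ~20ms round trip
}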
Summary by CodeRabbit
- New Features
- Bug Fixes
- Performance Improvements