diff --git a/crates/tui/src/tools/subagent/mailbox.rs b/crates/tui/src/tools/subagent/mailbox.rs index 1a59504d..ee079c5e 100644 --- a/crates/tui/src/tools/subagent/mailbox.rs +++ b/crates/tui/src/tools/subagent/mailbox.rs @@ -58,6 +58,9 @@ pub enum MailboxMessage { Completed { agent_id: String, summary: String }, /// Agent failed with the carried error message. Failed { agent_id: String, error: String }, + /// Agent was interrupted (e.g. API timeout) with a continuable + /// checkpoint; the worker is parked waiting for continuation input. + Interrupted { agent_id: String, reason: String }, /// Cancellation propagated to this agent. Cancelled { agent_id: String }, /// Incremental token usage from a sub-agent's API call. @@ -83,6 +86,7 @@ impl MailboxMessage { | Self::ToolCallCompleted { agent_id, .. } | Self::Completed { agent_id, .. } | Self::Failed { agent_id, .. } + | Self::Interrupted { agent_id, .. } | Self::Cancelled { agent_id } | Self::TokenUsage { agent_id, .. } => agent_id, Self::ChildSpawned { child_id, .. } => child_id, @@ -460,6 +464,13 @@ mod tests { }, "a8", ), + ( + MailboxMessage::Interrupted { + agent_id: "a10".into(), + reason: "API call timed out".into(), + }, + "a10", + ), ( MailboxMessage::TokenUsage { agent_id: "a9".into(), diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs index f8012e53..c3c6ece2 100644 --- a/crates/tui/src/tools/subagent/mod.rs +++ b/crates/tui/src/tools/subagent/mod.rs @@ -4390,6 +4390,12 @@ async fn run_subagent( checkpoint.clone(), )? }; + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Interrupted { + agent_id: agent_id.clone(), + reason: reason.clone(), + }); + } let next_input = tokio::select! { biased; diff --git a/crates/tui/src/tools/subagent/tests.rs b/crates/tui/src/tools/subagent/tests.rs index 4b8e02d2..2128aff7 100644 --- a/crates/tui/src/tools/subagent/tests.rs +++ b/crates/tui/src/tools/subagent/tests.rs @@ -1317,6 +1317,9 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() { runtime.client = client; runtime.manager = Arc::clone(&manager); runtime.context = ToolContext::new(tmp.path()); + let (mailbox, mut mailbox_rx) = + crate::tools::subagent::mailbox::Mailbox::new(tokio_util::sync::CancellationToken::new()); + runtime.mailbox = Some(mailbox); let task = SubAgentTask { manager_handle: Arc::clone(&manager), @@ -1375,6 +1378,29 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() { .await .expect("first timed-out API attempt should reach the test server"); + let interrupted_envelope = tokio::time::timeout(Duration::from_secs(2), async { + loop { + for env in mailbox_rx.drain() { + if let MailboxMessage::Interrupted { + agent_id: id, + reason, + } = env.message + { + return (id, reason); + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("API timeout should publish an Interrupted mailbox lifecycle event"); + assert_eq!(interrupted_envelope.0, agent_id); + assert!( + interrupted_envelope.1.contains("API call timed out"), + "reason should carry the timeout context: {}", + interrupted_envelope.1 + ); + let ctx = runtime.context.clone(); let tool = AgentEvalTool::new(Arc::clone(&manager)); let result = tool diff --git a/crates/tui/src/tui/subagent_routing.rs b/crates/tui/src/tui/subagent_routing.rs index 69ea98f2..f54e7d4d 100644 --- a/crates/tui/src/tui/subagent_routing.rs +++ b/crates/tui/src/tui/subagent_routing.rs @@ -69,6 +69,66 @@ pub(super) fn reconcile_subagent_activity_state(app: &mut App) { } else if app.agent_activity_started_at.is_none() { app.agent_activity_started_at = Some(Instant::now()); } + + reconcile_cards_with_snapshots(app); +} + +/// Sync in-transcript card slots that still render as running against the +/// canonical manager snapshot statuses. A card can miss its terminal mailbox +/// envelope (e.g. API-timeout interruption observed only via `AgentList`), +/// which would otherwise leave the fanout/delegate UI counting the agent as +/// running indefinitely. +fn reconcile_cards_with_snapshots(app: &mut App) { + let non_running: Vec<(String, AgentLifecycle)> = app + .subagent_cache + .iter() + .filter_map(|agent| { + let lifecycle = match &agent.status { + SubAgentStatus::Running => return None, + SubAgentStatus::Interrupted(_) => AgentLifecycle::Interrupted, + SubAgentStatus::Completed => AgentLifecycle::Completed, + SubAgentStatus::Failed(_) => AgentLifecycle::Failed, + SubAgentStatus::Cancelled => AgentLifecycle::Cancelled, + }; + Some((agent.agent_id.clone(), lifecycle)) + }) + .collect(); + for (agent_id, lifecycle) in non_running { + let Some(&idx) = app.subagent_card_index.get(&agent_id) else { + continue; + }; + let updated = match app.history.get_mut(idx) { + Some(HistoryCell::SubAgent(SubAgentCell::Delegate(card))) + if card.agent_id == agent_id + && matches!( + card.status, + AgentLifecycle::Pending | AgentLifecycle::Running + ) => + { + card.status = lifecycle; + true + } + Some(HistoryCell::SubAgent(SubAgentCell::Fanout(card))) => { + match card.workers.iter_mut().find(|slot| { + slot.agent_id == agent_id + && matches!( + slot.status, + AgentLifecycle::Pending | AgentLifecycle::Running + ) + }) { + Some(slot) => { + slot.status = lifecycle; + true + } + None => false, + } + } + _ => false, + }; + if updated { + app.bump_history_cell(idx); + } + } } fn subagent_status_rank(status: &SubAgentStatus) -> u8 { diff --git a/crates/tui/src/tui/ui/tests.rs b/crates/tui/src/tui/ui/tests.rs index 683d8dd3..10887f32 100644 --- a/crates/tui/src/tui/ui/tests.rs +++ b/crates/tui/src/tui/ui/tests.rs @@ -3527,6 +3527,83 @@ fn fanout_started_sibling_bumps_existing_card_revision() { } } +#[test] +fn fanout_interrupted_mailbox_drops_running_count() { + let mut app = create_test_app(); + app.pending_subagent_dispatch = Some("rlm".to_string()); + + for (seq, id) in ["fanout-a", "fanout-b"].iter().enumerate() { + handle_subagent_mailbox( + &mut app, + seq as u64 + 1, + &crate::tools::subagent::MailboxMessage::Started { + agent_id: (*id).to_string(), + agent_type: "default".to_string(), + }, + ); + } + assert_eq!( + crate::tui::subagent_routing::active_fanout_counts(&app), + Some((2, 2)) + ); + + handle_subagent_mailbox( + &mut app, + 3, + &crate::tools::subagent::MailboxMessage::Interrupted { + agent_id: "fanout-a".to_string(), + reason: "API call timed out after 120000ms".to_string(), + }, + ); + + assert_eq!( + crate::tui::subagent_routing::active_fanout_counts(&app), + Some((1, 2)), + "interrupted worker must no longer count as running" + ); +} + +#[test] +fn reconcile_syncs_stale_running_cards_with_interrupted_snapshots() { + let mut app = create_test_app(); + app.pending_subagent_dispatch = Some("rlm".to_string()); + + for (seq, id) in ["fanout-a", "fanout-b"].iter().enumerate() { + handle_subagent_mailbox( + &mut app, + seq as u64 + 1, + &crate::tools::subagent::MailboxMessage::Started { + agent_id: (*id).to_string(), + agent_type: "default".to_string(), + }, + ); + } + let fanout_idx = app.last_fanout_card_index.expect("fanout card index"); + let initial_revision = app.history_revisions[fanout_idx]; + + // The card missed its lifecycle envelope; only the manager snapshot + // (delivered via AgentList) knows the agents were interrupted. + app.subagent_cache = vec![ + make_subagent( + "fanout-a", + crate::tools::subagent::SubAgentStatus::Interrupted("API call timed out".to_string()), + ), + make_subagent("fanout-b", crate::tools::subagent::SubAgentStatus::Running), + ]; + reconcile_subagent_activity_state(&mut app); + + assert_eq!( + crate::tui::subagent_routing::active_fanout_counts(&app), + Some((1, 2)), + "snapshot reconciliation must clear the stale running slot" + ); + assert_ne!( + app.history_revisions[fanout_idx], initial_revision, + "reconciled card must invalidate cached transcript rows" + ); + assert_eq!(running_agent_count(&app), 1); +} + #[test] fn format_token_count_compact_formats_units() { assert_eq!(format_token_count_compact(999), "999"); diff --git a/crates/tui/src/tui/views/mod.rs b/crates/tui/src/tui/views/mod.rs index ea8fafd5..6b487341 100644 --- a/crates/tui/src/tui/views/mod.rs +++ b/crates/tui/src/tui/views/mod.rs @@ -1622,6 +1622,9 @@ fn lifecycle_to_subagent_status(status: AgentLifecycle) -> SubAgentStatus { AgentLifecycle::Completed => SubAgentStatus::Completed, AgentLifecycle::Failed => SubAgentStatus::Failed("failed in transcript".to_string()), AgentLifecycle::Cancelled => SubAgentStatus::Cancelled, + AgentLifecycle::Interrupted => { + SubAgentStatus::Interrupted("interrupted in transcript".to_string()) + } } } diff --git a/crates/tui/src/tui/widgets/agent_card.rs b/crates/tui/src/tui/widgets/agent_card.rs index 7765780d..6b074902 100644 --- a/crates/tui/src/tui/widgets/agent_card.rs +++ b/crates/tui/src/tui/widgets/agent_card.rs @@ -34,11 +34,17 @@ pub enum AgentLifecycle { Completed, Failed, Cancelled, + /// Interrupted with a continuable checkpoint (e.g. API timeout); not + /// running, but recoverable via `agent_eval` continuation. + Interrupted, } impl AgentLifecycle { fn is_terminal(self) -> bool { - matches!(self, Self::Completed | Self::Failed | Self::Cancelled) + matches!( + self, + Self::Completed | Self::Failed | Self::Cancelled | Self::Interrupted + ) } fn label(self) -> &'static str { @@ -48,6 +54,7 @@ impl AgentLifecycle { Self::Completed => "done", Self::Failed => "failed", Self::Cancelled => "cancelled", + Self::Interrupted => "interrupted", } } @@ -58,6 +65,7 @@ impl AgentLifecycle { Self::Completed => palette::STATUS_SUCCESS, Self::Failed => palette::STATUS_ERROR, Self::Cancelled => palette::TEXT_MUTED, + Self::Interrupted => palette::STATUS_WARNING, } } } @@ -265,7 +273,9 @@ impl FanoutCard { match slot.status { AgentLifecycle::Completed => done += 1, AgentLifecycle::Running => running += 1, - AgentLifecycle::Failed | AgentLifecycle::Cancelled => failed += 1, + AgentLifecycle::Failed + | AgentLifecycle::Cancelled + | AgentLifecycle::Interrupted => failed += 1, AgentLifecycle::Pending => pending += 1, } } @@ -277,11 +287,12 @@ impl FanoutCard { let mut s = String::with_capacity(self.workers.len()); for slot in &self.workers { let glyph = match slot.status { - AgentLifecycle::Completed => '\u{25CF}', // ● - AgentLifecycle::Running => '\u{25D0}', // ◐ - AgentLifecycle::Failed => '\u{00D7}', // × - AgentLifecycle::Cancelled => '\u{2298}', // ⊘ - AgentLifecycle::Pending => '\u{25CB}', // ○ + AgentLifecycle::Completed => '\u{25CF}', // ● + AgentLifecycle::Running => '\u{25D0}', // ◐ + AgentLifecycle::Failed => '\u{00D7}', // × + AgentLifecycle::Cancelled => '\u{2298}', // ⊘ + AgentLifecycle::Pending => '\u{25CB}', // ○ + AgentLifecycle::Interrupted => '\u{25CC}', // ◌ }; s.push(glyph); } @@ -327,6 +338,12 @@ impl FanoutCard { let (done, running, failed, pending) = self.counts(); if running > 0 || pending > 0 { AgentLifecycle::Running + } else if self + .workers + .iter() + .any(|slot| matches!(slot.status, AgentLifecycle::Interrupted)) + { + AgentLifecycle::Interrupted } else if failed > 0 && done == 0 { AgentLifecycle::Failed } else if done > 0 { @@ -434,6 +451,10 @@ pub fn apply_to_delegate(card: &mut DelegateCard, msg: &MailboxMessage) -> bool card.status = AgentLifecycle::Failed; card.summary = Some(error.clone()); } + MailboxMessage::Interrupted { reason, .. } => { + card.status = AgentLifecycle::Interrupted; + card.summary = Some(reason.clone()); + } MailboxMessage::Cancelled { .. } => { card.status = AgentLifecycle::Cancelled; } @@ -481,6 +502,10 @@ pub fn apply_to_fanout(card: &mut FanoutCard, msg: &MailboxMessage) -> bool { card.upsert_worker(id, AgentLifecycle::Failed); true } + MailboxMessage::Interrupted { .. } => { + card.upsert_worker(id, AgentLifecycle::Interrupted); + true + } MailboxMessage::Cancelled { .. } => { card.upsert_worker(id, AgentLifecycle::Cancelled); true @@ -747,6 +772,72 @@ mod tests { } } + #[test] + fn delegate_interrupted_leaves_running_and_renders_reason() { + let mut card = DelegateCard::new("agent_int", "general"); + apply_to_delegate( + &mut card, + &MailboxMessage::started("agent_int", crate::tools::subagent::SubAgentType::General), + ); + assert_eq!(card.status, AgentLifecycle::Running); + + let msg = MailboxMessage::Interrupted { + agent_id: "agent_int".into(), + reason: "API call timed out after 120000ms; checkpoint preserved for continuation" + .into(), + }; + assert!(apply_to_delegate(&mut card, &msg)); + assert_eq!(card.status, AgentLifecycle::Interrupted); + + let rendered = render_to_strings(&card.render_lines(80)).join("\n"); + assert!(rendered.contains("[interrupted]"), "{rendered}"); + assert!(rendered.contains("API call timed out"), "{rendered}"); + } + + #[test] + fn fanout_interrupted_worker_leaves_running_counts() { + let mut card = FanoutCard::new("fanout", Locale::En).with_workers(["w_1", "w_2"]); + apply_to_fanout( + &mut card, + &MailboxMessage::started("w_1", crate::tools::subagent::SubAgentType::General), + ); + apply_to_fanout( + &mut card, + &MailboxMessage::started("w_2", crate::tools::subagent::SubAgentType::General), + ); + + let msg = MailboxMessage::Interrupted { + agent_id: "w_1".into(), + reason: "API call timed out".into(), + }; + assert!(apply_to_fanout(&mut card, &msg)); + assert_eq!(card.workers[0].status, AgentLifecycle::Interrupted); + assert_eq!(card.workers[1].status, AgentLifecycle::Running); + + let rendered = render_to_strings(&card.render_lines(80)); + let stats = rendered + .iter() + .find(|line| line.contains("running") && line.contains("pending")) + .expect("counts line present"); + assert!(stats.contains("1 running"), "{stats}"); + assert!( + stats.contains("1 failed"), + "interrupted folds into the non-running attention bucket: {stats}" + ); + + let msg = MailboxMessage::Interrupted { + agent_id: "w_2".into(), + reason: "API call timed out".into(), + }; + assert!(apply_to_fanout(&mut card, &msg)); + let rendered = render_to_strings(&card.render_lines(80)).join("\n"); + assert!( + rendered.contains("[interrupted]"), + "aggregate header should surface interrupted once nothing runs: {rendered}" + ); + assert!(rendered.contains("0 running"), "{rendered}"); + } + #[test] fn fanout_counts_are_localized() { let ids: Vec = (0..16).map(|i| format!("w_{i}")).collect();