fix(tui): emit interrupted sub-agent lifecycle event and reconcile stale running cards
API-timeout interruptions now publish MailboxMessage::Interrupted so delegate/fanout cards leave the running state, and AgentList snapshot reconciliation syncs any card slot that missed its terminal envelope. Fixes #3080 Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
@@ -58,6 +58,9 @@ pub enum MailboxMessage {
|
||||
Completed { agent_id: String, summary: String },
|
||||
/// Agent failed with the carried error message.
|
||||
Failed { agent_id: String, error: String },
|
||||
/// Agent was interrupted (e.g. API timeout) with a continuable
|
||||
/// checkpoint; the worker is parked waiting for continuation input.
|
||||
Interrupted { agent_id: String, reason: String },
|
||||
/// Cancellation propagated to this agent.
|
||||
Cancelled { agent_id: String },
|
||||
/// Incremental token usage from a sub-agent's API call.
|
||||
@@ -83,6 +86,7 @@ impl MailboxMessage {
|
||||
| Self::ToolCallCompleted { agent_id, .. }
|
||||
| Self::Completed { agent_id, .. }
|
||||
| Self::Failed { agent_id, .. }
|
||||
| Self::Interrupted { agent_id, .. }
|
||||
| Self::Cancelled { agent_id }
|
||||
| Self::TokenUsage { agent_id, .. } => agent_id,
|
||||
Self::ChildSpawned { child_id, .. } => child_id,
|
||||
@@ -460,6 +464,13 @@ mod tests {
|
||||
},
|
||||
"a8",
|
||||
),
|
||||
(
|
||||
MailboxMessage::Interrupted {
|
||||
agent_id: "a10".into(),
|
||||
reason: "API call timed out".into(),
|
||||
},
|
||||
"a10",
|
||||
),
|
||||
(
|
||||
MailboxMessage::TokenUsage {
|
||||
agent_id: "a9".into(),
|
||||
|
||||
@@ -4390,6 +4390,12 @@ async fn run_subagent(
|
||||
checkpoint.clone(),
|
||||
)?
|
||||
};
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::Interrupted {
|
||||
agent_id: agent_id.clone(),
|
||||
reason: reason.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
let next_input = tokio::select! {
|
||||
biased;
|
||||
|
||||
@@ -1317,6 +1317,9 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() {
|
||||
runtime.client = client;
|
||||
runtime.manager = Arc::clone(&manager);
|
||||
runtime.context = ToolContext::new(tmp.path());
|
||||
let (mailbox, mut mailbox_rx) =
|
||||
crate::tools::subagent::mailbox::Mailbox::new(tokio_util::sync::CancellationToken::new());
|
||||
runtime.mailbox = Some(mailbox);
|
||||
|
||||
let task = SubAgentTask {
|
||||
manager_handle: Arc::clone(&manager),
|
||||
@@ -1375,6 +1378,29 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() {
|
||||
.await
|
||||
.expect("first timed-out API attempt should reach the test server");
|
||||
|
||||
let interrupted_envelope = tokio::time::timeout(Duration::from_secs(2), async {
|
||||
loop {
|
||||
for env in mailbox_rx.drain() {
|
||||
if let MailboxMessage::Interrupted {
|
||||
agent_id: id,
|
||||
reason,
|
||||
} = env.message
|
||||
{
|
||||
return (id, reason);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("API timeout should publish an Interrupted mailbox lifecycle event");
|
||||
assert_eq!(interrupted_envelope.0, agent_id);
|
||||
assert!(
|
||||
interrupted_envelope.1.contains("API call timed out"),
|
||||
"reason should carry the timeout context: {}",
|
||||
interrupted_envelope.1
|
||||
);
|
||||
|
||||
let ctx = runtime.context.clone();
|
||||
let tool = AgentEvalTool::new(Arc::clone(&manager));
|
||||
let result = tool
|
||||
|
||||
@@ -69,6 +69,66 @@ pub(super) fn reconcile_subagent_activity_state(app: &mut App) {
|
||||
} else if app.agent_activity_started_at.is_none() {
|
||||
app.agent_activity_started_at = Some(Instant::now());
|
||||
}
|
||||
|
||||
reconcile_cards_with_snapshots(app);
|
||||
}
|
||||
|
||||
/// Sync in-transcript card slots that still render as running against the
|
||||
/// canonical manager snapshot statuses. A card can miss its terminal mailbox
|
||||
/// envelope (e.g. API-timeout interruption observed only via `AgentList`),
|
||||
/// which would otherwise leave the fanout/delegate UI counting the agent as
|
||||
/// running indefinitely.
|
||||
fn reconcile_cards_with_snapshots(app: &mut App) {
|
||||
let non_running: Vec<(String, AgentLifecycle)> = app
|
||||
.subagent_cache
|
||||
.iter()
|
||||
.filter_map(|agent| {
|
||||
let lifecycle = match &agent.status {
|
||||
SubAgentStatus::Running => return None,
|
||||
SubAgentStatus::Interrupted(_) => AgentLifecycle::Interrupted,
|
||||
SubAgentStatus::Completed => AgentLifecycle::Completed,
|
||||
SubAgentStatus::Failed(_) => AgentLifecycle::Failed,
|
||||
SubAgentStatus::Cancelled => AgentLifecycle::Cancelled,
|
||||
};
|
||||
Some((agent.agent_id.clone(), lifecycle))
|
||||
})
|
||||
.collect();
|
||||
for (agent_id, lifecycle) in non_running {
|
||||
let Some(&idx) = app.subagent_card_index.get(&agent_id) else {
|
||||
continue;
|
||||
};
|
||||
let updated = match app.history.get_mut(idx) {
|
||||
Some(HistoryCell::SubAgent(SubAgentCell::Delegate(card)))
|
||||
if card.agent_id == agent_id
|
||||
&& matches!(
|
||||
card.status,
|
||||
AgentLifecycle::Pending | AgentLifecycle::Running
|
||||
) =>
|
||||
{
|
||||
card.status = lifecycle;
|
||||
true
|
||||
}
|
||||
Some(HistoryCell::SubAgent(SubAgentCell::Fanout(card))) => {
|
||||
match card.workers.iter_mut().find(|slot| {
|
||||
slot.agent_id == agent_id
|
||||
&& matches!(
|
||||
slot.status,
|
||||
AgentLifecycle::Pending | AgentLifecycle::Running
|
||||
)
|
||||
}) {
|
||||
Some(slot) => {
|
||||
slot.status = lifecycle;
|
||||
true
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
if updated {
|
||||
app.bump_history_cell(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn subagent_status_rank(status: &SubAgentStatus) -> u8 {
|
||||
|
||||
@@ -3527,6 +3527,83 @@ fn fanout_started_sibling_bumps_existing_card_revision() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fanout_interrupted_mailbox_drops_running_count() {
|
||||
let mut app = create_test_app();
|
||||
app.pending_subagent_dispatch = Some("rlm".to_string());
|
||||
|
||||
for (seq, id) in ["fanout-a", "fanout-b"].iter().enumerate() {
|
||||
handle_subagent_mailbox(
|
||||
&mut app,
|
||||
seq as u64 + 1,
|
||||
&crate::tools::subagent::MailboxMessage::Started {
|
||||
agent_id: (*id).to_string(),
|
||||
agent_type: "default".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
crate::tui::subagent_routing::active_fanout_counts(&app),
|
||||
Some((2, 2))
|
||||
);
|
||||
|
||||
handle_subagent_mailbox(
|
||||
&mut app,
|
||||
3,
|
||||
&crate::tools::subagent::MailboxMessage::Interrupted {
|
||||
agent_id: "fanout-a".to_string(),
|
||||
reason: "API call timed out after 120000ms".to_string(),
|
||||
},
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
crate::tui::subagent_routing::active_fanout_counts(&app),
|
||||
Some((1, 2)),
|
||||
"interrupted worker must no longer count as running"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconcile_syncs_stale_running_cards_with_interrupted_snapshots() {
|
||||
let mut app = create_test_app();
|
||||
app.pending_subagent_dispatch = Some("rlm".to_string());
|
||||
|
||||
for (seq, id) in ["fanout-a", "fanout-b"].iter().enumerate() {
|
||||
handle_subagent_mailbox(
|
||||
&mut app,
|
||||
seq as u64 + 1,
|
||||
&crate::tools::subagent::MailboxMessage::Started {
|
||||
agent_id: (*id).to_string(),
|
||||
agent_type: "default".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
let fanout_idx = app.last_fanout_card_index.expect("fanout card index");
|
||||
let initial_revision = app.history_revisions[fanout_idx];
|
||||
|
||||
// The card missed its lifecycle envelope; only the manager snapshot
|
||||
// (delivered via AgentList) knows the agents were interrupted.
|
||||
app.subagent_cache = vec![
|
||||
make_subagent(
|
||||
"fanout-a",
|
||||
crate::tools::subagent::SubAgentStatus::Interrupted("API call timed out".to_string()),
|
||||
),
|
||||
make_subagent("fanout-b", crate::tools::subagent::SubAgentStatus::Running),
|
||||
];
|
||||
reconcile_subagent_activity_state(&mut app);
|
||||
|
||||
assert_eq!(
|
||||
crate::tui::subagent_routing::active_fanout_counts(&app),
|
||||
Some((1, 2)),
|
||||
"snapshot reconciliation must clear the stale running slot"
|
||||
);
|
||||
assert_ne!(
|
||||
app.history_revisions[fanout_idx], initial_revision,
|
||||
"reconciled card must invalidate cached transcript rows"
|
||||
);
|
||||
assert_eq!(running_agent_count(&app), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_token_count_compact_formats_units() {
|
||||
assert_eq!(format_token_count_compact(999), "999");
|
||||
|
||||
@@ -1622,6 +1622,9 @@ fn lifecycle_to_subagent_status(status: AgentLifecycle) -> SubAgentStatus {
|
||||
AgentLifecycle::Completed => SubAgentStatus::Completed,
|
||||
AgentLifecycle::Failed => SubAgentStatus::Failed("failed in transcript".to_string()),
|
||||
AgentLifecycle::Cancelled => SubAgentStatus::Cancelled,
|
||||
AgentLifecycle::Interrupted => {
|
||||
SubAgentStatus::Interrupted("interrupted in transcript".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,11 +34,17 @@ pub enum AgentLifecycle {
|
||||
Completed,
|
||||
Failed,
|
||||
Cancelled,
|
||||
/// Interrupted with a continuable checkpoint (e.g. API timeout); not
|
||||
/// running, but recoverable via `agent_eval` continuation.
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
impl AgentLifecycle {
|
||||
fn is_terminal(self) -> bool {
|
||||
matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
|
||||
matches!(
|
||||
self,
|
||||
Self::Completed | Self::Failed | Self::Cancelled | Self::Interrupted
|
||||
)
|
||||
}
|
||||
|
||||
fn label(self) -> &'static str {
|
||||
@@ -48,6 +54,7 @@ impl AgentLifecycle {
|
||||
Self::Completed => "done",
|
||||
Self::Failed => "failed",
|
||||
Self::Cancelled => "cancelled",
|
||||
Self::Interrupted => "interrupted",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +65,7 @@ impl AgentLifecycle {
|
||||
Self::Completed => palette::STATUS_SUCCESS,
|
||||
Self::Failed => palette::STATUS_ERROR,
|
||||
Self::Cancelled => palette::TEXT_MUTED,
|
||||
Self::Interrupted => palette::STATUS_WARNING,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -265,7 +273,9 @@ impl FanoutCard {
|
||||
match slot.status {
|
||||
AgentLifecycle::Completed => done += 1,
|
||||
AgentLifecycle::Running => running += 1,
|
||||
AgentLifecycle::Failed | AgentLifecycle::Cancelled => failed += 1,
|
||||
AgentLifecycle::Failed
|
||||
| AgentLifecycle::Cancelled
|
||||
| AgentLifecycle::Interrupted => failed += 1,
|
||||
AgentLifecycle::Pending => pending += 1,
|
||||
}
|
||||
}
|
||||
@@ -277,11 +287,12 @@ impl FanoutCard {
|
||||
let mut s = String::with_capacity(self.workers.len());
|
||||
for slot in &self.workers {
|
||||
let glyph = match slot.status {
|
||||
AgentLifecycle::Completed => '\u{25CF}', // ●
|
||||
AgentLifecycle::Running => '\u{25D0}', // ◐
|
||||
AgentLifecycle::Failed => '\u{00D7}', // ×
|
||||
AgentLifecycle::Cancelled => '\u{2298}', // ⊘
|
||||
AgentLifecycle::Pending => '\u{25CB}', // ○
|
||||
AgentLifecycle::Completed => '\u{25CF}', // ●
|
||||
AgentLifecycle::Running => '\u{25D0}', // ◐
|
||||
AgentLifecycle::Failed => '\u{00D7}', // ×
|
||||
AgentLifecycle::Cancelled => '\u{2298}', // ⊘
|
||||
AgentLifecycle::Pending => '\u{25CB}', // ○
|
||||
AgentLifecycle::Interrupted => '\u{25CC}', // ◌
|
||||
};
|
||||
s.push(glyph);
|
||||
}
|
||||
@@ -327,6 +338,12 @@ impl FanoutCard {
|
||||
let (done, running, failed, pending) = self.counts();
|
||||
if running > 0 || pending > 0 {
|
||||
AgentLifecycle::Running
|
||||
} else if self
|
||||
.workers
|
||||
.iter()
|
||||
.any(|slot| matches!(slot.status, AgentLifecycle::Interrupted))
|
||||
{
|
||||
AgentLifecycle::Interrupted
|
||||
} else if failed > 0 && done == 0 {
|
||||
AgentLifecycle::Failed
|
||||
} else if done > 0 {
|
||||
@@ -434,6 +451,10 @@ pub fn apply_to_delegate(card: &mut DelegateCard, msg: &MailboxMessage) -> bool
|
||||
card.status = AgentLifecycle::Failed;
|
||||
card.summary = Some(error.clone());
|
||||
}
|
||||
MailboxMessage::Interrupted { reason, .. } => {
|
||||
card.status = AgentLifecycle::Interrupted;
|
||||
card.summary = Some(reason.clone());
|
||||
}
|
||||
MailboxMessage::Cancelled { .. } => {
|
||||
card.status = AgentLifecycle::Cancelled;
|
||||
}
|
||||
@@ -481,6 +502,10 @@ pub fn apply_to_fanout(card: &mut FanoutCard, msg: &MailboxMessage) -> bool {
|
||||
card.upsert_worker(id, AgentLifecycle::Failed);
|
||||
true
|
||||
}
|
||||
MailboxMessage::Interrupted { .. } => {
|
||||
card.upsert_worker(id, AgentLifecycle::Interrupted);
|
||||
true
|
||||
}
|
||||
MailboxMessage::Cancelled { .. } => {
|
||||
card.upsert_worker(id, AgentLifecycle::Cancelled);
|
||||
true
|
||||
@@ -747,6 +772,72 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delegate_interrupted_leaves_running_and_renders_reason() {
|
||||
let mut card = DelegateCard::new("agent_int", "general");
|
||||
apply_to_delegate(
|
||||
&mut card,
|
||||
&MailboxMessage::started("agent_int", crate::tools::subagent::SubAgentType::General),
|
||||
);
|
||||
assert_eq!(card.status, AgentLifecycle::Running);
|
||||
|
||||
let msg = MailboxMessage::Interrupted {
|
||||
agent_id: "agent_int".into(),
|
||||
reason: "API call timed out after 120000ms; checkpoint preserved for continuation"
|
||||
.into(),
|
||||
};
|
||||
assert!(apply_to_delegate(&mut card, &msg));
|
||||
assert_eq!(card.status, AgentLifecycle::Interrupted);
|
||||
|
||||
let rendered = render_to_strings(&card.render_lines(80)).join("\n");
|
||||
assert!(rendered.contains("[interrupted]"), "{rendered}");
|
||||
assert!(rendered.contains("API call timed out"), "{rendered}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fanout_interrupted_worker_leaves_running_counts() {
|
||||
let mut card = FanoutCard::new("fanout", Locale::En).with_workers(["w_1", "w_2"]);
|
||||
apply_to_fanout(
|
||||
&mut card,
|
||||
&MailboxMessage::started("w_1", crate::tools::subagent::SubAgentType::General),
|
||||
);
|
||||
apply_to_fanout(
|
||||
&mut card,
|
||||
&MailboxMessage::started("w_2", crate::tools::subagent::SubAgentType::General),
|
||||
);
|
||||
|
||||
let msg = MailboxMessage::Interrupted {
|
||||
agent_id: "w_1".into(),
|
||||
reason: "API call timed out".into(),
|
||||
};
|
||||
assert!(apply_to_fanout(&mut card, &msg));
|
||||
assert_eq!(card.workers[0].status, AgentLifecycle::Interrupted);
|
||||
assert_eq!(card.workers[1].status, AgentLifecycle::Running);
|
||||
|
||||
let rendered = render_to_strings(&card.render_lines(80));
|
||||
let stats = rendered
|
||||
.iter()
|
||||
.find(|line| line.contains("running") && line.contains("pending"))
|
||||
.expect("counts line present");
|
||||
assert!(stats.contains("1 running"), "{stats}");
|
||||
assert!(
|
||||
stats.contains("1 failed"),
|
||||
"interrupted folds into the non-running attention bucket: {stats}"
|
||||
);
|
||||
|
||||
let msg = MailboxMessage::Interrupted {
|
||||
agent_id: "w_2".into(),
|
||||
reason: "API call timed out".into(),
|
||||
};
|
||||
assert!(apply_to_fanout(&mut card, &msg));
|
||||
let rendered = render_to_strings(&card.render_lines(80)).join("\n");
|
||||
assert!(
|
||||
rendered.contains("[interrupted]"),
|
||||
"aggregate header should surface interrupted once nothing runs: {rendered}"
|
||||
);
|
||||
assert!(rendered.contains("0 running"), "{rendered}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fanout_counts_are_localized() {
|
||||
let ids: Vec<String> = (0..16).map(|i| format!("w_{i}")).collect();
|
||||
|
||||
Reference in New Issue
Block a user