Fix agent swarm cancellation and status UI
This commit is contained in:
+162
-19
@@ -17,8 +17,8 @@ use crate::tools::spec::{
|
||||
optional_bool, optional_str, optional_u64,
|
||||
};
|
||||
use crate::tools::subagent::{
|
||||
SharedSubAgentManager, SubAgentAssignment, SubAgentResult, SubAgentRuntime, SubAgentStatus,
|
||||
SubAgentType,
|
||||
MailboxMessage, SharedSubAgentManager, SubAgentAssignment, SubAgentResult, SubAgentRuntime,
|
||||
SubAgentStatus, SubAgentType,
|
||||
};
|
||||
|
||||
const SWARM_POLL_INTERVAL: Duration = Duration::from_millis(250);
|
||||
@@ -66,6 +66,7 @@ enum SwarmTaskState {
|
||||
Done(SubAgentResult),
|
||||
Failed(String),
|
||||
Skipped(String),
|
||||
Cancelled(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -101,6 +102,7 @@ enum SwarmStatus {
|
||||
Partial,
|
||||
Timeout,
|
||||
Failed,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl SwarmStatus {
|
||||
@@ -115,6 +117,7 @@ impl SwarmStatus {
|
||||
Self::Partial => "partial",
|
||||
Self::Timeout => "timeout",
|
||||
Self::Failed => "failed",
|
||||
Self::Cancelled => "cancelled",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -614,10 +617,38 @@ async fn run_swarm(
|
||||
let mut retry_ready_at: HashMap<String, Instant> = HashMap::new();
|
||||
let mut fail_fast_triggered = false;
|
||||
let mut timed_out = false;
|
||||
let mut cancelled = false;
|
||||
|
||||
loop {
|
||||
let mut changed = false;
|
||||
|
||||
if runtime.cancel_token.is_cancelled() {
|
||||
cancelled = true;
|
||||
cancel_swarm_tasks(
|
||||
shared_manager,
|
||||
runtime,
|
||||
&mut states,
|
||||
&mut pending,
|
||||
&mut running,
|
||||
&mut running_started_at,
|
||||
&mut retry_ready_at,
|
||||
"Cancelled",
|
||||
)
|
||||
.await?;
|
||||
if publish_progress {
|
||||
let progress = build_progress_outcome(
|
||||
&swarm_id,
|
||||
start,
|
||||
&task_order,
|
||||
&states,
|
||||
SwarmStatus::Cancelled,
|
||||
);
|
||||
store_swarm_outcome(&progress, persistence_path.as_deref());
|
||||
emit_swarm_status(runtime.event_tx.as_ref(), &progress);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if !running.is_empty() {
|
||||
let snapshots = {
|
||||
let manager = shared_manager.lock().await;
|
||||
@@ -757,6 +788,7 @@ async fn run_swarm(
|
||||
if fail_fast_triggered {
|
||||
apply_fail_fast(
|
||||
shared_manager,
|
||||
runtime,
|
||||
&mut states,
|
||||
&mut pending,
|
||||
&mut running,
|
||||
@@ -893,6 +925,7 @@ async fn run_swarm(
|
||||
if fail_fast_triggered {
|
||||
apply_fail_fast(
|
||||
shared_manager,
|
||||
runtime,
|
||||
&mut states,
|
||||
&mut pending,
|
||||
&mut running,
|
||||
@@ -920,7 +953,7 @@ async fn run_swarm(
|
||||
if Instant::now() >= deadline {
|
||||
timed_out = true;
|
||||
if !running.is_empty() {
|
||||
cancel_running_tasks(shared_manager, &running, &mut states).await?;
|
||||
cancel_running_tasks(shared_manager, runtime, &running, &mut states).await?;
|
||||
running.clear();
|
||||
running_started_at.clear();
|
||||
}
|
||||
@@ -940,13 +973,19 @@ async fn run_swarm(
|
||||
}
|
||||
|
||||
if !changed {
|
||||
tokio::time::sleep(SWARM_POLL_INTERVAL).await;
|
||||
tokio::select! {
|
||||
biased;
|
||||
() = runtime.cancel_token.cancelled() => {}
|
||||
() = tokio::time::sleep(SWARM_POLL_INTERVAL) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let outcomes = build_task_outcomes(&task_order, &states);
|
||||
let counts = build_counts(&outcomes);
|
||||
let status = if fail_fast_triggered {
|
||||
let status = if cancelled {
|
||||
SwarmStatus::Cancelled
|
||||
} else if fail_fast_triggered {
|
||||
SwarmStatus::Failed
|
||||
} else if timed_out {
|
||||
SwarmStatus::Timeout
|
||||
@@ -1225,41 +1264,76 @@ fn dependencies_failed(task: &SwarmTaskSpec, states: &HashMap<String, SwarmTaskS
|
||||
SubAgentStatus::Interrupted(_) | SubAgentStatus::Failed(_) | SubAgentStatus::Cancelled
|
||||
),
|
||||
Some(SwarmTaskState::Failed(_)) | Some(SwarmTaskState::Skipped(_)) => true,
|
||||
Some(SwarmTaskState::Cancelled(_)) => true,
|
||||
_ => false,
|
||||
})
|
||||
}
|
||||
|
||||
async fn cancel_running_tasks(
|
||||
manager: &SharedSubAgentManager,
|
||||
runtime: &SubAgentRuntime,
|
||||
running: &HashMap<String, String>,
|
||||
states: &mut HashMap<String, SwarmTaskState>,
|
||||
) -> Result<(), ToolError> {
|
||||
let mut manager = manager.lock().await;
|
||||
for (task_id, agent_id) in running {
|
||||
match manager.cancel(agent_id) {
|
||||
Ok(snapshot) => {
|
||||
states.insert(task_id.clone(), SwarmTaskState::Done(snapshot));
|
||||
}
|
||||
Err(err) => {
|
||||
states.insert(
|
||||
task_id.clone(),
|
||||
SwarmTaskState::Failed(format!("Failed to cancel agent: {err}")),
|
||||
);
|
||||
let mut cancelled_agents = Vec::new();
|
||||
{
|
||||
let mut manager = manager.lock().await;
|
||||
for (task_id, agent_id) in running {
|
||||
match manager.cancel(agent_id) {
|
||||
Ok(snapshot) => {
|
||||
if matches!(snapshot.status, SubAgentStatus::Cancelled) {
|
||||
cancelled_agents.push(snapshot.agent_id.clone());
|
||||
}
|
||||
states.insert(task_id.clone(), SwarmTaskState::Done(snapshot));
|
||||
}
|
||||
Err(err) => {
|
||||
states.insert(
|
||||
task_id.clone(),
|
||||
SwarmTaskState::Failed(format!("Failed to cancel agent: {err}")),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(mailbox) = runtime.mailbox.as_ref() {
|
||||
for agent_id in cancelled_agents {
|
||||
let _ = mailbox.send(MailboxMessage::Cancelled { agent_id });
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn cancel_swarm_tasks(
|
||||
manager: &SharedSubAgentManager,
|
||||
runtime: &SubAgentRuntime,
|
||||
states: &mut HashMap<String, SwarmTaskState>,
|
||||
pending: &mut HashSet<String>,
|
||||
running: &mut HashMap<String, String>,
|
||||
running_started_at: &mut HashMap<String, Instant>,
|
||||
retry_ready_at: &mut HashMap<String, Instant>,
|
||||
reason: &str,
|
||||
) -> Result<(), ToolError> {
|
||||
cancel_running_tasks(manager, runtime, running, states).await?;
|
||||
for task_id in pending.drain() {
|
||||
states.insert(task_id, SwarmTaskState::Cancelled(reason.to_string()));
|
||||
}
|
||||
running.clear();
|
||||
running_started_at.clear();
|
||||
retry_ready_at.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn apply_fail_fast(
|
||||
manager: &SharedSubAgentManager,
|
||||
runtime: &SubAgentRuntime,
|
||||
states: &mut HashMap<String, SwarmTaskState>,
|
||||
pending: &mut HashSet<String>,
|
||||
running: &mut HashMap<String, String>,
|
||||
running_started_at: &mut HashMap<String, Instant>,
|
||||
retry_ready_at: &mut HashMap<String, Instant>,
|
||||
) -> Result<(), ToolError> {
|
||||
cancel_running_tasks(manager, running, states).await?;
|
||||
cancel_running_tasks(manager, runtime, running, states).await?;
|
||||
for task_id in pending.drain() {
|
||||
states.insert(
|
||||
task_id,
|
||||
@@ -1353,6 +1427,15 @@ fn build_task_outcomes(
|
||||
steps_taken: 0,
|
||||
duration_ms: 0,
|
||||
},
|
||||
Some(SwarmTaskState::Cancelled(message)) => SwarmTaskOutcome {
|
||||
task_id: task_id.clone(),
|
||||
agent_id: None,
|
||||
status: SwarmTaskStatus::Cancelled,
|
||||
result: None,
|
||||
error: Some(message.clone()),
|
||||
steps_taken: 0,
|
||||
duration_ms: 0,
|
||||
},
|
||||
_ => SwarmTaskOutcome {
|
||||
task_id: task_id.clone(),
|
||||
agent_id: None,
|
||||
@@ -1507,11 +1590,20 @@ fn visit(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{
|
||||
SwarmStatus, SwarmTaskSpec, build_initial_outcome, parse_swarm_id, resolve_task_assignment,
|
||||
retry_delay_for_attempt, task_retry_count, task_timeout, validate_swarm_tasks,
|
||||
SwarmStatus, SwarmTaskSpec, SwarmTaskStatus, build_initial_outcome, parse_swarm_id,
|
||||
resolve_task_assignment, retry_delay_for_attempt, run_swarm, task_retry_count,
|
||||
task_timeout, validate_swarm_tasks,
|
||||
};
|
||||
use crate::client::DeepSeekClient;
|
||||
use crate::config::Config;
|
||||
use crate::tools::spec::ToolContext;
|
||||
use crate::tools::subagent::{SubAgentManager, SubAgentRuntime};
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::tempdir;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
fn task(id: &str, deps: &[&str]) -> SwarmTaskSpec {
|
||||
SwarmTaskSpec {
|
||||
@@ -1608,6 +1700,57 @@ mod tests {
|
||||
assert_eq!(outcome.counts.pending, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancelled_runtime_returns_cancelled_swarm_without_spawning() {
|
||||
let temp = tempdir().expect("tempdir");
|
||||
let manager = Arc::new(Mutex::new(SubAgentManager::new(
|
||||
temp.path().to_path_buf(),
|
||||
2,
|
||||
)));
|
||||
let config = Config {
|
||||
api_key: Some("test-key".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
let client = DeepSeekClient::new(&config).expect("client");
|
||||
let cancel_token = CancellationToken::new();
|
||||
cancel_token.cancel();
|
||||
let context = ToolContext::new(temp.path()).with_cancel_token(cancel_token.clone());
|
||||
let runtime = SubAgentRuntime::new(
|
||||
client,
|
||||
"deepseek-v4-flash".to_string(),
|
||||
context,
|
||||
true,
|
||||
None,
|
||||
manager.clone(),
|
||||
)
|
||||
.with_cancel_token(cancel_token);
|
||||
|
||||
let outcome = run_swarm(
|
||||
&manager,
|
||||
&runtime,
|
||||
"swarm_test".to_string(),
|
||||
vec![task("a", &[])],
|
||||
None,
|
||||
Duration::from_secs(60),
|
||||
1,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("swarm should return clean cancellation");
|
||||
|
||||
assert!(matches!(outcome.status, SwarmStatus::Cancelled));
|
||||
assert_eq!(outcome.counts.cancelled, 1);
|
||||
assert_eq!(outcome.counts.running, 0);
|
||||
assert_eq!(outcome.counts.pending, 0);
|
||||
assert!(matches!(
|
||||
outcome.tasks[0].status,
|
||||
SwarmTaskStatus::Cancelled
|
||||
));
|
||||
assert!(manager.lock().await.list().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_swarm_id_supports_alias() {
|
||||
let input = json!({ "id": "swarm_1234" });
|
||||
|
||||
@@ -21,6 +21,7 @@ use crate::tools::subagent::SubAgentStatus;
|
||||
use crate::tools::todo::TodoStatus;
|
||||
|
||||
use super::app::{App, SidebarFocus};
|
||||
use super::subagent_routing::active_fanout_counts;
|
||||
use super::ui::truncate_line_to_width;
|
||||
|
||||
pub fn render_sidebar(f: &mut Frame, area: Rect, app: &App) {
|
||||
@@ -330,11 +331,16 @@ fn render_sidebar_subagents(f: &mut Frame, area: Rect, app: &App) {
|
||||
.or_insert(0) += 1;
|
||||
acc
|
||||
});
|
||||
let (fanout_running, fanout_total) = active_fanout_counts(app)
|
||||
.map(|(running, total)| (running, Some(total)))
|
||||
.unwrap_or((0, None));
|
||||
|
||||
let summary = SidebarSubagentSummary {
|
||||
cached_total: app.subagent_cache.len(),
|
||||
cached_running,
|
||||
progress_only_count,
|
||||
fanout_total,
|
||||
fanout_running,
|
||||
role_counts,
|
||||
};
|
||||
let lines = subagent_navigator_lines(&summary, content_width);
|
||||
@@ -350,6 +356,8 @@ pub struct SidebarSubagentSummary {
|
||||
pub cached_total: usize,
|
||||
pub cached_running: usize,
|
||||
pub progress_only_count: usize,
|
||||
pub fanout_total: Option<usize>,
|
||||
pub fanout_running: usize,
|
||||
pub role_counts: std::collections::BTreeMap<String, usize>,
|
||||
}
|
||||
|
||||
@@ -361,7 +369,8 @@ pub fn subagent_navigator_lines(
|
||||
) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line<'static>> = Vec::with_capacity(4);
|
||||
|
||||
if summary.cached_total == 0 && summary.progress_only_count == 0 {
|
||||
let fanout_total = summary.fanout_total.unwrap_or(0);
|
||||
if summary.cached_total == 0 && summary.progress_only_count == 0 && fanout_total == 0 {
|
||||
lines.push(Line::from(Span::styled(
|
||||
"No agents",
|
||||
Style::default().fg(palette::TEXT_MUTED),
|
||||
@@ -369,8 +378,9 @@ pub fn subagent_navigator_lines(
|
||||
return lines;
|
||||
}
|
||||
|
||||
let live_running = summary.cached_running + summary.progress_only_count;
|
||||
let total = summary.cached_total + summary.progress_only_count;
|
||||
let live_running =
|
||||
(summary.cached_running + summary.progress_only_count).max(summary.fanout_running);
|
||||
let total = (summary.cached_total + summary.progress_only_count).max(fanout_total);
|
||||
let done = total.saturating_sub(live_running);
|
||||
let header = if live_running > 0 {
|
||||
vec![
|
||||
@@ -476,6 +486,8 @@ mod tests {
|
||||
cached_total: 3,
|
||||
cached_running: 2,
|
||||
progress_only_count: 0,
|
||||
fanout_total: None,
|
||||
fanout_running: 0,
|
||||
role_counts,
|
||||
};
|
||||
let text = lines_to_text(&subagent_navigator_lines(&summary, 64));
|
||||
@@ -492,6 +504,23 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn navigator_uses_fanout_total_when_swarm_has_seeded_slots() {
|
||||
let summary = SidebarSubagentSummary {
|
||||
cached_total: 1,
|
||||
cached_running: 1,
|
||||
progress_only_count: 0,
|
||||
fanout_total: Some(6),
|
||||
fanout_running: 1,
|
||||
role_counts: std::collections::BTreeMap::new(),
|
||||
};
|
||||
|
||||
let text = lines_to_text(&subagent_navigator_lines(&summary, 64));
|
||||
|
||||
assert!(text[0].contains("1 running"), "header: {:?}", text[0]);
|
||||
assert!(text[0].contains("/ 6"), "fanout total: {:?}", text[0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn navigator_settled_state_says_done() {
|
||||
let mut role_counts = std::collections::BTreeMap::new();
|
||||
@@ -500,6 +529,8 @@ mod tests {
|
||||
cached_total: 1,
|
||||
cached_running: 0,
|
||||
progress_only_count: 0,
|
||||
fanout_total: None,
|
||||
fanout_running: 0,
|
||||
role_counts,
|
||||
};
|
||||
let text = lines_to_text(&subagent_navigator_lines(&summary, 32));
|
||||
@@ -517,6 +548,8 @@ mod tests {
|
||||
cached_total: 6,
|
||||
cached_running: 6,
|
||||
progress_only_count: 0,
|
||||
fanout_total: None,
|
||||
fanout_running: 0,
|
||||
role_counts,
|
||||
};
|
||||
let lines = subagent_navigator_lines(&summary, 16);
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::task_manager::{TaskRecord, TaskStatus, TaskSummary};
|
||||
use crate::tools::spec::{ToolError, ToolResult};
|
||||
use crate::tools::subagent::{MailboxMessage, SubAgentResult, SubAgentStatus};
|
||||
use crate::tui::app::{App, AppMode, TaskPanelEntry};
|
||||
use crate::tui::history::{HistoryCell, SubAgentCell, summarize_tool_output};
|
||||
use crate::tui::pager::PagerView;
|
||||
use crate::tui::widgets::agent_card::{
|
||||
AgentLifecycle, DelegateCard, FanoutCard, apply_to_delegate, apply_to_fanout,
|
||||
AgentLifecycle, DelegateCard, FanoutCard, WorkerSlot, apply_to_delegate, apply_to_fanout,
|
||||
};
|
||||
|
||||
pub(super) fn running_agent_count(app: &App) -> usize {
|
||||
@@ -24,6 +25,154 @@ pub(super) fn running_agent_count(app: &App) -> usize {
|
||||
ids.len()
|
||||
}
|
||||
|
||||
pub(super) fn active_fanout_counts(app: &App) -> Option<(usize, usize)> {
|
||||
let idx = app.last_fanout_card_index?;
|
||||
let Some(HistoryCell::SubAgent(SubAgentCell::Fanout(card))) = app.history.get(idx) else {
|
||||
return None;
|
||||
};
|
||||
let running = card
|
||||
.workers
|
||||
.iter()
|
||||
.filter(|slot| matches!(slot.status, AgentLifecycle::Running))
|
||||
.count();
|
||||
Some((running, card.worker_count()))
|
||||
}
|
||||
|
||||
pub(super) fn seed_fanout_card_from_tool_call(
|
||||
app: &mut App,
|
||||
name: &str,
|
||||
input: &serde_json::Value,
|
||||
) -> bool {
|
||||
if name != "agent_swarm" {
|
||||
return false;
|
||||
}
|
||||
|
||||
let Some(tasks) = input.get("tasks").and_then(serde_json::Value::as_array) else {
|
||||
return false;
|
||||
};
|
||||
if tasks.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let ids = tasks
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, task)| {
|
||||
let task_id = task
|
||||
.get("id")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
.unwrap_or_else(|| idx.to_string());
|
||||
format!("task:{task_id}")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let history_threshold_before_push = app.history.len();
|
||||
let active_in_flight = app.active_cell.is_some();
|
||||
let card = FanoutCard::new(name.to_string()).with_workers(ids);
|
||||
app.add_message(HistoryCell::SubAgent(SubAgentCell::Fanout(card)));
|
||||
shift_active_virtual_indices_after_history_insert(
|
||||
app,
|
||||
active_in_flight,
|
||||
history_threshold_before_push,
|
||||
);
|
||||
app.last_fanout_card_index = Some(app.history.len().saturating_sub(1));
|
||||
app.mark_history_updated();
|
||||
true
|
||||
}
|
||||
|
||||
pub(super) fn sync_fanout_card_from_tool_result(
|
||||
app: &mut App,
|
||||
name: &str,
|
||||
result: &Result<ToolResult, ToolError>,
|
||||
) -> bool {
|
||||
if name != "agent_swarm" {
|
||||
return false;
|
||||
}
|
||||
let Ok(tool_result) = result else {
|
||||
return false;
|
||||
};
|
||||
let Ok(payload) = serde_json::from_str::<serde_json::Value>(&tool_result.content) else {
|
||||
return false;
|
||||
};
|
||||
let Some(tasks) = payload
|
||||
.get("tasks")
|
||||
.and_then(serde_json::Value::as_array)
|
||||
.filter(|tasks| !tasks.is_empty())
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let workers = tasks
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, task)| {
|
||||
let task_id = task
|
||||
.get("task_id")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
.unwrap_or_else(|| idx.to_string());
|
||||
let agent_id = task
|
||||
.get("agent_id")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
.unwrap_or_else(|| format!("task:{task_id}"));
|
||||
let status = task
|
||||
.get("status")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(status_to_lifecycle)
|
||||
.unwrap_or(AgentLifecycle::Pending);
|
||||
WorkerSlot { agent_id, status }
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let Some(idx) = app.last_fanout_card_index else {
|
||||
return false;
|
||||
};
|
||||
let Some(HistoryCell::SubAgent(SubAgentCell::Fanout(card))) = app.history.get_mut(idx) else {
|
||||
return false;
|
||||
};
|
||||
card.workers = workers;
|
||||
app.mark_history_updated();
|
||||
true
|
||||
}
|
||||
|
||||
fn status_to_lifecycle(status: &str) -> AgentLifecycle {
|
||||
match status.trim().to_ascii_lowercase().as_str() {
|
||||
"completed" => AgentLifecycle::Completed,
|
||||
"running" => AgentLifecycle::Running,
|
||||
"failed" | "interrupted" => AgentLifecycle::Failed,
|
||||
"cancelled" | "canceled" | "skipped" => AgentLifecycle::Cancelled,
|
||||
_ => AgentLifecycle::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn shift_active_virtual_indices_after_history_insert(
|
||||
app: &mut App,
|
||||
active_in_flight: bool,
|
||||
threshold: usize,
|
||||
) {
|
||||
if !active_in_flight {
|
||||
return;
|
||||
}
|
||||
for idx in app.tool_cells.values_mut() {
|
||||
if *idx >= threshold {
|
||||
*idx = idx.wrapping_add(1);
|
||||
}
|
||||
}
|
||||
for (cell_idx, _) in app.exploring_entries.values_mut() {
|
||||
if *cell_idx >= threshold {
|
||||
*cell_idx = cell_idx.wrapping_add(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn reconcile_subagent_activity_state(app: &mut App) {
|
||||
let running_agents: Vec<(String, String)> = app
|
||||
.subagent_cache
|
||||
@@ -134,7 +283,7 @@ pub(super) fn handle_subagent_mailbox(app: &mut App, _seq: u64, message: &Mailbo
|
||||
&& let Some(HistoryCell::SubAgent(SubAgentCell::Fanout(card))) =
|
||||
app.history.get_mut(idx)
|
||||
{
|
||||
card.upsert_worker(&agent_id, AgentLifecycle::Running);
|
||||
card.claim_pending_worker(&agent_id, AgentLifecycle::Running);
|
||||
app.subagent_card_index.insert(agent_id, idx);
|
||||
} else {
|
||||
let mut card = FanoutCard::new(dispatch_kind.unwrap_or("fanout").to_string());
|
||||
|
||||
@@ -247,12 +247,33 @@ pub(super) fn handle_tool_call_started(
|
||||
);
|
||||
}
|
||||
|
||||
/// Extract per-child prompts from a fan-out tool's input. Currently no
|
||||
/// top-level tool exposes a prompt list — fan-out lives inside the RLM
|
||||
/// REPL via `llm_query_batched`. Kept as a stable hook for any future
|
||||
/// fan-out tool we add.
|
||||
fn extract_fanout_prompts(_name: &str, _input: &serde_json::Value) -> Option<Vec<String>> {
|
||||
None
|
||||
/// Extract per-child prompts from a fan-out tool's input. `agent_swarm`
|
||||
/// carries a structured `tasks` list up front, so the transcript can show
|
||||
/// one readable row per child instead of a collapsed JSON args blob.
|
||||
fn extract_fanout_prompts(name: &str, input: &serde_json::Value) -> Option<Vec<String>> {
|
||||
if name != "agent_swarm" {
|
||||
return None;
|
||||
}
|
||||
|
||||
let prompts = input
|
||||
.get("tasks")
|
||||
.and_then(serde_json::Value::as_array)?
|
||||
.iter()
|
||||
.filter_map(|task| {
|
||||
task.get("objective")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.or_else(|| task.get("prompt").and_then(serde_json::Value::as_str))
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(ToOwned::to_owned)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if prompts.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(prompts)
|
||||
}
|
||||
}
|
||||
|
||||
/// Push a tool cell as a new entry in `active_cell`, register the tool id,
|
||||
|
||||
+107
-6
@@ -47,6 +47,7 @@ use crate::session_manager::{
|
||||
};
|
||||
use crate::task_manager::{NewTaskRequest, SharedTaskManager, TaskManager, TaskManagerConfig};
|
||||
use crate::tools::spec::RuntimeToolServices;
|
||||
use crate::tools::subagent::SubAgentStatus;
|
||||
use crate::tui::command_palette::{
|
||||
CommandPaletteView, build_entries as build_command_palette_entries,
|
||||
};
|
||||
@@ -64,8 +65,10 @@ use crate::tui::shell_job_routing::{
|
||||
add_shell_job_message, format_shell_job_list, format_shell_poll, open_shell_job_pager,
|
||||
};
|
||||
use crate::tui::subagent_routing::{
|
||||
format_task_list, handle_subagent_mailbox, open_task_pager, reconcile_subagent_activity_state,
|
||||
running_agent_count, sort_subagents_in_place, task_mode_label, task_summary_to_panel_entry,
|
||||
active_fanout_counts, format_task_list, handle_subagent_mailbox, open_task_pager,
|
||||
reconcile_subagent_activity_state, running_agent_count, seed_fanout_card_from_tool_call,
|
||||
sort_subagents_in_place, sync_fanout_card_from_tool_result, task_mode_label,
|
||||
task_summary_to_panel_entry,
|
||||
};
|
||||
#[cfg(test)]
|
||||
use crate::tui::tool_routing::exploring_label;
|
||||
@@ -575,6 +578,7 @@ async fn run_event_loop(
|
||||
app.last_fanout_card_index = None;
|
||||
}
|
||||
}
|
||||
seed_fanout_card_from_tool_call(app, &name, &input);
|
||||
handle_tool_call_started(app, &id, &name, &input);
|
||||
}
|
||||
EngineEvent::ToolCallComplete { id, name, result } => {
|
||||
@@ -599,6 +603,7 @@ async fn run_event_loop(
|
||||
}],
|
||||
});
|
||||
handle_tool_call_complete(app, &id, &name, &result);
|
||||
sync_fanout_card_from_tool_result(app, &name, &result);
|
||||
|
||||
// Immediately refresh the task panel sidebar when a
|
||||
// tool that changes task state completes, so the
|
||||
@@ -614,6 +619,18 @@ async fn run_event_loop(
|
||||
tasks.into_iter().map(task_summary_to_panel_entry).collect();
|
||||
last_task_refresh = Instant::now();
|
||||
}
|
||||
if matches!(
|
||||
name.as_str(),
|
||||
"agent_spawn"
|
||||
| "agent_swarm"
|
||||
| "spawn_agents_on_csv"
|
||||
| "agent_cancel"
|
||||
| "agent_wait"
|
||||
| "agent_result"
|
||||
| "agent_status"
|
||||
) {
|
||||
let _ = engine_handle.send(Op::ListSubAgents).await;
|
||||
}
|
||||
}
|
||||
EngineEvent::TurnStarted { turn_id } => {
|
||||
app.is_loading = true;
|
||||
@@ -677,6 +694,13 @@ async fn run_event_loop(
|
||||
}
|
||||
crate::core::events::TurnOutcomeStatus::Failed => "failed".to_string(),
|
||||
});
|
||||
if matches!(
|
||||
status,
|
||||
crate::core::events::TurnOutcomeStatus::Interrupted
|
||||
| crate::core::events::TurnOutcomeStatus::Failed
|
||||
) {
|
||||
let _ = engine_handle.send(Op::ListSubAgents).await;
|
||||
}
|
||||
let turn_tokens = usage.input_tokens + usage.output_tokens;
|
||||
app.total_tokens = app.total_tokens.saturating_add(turn_tokens);
|
||||
app.total_conversation_tokens =
|
||||
@@ -882,12 +906,18 @@ async fn run_event_loop(
|
||||
let _ = engine_handle.send(Op::ListSubAgents).await;
|
||||
}
|
||||
EngineEvent::AgentProgress { id, status } => {
|
||||
app.agent_progress
|
||||
.insert(id.clone(), summarize_tool_output(&status));
|
||||
let display = friendly_subagent_progress(app, &id, &status);
|
||||
if is_noisy_subagent_progress(&status) {
|
||||
app.agent_progress
|
||||
.entry(id.clone())
|
||||
.or_insert_with(|| display.clone());
|
||||
} else {
|
||||
app.agent_progress.insert(id.clone(), display.clone());
|
||||
}
|
||||
if app.agent_activity_started_at.is_none() {
|
||||
app.agent_activity_started_at = Some(Instant::now());
|
||||
}
|
||||
app.status_message = Some(format!("Sub-agent {id}: {status}"));
|
||||
app.status_message = Some(format!("Sub-agent {id}: {display}"));
|
||||
}
|
||||
EngineEvent::AgentComplete { id, result } => {
|
||||
app.agent_progress.remove(&id);
|
||||
@@ -4517,7 +4547,8 @@ fn render_footer(f: &mut Frame, area: Rect, app: &mut App) {
|
||||
// Surface one compact live status row in the footer whenever a turn
|
||||
// is live. Tool turns get the current action plus active/done counts;
|
||||
// non-tool work falls back to the existing dot-pulse label.
|
||||
props.state_label = active_tool_status_label(app)
|
||||
props.state_label = active_subagent_status_label(app)
|
||||
.or_else(|| active_tool_status_label(app))
|
||||
.unwrap_or_else(|| crate::tui::widgets::footer_working_label(dot_frame));
|
||||
props.state_color = palette::DEEPSEEK_SKY;
|
||||
|
||||
@@ -4553,6 +4584,76 @@ fn footer_working_strip_active(app: &App) -> bool {
|
||||
app.is_loading || app.is_compacting || running_agent_count(app) > 0 || turn_in_progress
|
||||
}
|
||||
|
||||
fn is_noisy_subagent_progress(status: &str) -> bool {
|
||||
let status = status.trim().to_ascii_lowercase();
|
||||
status.contains("requesting model response")
|
||||
}
|
||||
|
||||
fn subagent_objective_summary(app: &App, id: &str) -> Option<String> {
|
||||
app.subagent_cache
|
||||
.iter()
|
||||
.find(|agent| agent.agent_id == id)
|
||||
.map(|agent| summarize_tool_output(&agent.assignment.objective))
|
||||
.filter(|summary| !summary.is_empty())
|
||||
}
|
||||
|
||||
fn friendly_subagent_progress(app: &App, id: &str, status: &str) -> String {
|
||||
if !is_noisy_subagent_progress(status) {
|
||||
return summarize_tool_output(status);
|
||||
}
|
||||
|
||||
if let Some(summary) = subagent_objective_summary(app, id) {
|
||||
return format!("working on {summary}");
|
||||
}
|
||||
if let Some(existing) = app.agent_progress.get(id)
|
||||
&& !is_noisy_subagent_progress(existing)
|
||||
&& existing != "working"
|
||||
{
|
||||
return existing.clone();
|
||||
}
|
||||
"working".to_string()
|
||||
}
|
||||
|
||||
fn active_subagent_status_label(app: &App) -> Option<String> {
|
||||
let running = running_agent_count(app);
|
||||
let fanout = active_fanout_counts(app);
|
||||
let fanout_running = fanout.map_or(0, |(running, _)| running);
|
||||
if running == 0 && fanout_running == 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let display_running = running.max(fanout_running);
|
||||
let total = fanout
|
||||
.map(|(_, total)| total)
|
||||
.unwrap_or(display_running)
|
||||
.max(display_running);
|
||||
let detail = app
|
||||
.subagent_cache
|
||||
.iter()
|
||||
.find(|agent| matches!(agent.status, SubAgentStatus::Running))
|
||||
.map(|agent| summarize_tool_output(&agent.assignment.objective))
|
||||
.filter(|summary| !summary.is_empty())
|
||||
.or_else(|| {
|
||||
app.agent_progress
|
||||
.values()
|
||||
.find(|value| !is_noisy_subagent_progress(value) && value.as_str() != "working")
|
||||
.cloned()
|
||||
})
|
||||
.unwrap_or_else(|| "working".to_string());
|
||||
let detail = truncate_line_to_width(&detail, 34);
|
||||
let elapsed = app
|
||||
.agent_activity_started_at
|
||||
.or(app.turn_started_at)
|
||||
.map(|started| format!("{}s", started.elapsed().as_secs()));
|
||||
|
||||
let mut parts = vec![format!("agents {display_running}/{total}"), detail];
|
||||
if let Some(elapsed) = elapsed {
|
||||
parts.push(elapsed);
|
||||
}
|
||||
parts.push("Alt+4".to_string());
|
||||
Some(parts.join(" \u{00B7} "))
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ActiveToolStatusSnapshot {
|
||||
primary_running: Option<String>,
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::tui::file_mention::{
|
||||
try_autocomplete_file_mention, user_request_with_file_mentions, visible_mention_menu_entries,
|
||||
};
|
||||
use crate::tui::history::{
|
||||
ExecCell, ExecSource, GenericToolCell, HistoryCell, ToolCell, ToolStatus,
|
||||
ExecCell, ExecSource, GenericToolCell, HistoryCell, SubAgentCell, ToolCell, ToolStatus,
|
||||
};
|
||||
use crate::tui::views::{ModalView, ViewAction};
|
||||
use crate::working_set::Workspace;
|
||||
@@ -2178,10 +2178,8 @@ fn second_thinking_block_appends_new_entry_in_same_active_cell() {
|
||||
|
||||
// ---- per-child prompt wiring ----
|
||||
//
|
||||
// `extract_fanout_prompts` is the hook for any future fan-out tool that
|
||||
// wants its child prompts rendered as one row per child. Right now no
|
||||
// top-level tool populates it (fan-out lives inside the RLM REPL via
|
||||
// `llm_query_batched`), so the path always returns `None`.
|
||||
// `extract_fanout_prompts` keeps fan-out tools readable by rendering one
|
||||
// row per child instead of a collapsed JSON args blob.
|
||||
|
||||
#[test]
|
||||
fn non_fanout_tool_does_not_populate_prompts() {
|
||||
@@ -2206,6 +2204,190 @@ fn non_fanout_tool_does_not_populate_prompts() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn agent_swarm_populates_prompt_rows_from_tasks() {
|
||||
let mut app = create_test_app();
|
||||
|
||||
handle_tool_call_started(
|
||||
&mut app,
|
||||
"swarm-1",
|
||||
"agent_swarm",
|
||||
&serde_json::json!({
|
||||
"tasks": [
|
||||
{
|
||||
"id": "state",
|
||||
"objective": "Read the current repo state",
|
||||
"prompt": "Inspect git status and recent commits"
|
||||
},
|
||||
{
|
||||
"id": "docs",
|
||||
"prompt": "Update docs for the release"
|
||||
}
|
||||
]
|
||||
}),
|
||||
);
|
||||
|
||||
let active = app.active_cell.as_ref().expect("active cell present");
|
||||
let HistoryCell::Tool(ToolCell::Generic(generic)) = &active.entries()[0] else {
|
||||
panic!("expected GenericToolCell for agent_swarm");
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
generic.prompts.as_ref(),
|
||||
Some(&vec![
|
||||
"Read the current repo state".to_string(),
|
||||
"Update docs for the release".to_string(),
|
||||
])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn agent_swarm_seeded_fanout_card_uses_declared_task_count() {
|
||||
let mut app = create_test_app();
|
||||
|
||||
assert!(seed_fanout_card_from_tool_call(
|
||||
&mut app,
|
||||
"agent_swarm",
|
||||
&serde_json::json!({
|
||||
"tasks": [
|
||||
{ "id": "a", "prompt": "First task" },
|
||||
{ "id": "b", "prompt": "Second task" },
|
||||
{ "id": "c", "prompt": "Third task" }
|
||||
]
|
||||
}),
|
||||
));
|
||||
|
||||
let HistoryCell::SubAgent(SubAgentCell::Fanout(card)) = &app.history[0] else {
|
||||
panic!("expected seeded fanout card");
|
||||
};
|
||||
assert_eq!(card.worker_count(), 3);
|
||||
assert_eq!(active_fanout_counts(&app), Some((0, 3)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn seeded_fanout_card_preserves_existing_active_tool_indices() {
|
||||
let mut app = create_test_app();
|
||||
handle_tool_call_started(
|
||||
&mut app,
|
||||
"search-1",
|
||||
"file_search",
|
||||
&serde_json::json!({ "query": "swarm" }),
|
||||
);
|
||||
assert_eq!(app.tool_cells.get("search-1").copied(), Some(0));
|
||||
|
||||
assert!(seed_fanout_card_from_tool_call(
|
||||
&mut app,
|
||||
"agent_swarm",
|
||||
&serde_json::json!({
|
||||
"tasks": [
|
||||
{ "id": "a", "prompt": "First task" },
|
||||
{ "id": "b", "prompt": "Second task" }
|
||||
]
|
||||
}),
|
||||
));
|
||||
|
||||
assert_eq!(
|
||||
app.tool_cells.get("search-1").copied(),
|
||||
Some(1),
|
||||
"active tool virtual index should shift after history insertion"
|
||||
);
|
||||
|
||||
let result = crate::tools::spec::ToolResult::success("done");
|
||||
handle_tool_call_complete(&mut app, "search-1", "file_search", &Ok(result));
|
||||
let active = app.active_cell.as_ref().expect("active cell present");
|
||||
let HistoryCell::Tool(ToolCell::Generic(generic)) = &active.entries()[0] else {
|
||||
panic!("expected GenericToolCell for file_search");
|
||||
};
|
||||
assert_eq!(generic.status, ToolStatus::Success);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn agent_swarm_result_sync_replaces_seeded_slots_with_final_task_outcomes() {
|
||||
let mut app = create_test_app();
|
||||
assert!(seed_fanout_card_from_tool_call(
|
||||
&mut app,
|
||||
"agent_swarm",
|
||||
&serde_json::json!({
|
||||
"tasks": [
|
||||
{ "id": "a", "prompt": "First task" },
|
||||
{ "id": "b", "prompt": "Second task" }
|
||||
]
|
||||
}),
|
||||
));
|
||||
|
||||
let result = crate::tools::spec::ToolResult::success(
|
||||
serde_json::json!({
|
||||
"swarm_id": "swarm_test",
|
||||
"status": "partial",
|
||||
"duration_ms": 100,
|
||||
"counts": {
|
||||
"total": 2,
|
||||
"completed": 1,
|
||||
"interrupted": 0,
|
||||
"failed": 0,
|
||||
"cancelled": 1,
|
||||
"skipped": 0,
|
||||
"running": 0,
|
||||
"pending": 0
|
||||
},
|
||||
"tasks": [
|
||||
{
|
||||
"task_id": "a",
|
||||
"agent_id": "agent_done",
|
||||
"status": "completed",
|
||||
"result": "ok",
|
||||
"steps_taken": 1,
|
||||
"duration_ms": 50
|
||||
},
|
||||
{
|
||||
"task_id": "b",
|
||||
"agent_id": null,
|
||||
"status": "cancelled",
|
||||
"error": "Cancelled",
|
||||
"steps_taken": 0,
|
||||
"duration_ms": 0
|
||||
}
|
||||
]
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
|
||||
assert!(sync_fanout_card_from_tool_result(
|
||||
&mut app,
|
||||
"agent_swarm",
|
||||
&Ok(result),
|
||||
));
|
||||
|
||||
let HistoryCell::SubAgent(SubAgentCell::Fanout(card)) = &app.history[0] else {
|
||||
panic!("expected synced fanout card");
|
||||
};
|
||||
assert_eq!(card.worker_count(), 2);
|
||||
assert_eq!(card.workers[0].agent_id, "agent_done");
|
||||
assert_eq!(
|
||||
card.workers[0].status,
|
||||
crate::tui::widgets::agent_card::AgentLifecycle::Completed
|
||||
);
|
||||
assert_eq!(card.workers[1].agent_id, "task:b");
|
||||
assert_eq!(
|
||||
card.workers[1].status,
|
||||
crate::tui::widgets::agent_card::AgentLifecycle::Cancelled
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn noisy_subagent_progress_keeps_existing_objective_summary() {
|
||||
let mut app = create_test_app();
|
||||
app.agent_progress.insert(
|
||||
"agent_live".to_string(),
|
||||
"starting: inspect release state".to_string(),
|
||||
);
|
||||
|
||||
let display =
|
||||
friendly_subagent_progress(&app, "agent_live", "step 1/8: requesting model response");
|
||||
|
||||
assert_eq!(display, "starting: inspect release state");
|
||||
}
|
||||
|
||||
/// Regression for issue #65: `truncate_line_to_width` with a tiny budget
|
||||
/// must respect display widths, not codepoint counts. The old branch counted
|
||||
/// chars and overran the budget for any double-width grapheme, which
|
||||
|
||||
@@ -179,9 +179,7 @@ impl FanoutCard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-seed worker slots when the fanout size is known up front (the
|
||||
/// `agent_swarm` tool dispatches N children atomically).
|
||||
#[cfg(test)]
|
||||
/// Pre-seed worker slots when the fanout size is known up front.
|
||||
pub fn with_workers<I, S>(mut self, ids: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
@@ -208,6 +206,27 @@ impl FanoutCard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach a real agent id to the first pending placeholder slot. Swarm
|
||||
/// cards are seeded from task ids before child agents exist; when a child
|
||||
/// starts, this keeps the dot count stable instead of appending a second
|
||||
/// circle for the same unit of work.
|
||||
pub fn claim_pending_worker(&mut self, agent_id: &str, status: AgentLifecycle) {
|
||||
if let Some(slot) = self.workers.iter_mut().find(|s| s.agent_id == agent_id) {
|
||||
slot.status = status;
|
||||
return;
|
||||
}
|
||||
if let Some(slot) = self
|
||||
.workers
|
||||
.iter_mut()
|
||||
.find(|s| matches!(s.status, AgentLifecycle::Pending))
|
||||
{
|
||||
slot.agent_id = agent_id.to_string();
|
||||
slot.status = status;
|
||||
return;
|
||||
}
|
||||
self.upsert_worker(agent_id, status);
|
||||
}
|
||||
|
||||
fn counts(&self) -> (usize, usize, usize, usize) {
|
||||
let mut done = 0usize;
|
||||
let mut running = 0usize;
|
||||
@@ -289,7 +308,6 @@ impl FanoutCard {
|
||||
|
||||
/// Worker count (slots seeded or observed via mailbox).
|
||||
#[must_use]
|
||||
#[cfg(test)]
|
||||
pub fn worker_count(&self) -> usize {
|
||||
self.workers.len()
|
||||
}
|
||||
@@ -403,11 +421,11 @@ pub fn apply_to_fanout(card: &mut FanoutCard, msg: &MailboxMessage) -> bool {
|
||||
let id = msg.agent_id();
|
||||
match msg {
|
||||
MailboxMessage::Started { .. } => {
|
||||
card.upsert_worker(id, AgentLifecycle::Running);
|
||||
card.claim_pending_worker(id, AgentLifecycle::Running);
|
||||
true
|
||||
}
|
||||
MailboxMessage::Progress { .. } | MailboxMessage::ToolCallStarted { .. } => {
|
||||
card.upsert_worker(id, AgentLifecycle::Running);
|
||||
card.claim_pending_worker(id, AgentLifecycle::Running);
|
||||
true
|
||||
}
|
||||
MailboxMessage::ToolCallCompleted { .. } => true,
|
||||
@@ -575,6 +593,21 @@ mod tests {
|
||||
assert_eq!(card.workers[0].status, AgentLifecycle::Pending);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fanout_started_claims_seeded_pending_slot_without_growing_grid() {
|
||||
let mut card = FanoutCard::new("agent_swarm").with_workers(["task:a", "task:b"]);
|
||||
let started =
|
||||
MailboxMessage::started("agent_live", crate::tools::subagent::SubAgentType::General);
|
||||
|
||||
assert!(apply_to_fanout(&mut card, &started));
|
||||
|
||||
assert_eq!(card.worker_count(), 2);
|
||||
assert_eq!(card.workers[0].agent_id, "agent_live");
|
||||
assert_eq!(card.workers[0].status, AgentLifecycle::Running);
|
||||
assert_eq!(card.workers[1].agent_id, "task:b");
|
||||
assert_eq!(card.workers[1].status, AgentLifecycle::Pending);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fanout_apply_transitions_worker_through_lifecycle() {
|
||||
let mut card = FanoutCard::new("swarm").with_workers(["w_1"]);
|
||||
|
||||
Reference in New Issue
Block a user