diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs
index 7bfa1627..d1f9d661 100644
--- a/crates/tui/src/tools/subagent/mod.rs
+++ b/crates/tui/src/tools/subagent/mod.rs
@@ -103,10 +103,13 @@ const DEFAULT_RESULT_TIMEOUT_MS: u64 = 30_000;
 const MIN_WAIT_TIMEOUT_MS: u64 = 10_000;
 const MAX_RESULT_TIMEOUT_MS: u64 = 3_600_000;
 const COMPLETED_AGENT_RETENTION: Duration = Duration::from_secs(60 * 60);
+const MAX_AGENT_WORKER_RECORDS: usize = 256;
+const MAX_AGENT_WORKER_EVENTS_PER_RECORD: usize = 128;
 const SUBAGENT_STATE_SCHEMA_VERSION: u32 = 1;
 const SUBAGENT_STATE_FILE: &str = "subagents.v1.json";
 const SUBAGENT_RESTART_REASON: &str = "Interrupted by process restart";
 const SUBAGENT_QUEUED_LAUNCH_REASON: &str = "queued: waiting for an interactive fanout slot";
+const SUBAGENT_MODEL_WAIT_REASON: &str = "waiting for model response";
 const VALID_SUBAGENT_TYPES: &str =
     "general (aliases: general-purpose, general_purpose, worker, default), \
     explore (aliases: exploration, explorer), plan (aliases: planning, planner, awaiter), \
@@ -371,7 +374,7 @@ impl SubAgentAssignment {
 }
 
 /// Sub-agent execution types with specialized behavior and tool access.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
 #[serde(rename_all = "snake_case")]
 pub enum SubAgentType {
     /// General purpose - full tool access for multi-step tasks.
@@ -623,6 +626,123 @@ pub struct SubAgentResult {
     pub from_prior_session: bool,
 }
 
+/// Headless worker lifecycle states for sub-agent execution.
+///
+/// This is the TUI-independent state machine that future CLI/API/workflow
+/// surfaces should consume. The legacy `SubAgentStatus` remains the
+/// compatibility projection returned by existing `agent_*` tools.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum AgentWorkerStatus {
+    /// Launch is queued and has not begun running yet.
+    Queued,
+    /// Registered with the manager; the initial status of a new record.
+    Starting,
+    /// Running, with no more specific activity reported.
+    Running,
+    /// Waiting for a model response.
+    ModelWait,
+    /// Executing a tool call.
+    RunningTool,
+    /// The run completed.
+    Completed,
+    /// The run failed.
+    Failed,
+    /// The run was cancelled.
+    Cancelled,
+    /// The run was interrupted, e.g. by an API timeout or a process restart.
+    Interrupted,
+}
+
+/// Tool capability profile requested for a headless worker.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum AgentWorkerToolProfile {
+    /// Inherit the parent runtime registry for compatibility.
+    Inherited,
+    /// Use the listed tools only.
+    Explicit(Vec<String>),
+}
+
+/// Declarative headless worker request derived from `agent_open`.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct AgentWorkerSpec {
+    pub worker_id: String,
+    pub objective: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub role: Option<String>,
+    pub agent_type: SubAgentType,
+    pub model: String,
+    pub workspace: PathBuf,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub git_branch: Option<String>,
+    pub context_mode: String,
+    pub fork_context: bool,
+    pub tool_profile: AgentWorkerToolProfile,
+    pub max_steps: u32,
+    pub spawn_depth: u32,
+    pub max_spawn_depth: u32,
+}
+
+/// Structured headless worker event.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct AgentWorkerEvent {
+    /// Sequence number assigned by the manager; increases with every recorded event.
+    pub seq: u64,
+    pub worker_id: String,
+    pub status: AgentWorkerStatus,
+    pub timestamp_ms: u64,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub message: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub step: Option<u32>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub tool_name: Option<String>,
+}
+
+/// Canonical headless worker record retained by `SubAgentManager`.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct AgentWorkerRecord {
+    pub spec: AgentWorkerSpec,
+    pub status: AgentWorkerStatus,
+    pub created_at_ms: u64,
+    pub updated_at_ms: u64,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub started_at_ms: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub completed_at_ms: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub latest_message: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub result_summary: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    #[serde(default)]
+    pub steps_taken: u32,
+    /// Event log, oldest first; the manager caps its length.
+    #[serde(default)]
+    pub events: VecDeque<AgentWorkerEvent>,
+}
+
+impl AgentWorkerRecord {
+    /// Creates a record in `Starting` state with both timestamps set to `now_ms`.
+    fn new(spec: AgentWorkerSpec, now_ms: u64) -> Self {
+        Self {
+            spec,
+            status: AgentWorkerStatus::Starting,
+            created_at_ms: now_ms,
+            updated_at_ms: now_ms,
+            started_at_ms: None,
+            completed_at_ms: None,
+            latest_message: None,
+            result_summary: None,
+            error: None,
+            steps_taken: 0,
+            events: VecDeque::new(),
+        }
+    }
+}
+
 fn is_false(b: &bool) -> bool {
     !*b
 }
@@ -787,6 +907,8 @@ struct PersistedSubAgent {
 struct PersistedSubAgentState {
     schema_version: u32,
     agents: Vec<PersistedSubAgent>,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    workers: Vec<AgentWorkerRecord>,
 }
 
 impl Default for PersistedSubAgentState {
@@ -794,6 +916,7 @@ impl Default for PersistedSubAgentState {
         Self {
             schema_version: SUBAGENT_STATE_SCHEMA_VERSION,
             agents: Vec::new(),
+            workers: Vec::new(),
         }
     }
 }
@@ -1185,6 +1308,8 @@ impl SubAgent {
 /// Manager for active sub-agents.
pub struct SubAgentManager { agents: HashMap, + worker_records: HashMap, + worker_event_seq: u64, #[allow(dead_code)] // Stored for future workspace-scoped operations workspace: PathBuf, state_path: Option, @@ -1213,6 +1326,8 @@ impl SubAgentManager { pub fn new(workspace: PathBuf, max_agents: usize) -> Self { Self { agents: HashMap::new(), + worker_records: HashMap::new(), + worker_event_seq: 0, workspace, state_path: None, max_steps: DEFAULT_MAX_STEPS, @@ -1303,6 +1418,7 @@ impl SubAgentManager { let payload = PersistedSubAgentState { schema_version: SUBAGENT_STATE_SCHEMA_VERSION, agents, + workers: self.sorted_worker_records(), }; write_json_atomic(path, &payload) } @@ -1335,6 +1451,7 @@ impl SubAgentManager { } self.agents.clear(); + self.worker_records.clear(); for persisted in state.agents { let mut status = persisted.status; if matches!(status, SubAgentStatus::Running) { @@ -1384,10 +1501,170 @@ impl SubAgentManager { }; self.agents.insert(persisted.id, agent); } + for worker in state.workers { + self.worker_event_seq = self.worker_event_seq.max( + worker + .events + .iter() + .map(|event| event.seq) + .max() + .unwrap_or(0), + ); + self.worker_records + .insert(worker.spec.worker_id.clone(), worker); + } + self.prune_worker_records(); Ok(()) } + fn sorted_worker_records(&self) -> Vec { + let mut workers: Vec<_> = self.worker_records.values().cloned().collect(); + workers.sort_by(|a, b| { + b.updated_at_ms + .cmp(&a.updated_at_ms) + .then_with(|| a.spec.worker_id.cmp(&b.spec.worker_id)) + }); + workers + } + + fn prune_worker_records(&mut self) { + if self.worker_records.len() <= MAX_AGENT_WORKER_RECORDS { + return; + } + let keep_ids: std::collections::HashSet = self + .sorted_worker_records() + .into_iter() + .take(MAX_AGENT_WORKER_RECORDS) + .map(|record| record.spec.worker_id) + .collect(); + self.worker_records + .retain(|worker_id, _| keep_ids.contains(worker_id)); + } + + fn register_worker(&mut self, spec: AgentWorkerSpec) { + let worker_id = 
spec.worker_id.clone(); + let now_ms = epoch_millis_now(); + let mut record = AgentWorkerRecord::new(spec, now_ms); + self.push_worker_event( + &mut record, + AgentWorkerStatus::Starting, + Some("starting".to_string()), + None, + None, + now_ms, + ); + self.worker_records.insert(worker_id, record); + self.prune_worker_records(); + } + + pub fn list_worker_records(&self) -> Vec { + self.sorted_worker_records() + } + + pub fn get_worker_record(&self, worker_id: &str) -> Option { + self.worker_records.get(worker_id).cloned() + } + + fn push_worker_event( + &mut self, + record: &mut AgentWorkerRecord, + status: AgentWorkerStatus, + message: Option, + step: Option, + tool_name: Option, + now_ms: u64, + ) { + self.worker_event_seq = self.worker_event_seq.saturating_add(1); + record.events.push_back(AgentWorkerEvent { + seq: self.worker_event_seq, + worker_id: record.spec.worker_id.clone(), + status, + timestamp_ms: now_ms, + message, + step, + tool_name, + }); + while record.events.len() > MAX_AGENT_WORKER_EVENTS_PER_RECORD { + record.events.pop_front(); + } + } + + fn record_worker_event( + &mut self, + worker_id: &str, + status: AgentWorkerStatus, + message: Option, + step: Option, + tool_name: Option, + ) { + let now_ms = epoch_millis_now(); + let Some(mut record) = self.worker_records.remove(worker_id) else { + return; + }; + record.status = status; + record.updated_at_ms = now_ms; + record.latest_message = message.clone(); + if matches!( + status, + AgentWorkerStatus::Starting | AgentWorkerStatus::Running + ) && record.started_at_ms.is_none() + { + record.started_at_ms = Some(now_ms); + } + if matches!( + status, + AgentWorkerStatus::Completed + | AgentWorkerStatus::Failed + | AgentWorkerStatus::Cancelled + | AgentWorkerStatus::Interrupted + ) { + record.completed_at_ms = Some(now_ms); + } + if let Some(step) = step { + record.steps_taken = step; + } + self.push_worker_event(&mut record, status, message, step, tool_name, now_ms); + 
self.worker_records.insert(worker_id.to_string(), record); + } + + fn record_worker_progress(&mut self, worker_id: &str, message: String) { + let (status, step, tool_name) = worker_progress_event_parts(&message); + self.record_worker_event(worker_id, status, Some(message), step, tool_name); + } + + fn complete_worker_from_result(&mut self, worker_id: &str, result: &SubAgentResult) { + let status = worker_status_from_subagent_status(&result.status); + let message = match &result.status { + SubAgentStatus::Completed => Some("completed".to_string()), + SubAgentStatus::Failed(err) => Some(err.clone()), + SubAgentStatus::Interrupted(reason) => Some(reason.clone()), + SubAgentStatus::Cancelled => Some("cancelled".to_string()), + SubAgentStatus::Running => Some("running".to_string()), + }; + self.record_worker_event(worker_id, status, message, Some(result.steps_taken), None); + if let Some(record) = self.worker_records.get_mut(worker_id) { + record.result_summary = result.result.clone(); + record.steps_taken = result.steps_taken; + if let SubAgentStatus::Failed(err) = &result.status { + record.error = Some(err.clone()); + } + } + } + + fn fail_worker(&mut self, worker_id: &str, error: String) { + self.record_worker_event( + worker_id, + AgentWorkerStatus::Failed, + Some(error.clone()), + None, + None, + ); + if let Some(record) = self.worker_records.get_mut(worker_id) { + record.error = Some(error); + } + } + /// Count running agents. 
pub fn running_count(&self) -> usize { self.agents @@ -1551,6 +1828,30 @@ impl SubAgentManager { let agent_id = agent.id.clone(); let started_at = agent.started_at; let max_steps = self.max_steps; + let worker_spec = AgentWorkerSpec { + worker_id: agent_id.clone(), + objective: assignment.objective.clone(), + role: assignment.role.clone(), + agent_type: agent_type.clone(), + model: agent.model.clone(), + workspace: agent.workspace.clone(), + git_branch: current_git_branch(&agent.workspace), + context_mode: if options.fork_context { + "forked" + } else { + "fresh" + } + .to_string(), + fork_context: options.fork_context, + tool_profile: match tools.clone() { + Some(tools) => AgentWorkerToolProfile::Explicit(tools), + None => AgentWorkerToolProfile::Inherited, + }, + max_steps, + spawn_depth: runtime.spawn_depth, + max_spawn_depth: runtime.max_spawn_depth, + }; + self.register_worker(worker_spec); if let Some(event_tx) = runtime.event_tx.clone() { let _ = event_tx.try_send(Event::AgentSpawned { @@ -1643,6 +1944,13 @@ impl SubAgentManager { }; if changed { + self.record_worker_event( + agent_id, + AgentWorkerStatus::Cancelled, + Some("cancelled".to_string()), + Some(snapshot.steps_taken), + None, + ); self.persist_state_best_effort(); } Ok(snapshot) @@ -1914,6 +2222,7 @@ impl SubAgentManager { let before = self.agents.len(); let mut auto_cancelled = 0; let timeout = self.running_heartbeat_timeout; + let mut worker_cancellations = Vec::new(); for agent in self.agents.values_mut() { if agent.status == SubAgentStatus::Running && agent.task_handle.is_some() @@ -1935,9 +2244,23 @@ impl SubAgentManager { handle.abort(); } agent.input_tx = None; + worker_cancellations.push(( + agent.id.clone(), + agent.result.clone(), + agent.steps_taken, + )); auto_cancelled += 1; } } + for (agent_id, message, steps_taken) in worker_cancellations { + self.record_worker_event( + &agent_id, + AgentWorkerStatus::Cancelled, + message, + Some(steps_taken), + None, + ); + } self.agents.retain(|_, 
agent| { if agent.status == SubAgentStatus::Running { true @@ -1954,14 +2277,15 @@ impl SubAgentManager { fn update_from_result(&mut self, agent_id: &str, result: SubAgentResult) { let mut changed = false; if let Some(agent) = self.agents.get_mut(agent_id) { - agent.status = result.status; - agent.assignment = result.assignment; - agent.result = result.result; + agent.status = result.status.clone(); + agent.assignment = result.assignment.clone(); + agent.result = result.result.clone(); agent.steps_taken = result.steps_taken; - agent.checkpoint = result.checkpoint; + agent.checkpoint = result.checkpoint.clone(); agent.task_handle = None; changed = true; } + self.complete_worker_from_result(agent_id, &result); if changed { self.persist_state_best_effort(); } @@ -1970,11 +2294,12 @@ impl SubAgentManager { fn update_failed(&mut self, agent_id: &str, error: String) { let mut changed = false; if let Some(agent) = self.agents.get_mut(agent_id) { - agent.status = SubAgentStatus::Failed(error); + agent.status = SubAgentStatus::Failed(error.clone()); release_resident_leases_for(agent_id); agent.task_handle = None; changed = true; } + self.fail_worker(agent_id, error); if changed { self.persist_state_best_effort(); } @@ -2010,6 +2335,13 @@ impl SubAgentManager { release_resident_leases_for(agent_id); agent.snapshot() }; + self.record_worker_event( + agent_id, + AgentWorkerStatus::Interrupted, + snapshot.result.clone(), + Some(snapshot.steps_taken), + None, + ); self.persist_state_best_effort(); Ok(snapshot) } @@ -2053,6 +2385,13 @@ impl SubAgentManager { agent.last_activity_at = Instant::now(); agent.snapshot() }; + self.record_worker_event( + agent_id, + AgentWorkerStatus::Running, + Some("continued from checkpoint".to_string()), + Some(snapshot.steps_taken), + None, + ); self.persist_state_best_effort(); Ok(snapshot) } @@ -2079,6 +2418,8 @@ pub struct SubAgentSessionProjection { pub continuable: bool, #[serde(default, skip_serializing_if = "is_false")] pub timed_out: bool, 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub worker_record: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -2116,6 +2457,7 @@ async fn subagent_session_projection( snapshot: SubAgentResult, timed_out: bool, context: &ToolContext, + worker_record: Option, ) -> SubAgentSessionProjection { let transcript_session_id = format!("agent:{}", snapshot.agent_id); let transcript_payload = json!({ @@ -2165,6 +2507,7 @@ async fn subagent_session_projection( continuable: subagent_checkpoint_is_continuable(&snapshot), snapshot, timed_out, + worker_record, } } @@ -2363,7 +2706,11 @@ impl ToolSpec for AgentOpenTool { let snapshot: SubAgentResult = serde_json::from_str(&result.content).map_err(|e| { ToolError::execution_failed(format!("agent_open projection failed: {e}")) })?; - let projection = subagent_session_projection(snapshot, false, context).await; + let worker_record = { + let manager = self.manager.read().await; + manager.get_worker_record(&snapshot.agent_id) + }; + let projection = subagent_session_projection(snapshot, false, context, worker_record).await; let mut tool_result = ToolResult::json(&projection) .map_err(|e| ToolError::execution_failed(e.to_string()))?; tool_result.metadata = Some(json!({ @@ -2962,7 +3309,12 @@ impl ToolSpec for AgentEvalTool { ) }; - let projection = subagent_session_projection(snapshot, timed_out, context).await; + let worker_record = { + let manager = self.manager.read().await; + manager.get_worker_record(&snapshot.agent_id) + }; + let projection = + subagent_session_projection(snapshot, timed_out, context, worker_record).await; let mut result = ToolResult::json(&projection) .map_err(|e| ToolError::execution_failed(e.to_string()))?; result.metadata = Some(json!({ @@ -3206,11 +3558,15 @@ impl ToolSpec for AgentCloseTool { .resolve_agent_ref(agent_id) .map_err(|e| ToolError::execution_failed(e.to_string()))? 
}; - let mut manager = self.manager.write().await; - let result = manager - .cancel(&agent_id) - .map_err(|e| ToolError::execution_failed(format!("Failed to close sub-agent: {e}")))?; - let projection = subagent_session_projection(result, false, context).await; + let (result, worker_record) = { + let mut manager = self.manager.write().await; + let result = manager.cancel(&agent_id).map_err(|e| { + ToolError::execution_failed(format!("Failed to close sub-agent: {e}")) + })?; + let worker_record = manager.get_worker_record(&result.agent_id); + (result, worker_record) + }; + let projection = subagent_session_projection(result, false, context, worker_record).await; ToolResult::json(&projection).map_err(|e| ToolError::execution_failed(e.to_string())) } } @@ -4021,6 +4377,13 @@ async fn record_queued_launch_progress(task: &SubAgentTask) { { let mut manager = task.runtime.manager.write().await; manager.touch(&task.agent_id); + manager.record_worker_event( + &task.agent_id, + AgentWorkerStatus::Queued, + Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()), + None, + None, + ); } emit_agent_progress( task.runtime.event_tx.as_ref(), @@ -4040,6 +4403,56 @@ fn queued_launch_heartbeat_interval() -> Duration { Duration::from_secs(10) } +struct SubAgentModelWaitHeartbeat { + cancel: CancellationToken, + handle: JoinHandle<()>, +} + +impl SubAgentModelWaitHeartbeat { + fn start(runtime: SubAgentRuntime, agent_id: String, step_label: String) -> Self { + let cancel = CancellationToken::new(); + let task_cancel = cancel.clone(); + let handle = spawn_supervised( + "subagent-model-wait-heartbeat", + std::panic::Location::caller(), + async move { + let heartbeat = subagent_model_wait_heartbeat_interval(); + loop { + tokio::select! 
{ + biased; + () = task_cancel.cancelled() => break, + () = tokio::time::sleep(heartbeat) => { + record_agent_progress( + &runtime, + &agent_id, + format!("{step_label}: {SUBAGENT_MODEL_WAIT_REASON}"), + ); + } + } + } + }, + ); + Self { cancel, handle } + } +} + +impl Drop for SubAgentModelWaitHeartbeat { + fn drop(&mut self) { + self.cancel.cancel(); + self.handle.abort(); + } +} + +#[cfg(test)] +fn subagent_model_wait_heartbeat_interval() -> Duration { + Duration::from_millis(10) +} + +#[cfg(not(test))] +fn subagent_model_wait_heartbeat_interval() -> Duration { + Duration::from_secs(10) +} + /// Notify the engine's parent turn loop that a direct child finished /// (issue #756). Returns `true` if a send was attempted, `false` if the /// notification was skipped because this isn't a direct child or no channel @@ -4216,14 +4629,16 @@ async fn checkpoint_subagent_progress( } fn record_agent_progress(runtime: &SubAgentRuntime, agent_id: &str, message: impl Into) { + let message = message.into(); if let Ok(mut manager) = runtime.manager.try_write() { manager.touch(agent_id); + manager.record_worker_progress(agent_id, message.clone()); } emit_agent_progress( runtime.event_tx.as_ref(), runtime.mailbox.as_ref(), agent_id, - message.into(), + message, ); } @@ -4397,182 +4812,191 @@ async fn run_subagent( // Race the API call against the cancellation token so a parent // cancel during a long thinking turn doesn't have to wait for the // step timeout. - let response = tokio::select! { - biased; - () = runtime.cancel_token.cancelled() => { - record_agent_progress( - runtime, - &agent_id, - format!("{}: cancelled mid-request", format_step_counter(steps, max_steps)), - ); - if let Some(mb) = runtime.mailbox.as_ref() { - let _ = mb.send(MailboxMessage::Cancelled { + let response = { + let model_wait_heartbeat = SubAgentModelWaitHeartbeat::start( + runtime.clone(), + agent_id.clone(), + format_step_counter(steps, max_steps), + ); + let response = tokio::select! 
{ + biased; + () = runtime.cancel_token.cancelled() => { + record_agent_progress( + runtime, + &agent_id, + format!("{}: cancelled mid-request", format_step_counter(steps, max_steps)), + ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Cancelled { + agent_id: agent_id.clone(), + }); + } + let status = SubAgentStatus::Cancelled; + let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX); + insert_subagent_full_transcript_handle( + runtime, + &agent_id, + &agent_type, + &assignment, + &status, + None, + latest_checkpoint.as_ref(), + &messages, + steps, + duration_ms, + fork_context_enabled, + ) + .await; + return Ok(SubAgentResult { + name: agent_id.clone(), agent_id: agent_id.clone(), + context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(), + fork_context: fork_context_enabled, + workspace: Some(runtime.context.workspace.clone()), + git_branch: current_git_branch(&runtime.context.workspace), + agent_type: agent_type.clone(), + assignment: assignment.clone(), + model: runtime.model.clone(), + nickname: None, + status, + result: None, + steps_taken: steps, + checkpoint: latest_checkpoint.clone(), + duration_ms, + from_prior_session: false, }); } - let status = SubAgentStatus::Cancelled; - let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX); - insert_subagent_full_transcript_handle( - runtime, - &agent_id, - &agent_type, - &assignment, - &status, - None, - latest_checkpoint.as_ref(), - &messages, - steps, - duration_ms, - fork_context_enabled, - ) - .await; - return Ok(SubAgentResult { - name: agent_id.clone(), - agent_id: agent_id.clone(), - context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(), - fork_context: fork_context_enabled, - workspace: Some(runtime.context.workspace.clone()), - git_branch: current_git_branch(&runtime.context.workspace), - agent_type: agent_type.clone(), - assignment: assignment.clone(), - 
model: runtime.model.clone(), - nickname: None, - status, - result: None, - steps_taken: steps, - checkpoint: latest_checkpoint.clone(), - duration_ms, - from_prior_session: false, - }); - } - api = tokio::time::timeout(runtime.step_api_timeout, runtime.client.create_message(request)) => { - match api { - Ok(response) => response?, - Err(_) => { - let reason = format!( - "API call timed out after {}ms; checkpoint preserved for continuation", - runtime.step_api_timeout.as_millis() - ); - let checkpoint = checkpoint_subagent_progress( - runtime, - &agent_id, - "api_timeout", - &messages, - steps, - true, - ) - .await; - latest_checkpoint = Some(checkpoint.clone()); - record_agent_progress( - runtime, - &agent_id, - format!("{}: interrupted; {reason}", format_step_counter(steps, max_steps)), - ); - let status = SubAgentStatus::Interrupted(reason.clone()); - let duration_ms = - u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX); - insert_subagent_full_transcript_handle( - runtime, - &agent_id, - &agent_type, - &assignment, - &status, - Some(&reason), - Some(&checkpoint), - &messages, - steps, - duration_ms, - fork_context_enabled, - ) - .await; - let interrupted_snapshot = { - let mut manager = runtime.manager.write().await; - manager.interrupt_with_checkpoint( - &agent_id, - reason.clone(), - checkpoint.clone(), - )? - }; - if let Some(mb) = runtime.mailbox.as_ref() { - let _ = mb.send(MailboxMessage::Interrupted { - agent_id: agent_id.clone(), - reason: reason.clone(), - }); - } - - let next_input = tokio::select! 
{ - biased; - () = runtime.cancel_token.cancelled() => { - record_agent_progress( - runtime, - &agent_id, - format!("{}: cancelled while interrupted", format_step_counter(steps, max_steps)), - ); - if let Some(mb) = runtime.mailbox.as_ref() { - let _ = mb.send(MailboxMessage::Cancelled { - agent_id: agent_id.clone(), - }); - } - let status = SubAgentStatus::Cancelled; - let duration_ms = u64::try_from(started_at.elapsed().as_millis()) - .unwrap_or(u64::MAX); - insert_subagent_full_transcript_handle( - runtime, - &agent_id, - &agent_type, - &assignment, - &status, - None, - latest_checkpoint.as_ref(), - &messages, - steps, - duration_ms, - fork_context_enabled, - ) - .await; - return Ok(SubAgentResult { - name: agent_id.clone(), - agent_id: agent_id.clone(), - context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(), - fork_context: fork_context_enabled, - workspace: Some(runtime.context.workspace.clone()), - git_branch: current_git_branch(&runtime.context.workspace), - agent_type: agent_type.clone(), - assignment: assignment.clone(), - model: runtime.model.clone(), - nickname: None, - status, - result: None, - steps_taken: steps, - checkpoint: latest_checkpoint.clone(), - duration_ms, - from_prior_session: false, - }); - } - input = input_rx.recv() => input, - }; - let Some(input) = next_input else { - return Ok(interrupted_snapshot); - }; - if input.interrupt { - pending_inputs.clear(); - } - pending_inputs.push_back(input); - latest_checkpoint = Some( - checkpoint_subagent_progress( + api = tokio::time::timeout(runtime.step_api_timeout, runtime.client.create_message(request)) => { + match api { + Ok(response) => response?, + Err(_) => { + let reason = format!( + "API call timed out after {}ms; checkpoint preserved for continuation", + runtime.step_api_timeout.as_millis() + ); + let checkpoint = checkpoint_subagent_progress( runtime, &agent_id, - "continued_after_api_timeout", + "api_timeout", &messages, steps, true, ) - .await, - ); - 
continue 'steps_loop; + .await; + latest_checkpoint = Some(checkpoint.clone()); + record_agent_progress( + runtime, + &agent_id, + format!("{}: interrupted; {reason}", format_step_counter(steps, max_steps)), + ); + let status = SubAgentStatus::Interrupted(reason.clone()); + let duration_ms = + u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX); + insert_subagent_full_transcript_handle( + runtime, + &agent_id, + &agent_type, + &assignment, + &status, + Some(&reason), + Some(&checkpoint), + &messages, + steps, + duration_ms, + fork_context_enabled, + ) + .await; + let interrupted_snapshot = { + let mut manager = runtime.manager.write().await; + manager.interrupt_with_checkpoint( + &agent_id, + reason.clone(), + checkpoint.clone(), + )? + }; + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Interrupted { + agent_id: agent_id.clone(), + reason: reason.clone(), + }); + } + + let next_input = tokio::select! { + biased; + () = runtime.cancel_token.cancelled() => { + record_agent_progress( + runtime, + &agent_id, + format!("{}: cancelled while interrupted", format_step_counter(steps, max_steps)), + ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Cancelled { + agent_id: agent_id.clone(), + }); + } + let status = SubAgentStatus::Cancelled; + let duration_ms = u64::try_from(started_at.elapsed().as_millis()) + .unwrap_or(u64::MAX); + insert_subagent_full_transcript_handle( + runtime, + &agent_id, + &agent_type, + &assignment, + &status, + None, + latest_checkpoint.as_ref(), + &messages, + steps, + duration_ms, + fork_context_enabled, + ) + .await; + return Ok(SubAgentResult { + name: agent_id.clone(), + agent_id: agent_id.clone(), + context_mode: if fork_context_enabled { "forked" } else { "fresh" }.to_string(), + fork_context: fork_context_enabled, + workspace: Some(runtime.context.workspace.clone()), + git_branch: current_git_branch(&runtime.context.workspace), + agent_type: 
agent_type.clone(), + assignment: assignment.clone(), + model: runtime.model.clone(), + nickname: None, + status, + result: None, + steps_taken: steps, + checkpoint: latest_checkpoint.clone(), + duration_ms, + from_prior_session: false, + }); + } + input = input_rx.recv() => input, + }; + let Some(input) = next_input else { + return Ok(interrupted_snapshot); + }; + if input.interrupt { + pending_inputs.clear(); + } + pending_inputs.push_back(input); + latest_checkpoint = Some( + checkpoint_subagent_progress( + runtime, + &agent_id, + "continued_after_api_timeout", + &messages, + steps, + true, + ) + .await, + ); + continue 'steps_loop; + } } } - } + }; + drop(model_wait_heartbeat); + response }; let mut tool_uses = Vec::new(); @@ -5570,6 +5994,58 @@ fn build_assignment_prompt( ) } +fn worker_status_from_subagent_status(status: &SubAgentStatus) -> AgentWorkerStatus { + match status { + SubAgentStatus::Running => AgentWorkerStatus::Running, + SubAgentStatus::Completed => AgentWorkerStatus::Completed, + SubAgentStatus::Failed(_) => AgentWorkerStatus::Failed, + SubAgentStatus::Cancelled => AgentWorkerStatus::Cancelled, + SubAgentStatus::Interrupted(_) => AgentWorkerStatus::Interrupted, + } +} + +fn worker_progress_event_parts(message: &str) -> (AgentWorkerStatus, Option, Option) { + let step = parse_progress_step(message); + let lower = message.to_ascii_lowercase(); + let status = if lower.contains("queued") { + AgentWorkerStatus::Queued + } else if lower.contains("requesting model response") + || lower.contains(SUBAGENT_MODEL_WAIT_REASON) + { + AgentWorkerStatus::ModelWait + } else if lower.contains("running tool") || lower.contains("executing") { + AgentWorkerStatus::RunningTool + } else if lower.contains("cancelled") { + AgentWorkerStatus::Cancelled + } else if lower.contains("interrupted") || lower.contains("timed out") { + AgentWorkerStatus::Interrupted + } else if lower.contains("complete") { + AgentWorkerStatus::Completed + } else if lower.contains("started") { 
+ AgentWorkerStatus::Starting + } else { + AgentWorkerStatus::Running + }; + (status, step, parse_progress_tool_name(message)) +} + +fn parse_progress_step(message: &str) -> Option { + let rest = message.strip_prefix("step ")?; + let digits: String = rest.chars().take_while(|ch| ch.is_ascii_digit()).collect(); + (!digits.is_empty()) + .then(|| digits.parse::().ok()) + .flatten() +} + +fn parse_progress_tool_name(message: &str) -> Option { + let marker = "tool '"; + let start = message.find(marker)? + marker.len(); + let rest = &message[start..]; + let end = rest.find('\'')?; + let tool = rest[..end].trim(); + (!tool.is_empty()).then(|| tool.to_string()) +} + fn emit_agent_progress( event_tx: Option<&mpsc::Sender>, mailbox: Option<&Mailbox>, diff --git a/crates/tui/src/tools/subagent/tests.rs b/crates/tui/src/tools/subagent/tests.rs index be3ddb90..d3d26666 100644 --- a/crates/tui/src/tools/subagent/tests.rs +++ b/crates/tui/src/tools/subagent/tests.rs @@ -29,6 +29,116 @@ fn make_snapshot(status: SubAgentStatus) -> SubAgentResult { } } +fn make_worker_spec(worker_id: &str, workspace: PathBuf) -> AgentWorkerSpec { + AgentWorkerSpec { + worker_id: worker_id.to_string(), + objective: "inspect the repo".to_string(), + role: Some("explorer".to_string()), + agent_type: SubAgentType::Explore, + model: "deepseek-v4-flash".to_string(), + workspace, + git_branch: None, + context_mode: "fresh".to_string(), + fork_context: false, + tool_profile: AgentWorkerToolProfile::Explicit(vec![ + "read_file".to_string(), + "grep_files".to_string(), + ]), + max_steps: 8, + spawn_depth: 1, + max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH, + } +} + +#[test] +fn headless_worker_record_tracks_lifecycle_without_tui_projection() { + let tmp = tempdir().expect("tempdir"); + let mut manager = SubAgentManager::new(tmp.path().to_path_buf(), 4); + manager.register_worker(make_worker_spec( + "agent_worker_contract", + tmp.path().to_path_buf(), + )); + + manager.record_worker_event( + 
"agent_worker_contract", + AgentWorkerStatus::Queued, + Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()), + None, + None, + ); + manager.record_worker_progress( + "agent_worker_contract", + "step 1: requesting model response".to_string(), + ); + manager.record_worker_progress( + "agent_worker_contract", + "step 1: running tool 'read_file'".to_string(), + ); + + let mut result = make_snapshot(SubAgentStatus::Completed); + result.agent_id = "agent_worker_contract".to_string(); + result.name = "agent_worker_contract".to_string(); + result.result = Some("worker summary".to_string()); + result.steps_taken = 1; + manager.complete_worker_from_result("agent_worker_contract", &result); + + let record = manager + .get_worker_record("agent_worker_contract") + .expect("worker record"); + assert_eq!(record.status, AgentWorkerStatus::Completed); + assert_eq!(record.spec.agent_type, SubAgentType::Explore); + assert_eq!( + record.spec.tool_profile, + AgentWorkerToolProfile::Explicit(vec!["read_file".to_string(), "grep_files".to_string()]) + ); + assert_eq!(record.result_summary.as_deref(), Some("worker summary")); + assert_eq!(record.steps_taken, 1); + let statuses: Vec<_> = record.events.iter().map(|event| event.status).collect(); + assert!(statuses.contains(&AgentWorkerStatus::Queued)); + assert!(statuses.contains(&AgentWorkerStatus::ModelWait)); + assert!(statuses.contains(&AgentWorkerStatus::RunningTool)); + assert!(statuses.contains(&AgentWorkerStatus::Completed)); + assert!( + record + .events + .iter() + .any(|event| event.tool_name.as_deref() == Some("read_file")) + ); +} + +#[test] +fn headless_worker_records_persist_with_subagent_state() { + let tmp = tempdir().expect("tempdir"); + let state_path = tmp.path().join("subagents.v1.json"); + let mut manager = + SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path.clone()); + manager.register_worker(make_worker_spec( + "agent_persisted", + tmp.path().to_path_buf(), + )); + + let mut result = 
make_snapshot(SubAgentStatus::Failed("boom".to_string())); + result.agent_id = "agent_persisted".to_string(); + result.name = "agent_persisted".to_string(); + result.steps_taken = 3; + manager.complete_worker_from_result("agent_persisted", &result); + manager.persist_state().expect("persist state"); + + let mut loaded = SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path); + loaded.load_state().expect("load state"); + + let record = loaded.get_worker_record("agent_persisted").expect("record"); + assert_eq!(record.status, AgentWorkerStatus::Failed); + assert_eq!(record.error.as_deref(), Some("boom")); + assert_eq!(record.steps_taken, 3); + assert!( + record + .events + .iter() + .any(|event| event.status == AgentWorkerStatus::Failed) + ); +} + fn init_subagent_git_repo() -> tempfile::TempDir { let dir = tempdir().expect("tempdir"); @@ -531,7 +641,7 @@ async fn session_projection_exposes_forked_prefix_cache_contract() { snapshot.fork_context = true; let ctx = ToolContext::new("."); - let projection = subagent_session_projection(snapshot, false, &ctx).await; + let projection = subagent_session_projection(snapshot, false, &ctx, None).await; assert_eq!(projection.name, "fanout_review"); assert_eq!(projection.context_mode, "forked"); @@ -571,7 +681,7 @@ async fn terminal_session_projection_prefers_full_transcript_handle() { ) }; - let projection = subagent_session_projection(snapshot, false, &ctx).await; + let projection = subagent_session_projection(snapshot, false, &ctx, None).await; assert_eq!(projection.transcript_handle, full_handle); assert_eq!(projection.transcript_handle.name, "full_transcript"); @@ -591,7 +701,7 @@ async fn interrupted_projection_exposes_checkpoint_metadata_and_messages() { snapshot.checkpoint = Some(checkpoint.clone()); let ctx = ToolContext::new("."); - let projection = subagent_session_projection(snapshot, false, &ctx).await; + let projection = subagent_session_projection(snapshot, false, &ctx, None).await; 
assert_eq!(projection.status, "interrupted"); assert!(projection.terminal); @@ -1464,6 +1574,90 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() { assert!(second_request.contains("Please continue with the prior checkpoint.")); } +#[tokio::test] +async fn model_wait_heartbeat_prevents_stale_cleanup_during_api_call() { + let tmp = tempdir().expect("tempdir"); + let manager = Arc::new(RwLock::new( + SubAgentManager::new(tmp.path().to_path_buf(), 2) + .with_running_heartbeat_timeout(Duration::from_millis(250)), + )); + let agent_id = "agent_model_wait_heartbeat".to_string(); + let (task_input_tx, task_input_rx) = mpsc::unbounded_channel(); + let agent = SubAgent::new( + agent_id.clone(), + SubAgentType::General, + "Wait visibly".to_string(), + make_assignment(), + "deepseek-v4-flash".to_string(), + Some("Blue".to_string()), + Some(vec![]), + task_input_tx, + tmp.path().to_path_buf(), + "boot_test".to_string(), + ); + { + let mut guard = manager.write().await; + guard.register_worker(make_worker_spec(&agent_id, tmp.path().to_path_buf())); + guard.agents.insert(agent_id.clone(), agent); + } + + let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await; + let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new()); + let mut runtime = stub_runtime().with_step_api_timeout(Duration::from_secs(2)); + runtime.client = client; + runtime.manager = Arc::clone(&manager); + runtime.context = ToolContext::new(tmp.path()); + runtime.mailbox = Some(mailbox); + + let task = SubAgentTask { + manager_handle: Arc::clone(&manager), + runtime: runtime.clone(), + agent_id: agent_id.clone(), + agent_type: SubAgentType::General, + prompt: "Wait visibly".to_string(), + assignment: make_assignment(), + allowed_tools: Some(vec![]), + fork_context: false, + started_at: Instant::now(), + max_steps: 1, + input_rx: task_input_rx, + launch_gate: None, + }; + let handle = tokio::spawn(run_subagent_task(task)); + + let 
heartbeat = tokio::time::timeout(Duration::from_secs(2), async { + loop { + let envelope = mailbox_rx.recv().await?; + if let MailboxMessage::Progress { agent_id, status } = envelope.message + && agent_id == "agent_model_wait_heartbeat" + && status.contains(SUBAGENT_MODEL_WAIT_REASON) + { + return Some(status); + } + } + }) + .await + .expect("model wait heartbeat should be published before completion") + .expect("mailbox should stay open until heartbeat"); + assert_eq!(heartbeat, "step 1/1: waiting for model response"); + + { + let mut guard = manager.write().await; + assert_eq!( + guard.cleanup(Duration::from_secs(60 * 60)), + 0, + "fresh model-wait heartbeat must keep the running agent alive" + ); + let record = guard + .get_worker_record(&agent_id) + .expect("worker record should track model wait"); + assert_eq!(record.status, AgentWorkerStatus::ModelWait); + assert_eq!(record.latest_message.as_deref(), Some(heartbeat.as_str())); + } + + handle.await.expect("sub-agent task should finish"); +} + #[tokio::test] async fn spawn_duplicate_session_name_error_names_conflicting_agent() { // #2656: the duplicate-name error must identify the conflicting agent so a diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index 96c1847e..434c6123 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -2948,6 +2948,17 @@ async fn run_event_loop( } } + if final_w == 0 || final_h == 0 { + tracing::debug!( + final_w, + final_h, + "zero-size Resize event ignored while terminal is hidden/minimized" + ); + force_terminal_repaint = true; + app.needs_redraw = true; + continue; + } + // #582: commit the event-reported size to ratatui's // viewport explicitly before the redraw, instead of // relying on `crossterm::terminal::size()` which gets @@ -9095,12 +9106,6 @@ fn enable_windows_ime_console_mode() { /// across focus events and are only re-established by `resume_terminal` /// after a suspension, which always runs a separate path. 
/// -/// Note: calling this on every FocusGained event pushes one extra Kitty -/// keyboard mode level onto the terminal's stack without a preceding pop. -/// After N focus cycles the stack reaches depth N; at shutdown only one -/// level is popped. On terminals with a finite stack this is benign because -/// the terminal clears the stack on process exit. A future improvement is -/// to pop-then-push here so the stack stays at depth ≤1. fn recover_terminal_modes( writer: &mut W, use_mouse_capture: bool, @@ -9109,6 +9114,7 @@ fn recover_terminal_modes( #[cfg(target_os = "windows")] enable_windows_ime_console_mode(); + pop_keyboard_enhancement_flags(writer); push_keyboard_enhancement_flags(writer); if use_mouse_capture && let Err(err) = execute!(writer, EnableMouseCapture) { tracing::debug!(?err, "EnableMouseCapture ignored");