fix(tui): keep subagents alive across backgrounded waits
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -29,6 +29,116 @@ fn make_snapshot(status: SubAgentStatus) -> SubAgentResult {
|
||||
}
|
||||
}
|
||||
|
||||
fn make_worker_spec(worker_id: &str, workspace: PathBuf) -> AgentWorkerSpec {
|
||||
AgentWorkerSpec {
|
||||
worker_id: worker_id.to_string(),
|
||||
objective: "inspect the repo".to_string(),
|
||||
role: Some("explorer".to_string()),
|
||||
agent_type: SubAgentType::Explore,
|
||||
model: "deepseek-v4-flash".to_string(),
|
||||
workspace,
|
||||
git_branch: None,
|
||||
context_mode: "fresh".to_string(),
|
||||
fork_context: false,
|
||||
tool_profile: AgentWorkerToolProfile::Explicit(vec![
|
||||
"read_file".to_string(),
|
||||
"grep_files".to_string(),
|
||||
]),
|
||||
max_steps: 8,
|
||||
spawn_depth: 1,
|
||||
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn headless_worker_record_tracks_lifecycle_without_tui_projection() {
|
||||
let tmp = tempdir().expect("tempdir");
|
||||
let mut manager = SubAgentManager::new(tmp.path().to_path_buf(), 4);
|
||||
manager.register_worker(make_worker_spec(
|
||||
"agent_worker_contract",
|
||||
tmp.path().to_path_buf(),
|
||||
));
|
||||
|
||||
manager.record_worker_event(
|
||||
"agent_worker_contract",
|
||||
AgentWorkerStatus::Queued,
|
||||
Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_worker_progress(
|
||||
"agent_worker_contract",
|
||||
"step 1: requesting model response".to_string(),
|
||||
);
|
||||
manager.record_worker_progress(
|
||||
"agent_worker_contract",
|
||||
"step 1: running tool 'read_file'".to_string(),
|
||||
);
|
||||
|
||||
let mut result = make_snapshot(SubAgentStatus::Completed);
|
||||
result.agent_id = "agent_worker_contract".to_string();
|
||||
result.name = "agent_worker_contract".to_string();
|
||||
result.result = Some("worker summary".to_string());
|
||||
result.steps_taken = 1;
|
||||
manager.complete_worker_from_result("agent_worker_contract", &result);
|
||||
|
||||
let record = manager
|
||||
.get_worker_record("agent_worker_contract")
|
||||
.expect("worker record");
|
||||
assert_eq!(record.status, AgentWorkerStatus::Completed);
|
||||
assert_eq!(record.spec.agent_type, SubAgentType::Explore);
|
||||
assert_eq!(
|
||||
record.spec.tool_profile,
|
||||
AgentWorkerToolProfile::Explicit(vec!["read_file".to_string(), "grep_files".to_string()])
|
||||
);
|
||||
assert_eq!(record.result_summary.as_deref(), Some("worker summary"));
|
||||
assert_eq!(record.steps_taken, 1);
|
||||
let statuses: Vec<_> = record.events.iter().map(|event| event.status).collect();
|
||||
assert!(statuses.contains(&AgentWorkerStatus::Queued));
|
||||
assert!(statuses.contains(&AgentWorkerStatus::ModelWait));
|
||||
assert!(statuses.contains(&AgentWorkerStatus::RunningTool));
|
||||
assert!(statuses.contains(&AgentWorkerStatus::Completed));
|
||||
assert!(
|
||||
record
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.tool_name.as_deref() == Some("read_file"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn headless_worker_records_persist_with_subagent_state() {
|
||||
let tmp = tempdir().expect("tempdir");
|
||||
let state_path = tmp.path().join("subagents.v1.json");
|
||||
let mut manager =
|
||||
SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path.clone());
|
||||
manager.register_worker(make_worker_spec(
|
||||
"agent_persisted",
|
||||
tmp.path().to_path_buf(),
|
||||
));
|
||||
|
||||
let mut result = make_snapshot(SubAgentStatus::Failed("boom".to_string()));
|
||||
result.agent_id = "agent_persisted".to_string();
|
||||
result.name = "agent_persisted".to_string();
|
||||
result.steps_taken = 3;
|
||||
manager.complete_worker_from_result("agent_persisted", &result);
|
||||
manager.persist_state().expect("persist state");
|
||||
|
||||
let mut loaded = SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path);
|
||||
loaded.load_state().expect("load state");
|
||||
|
||||
let record = loaded.get_worker_record("agent_persisted").expect("record");
|
||||
assert_eq!(record.status, AgentWorkerStatus::Failed);
|
||||
assert_eq!(record.error.as_deref(), Some("boom"));
|
||||
assert_eq!(record.steps_taken, 3);
|
||||
assert!(
|
||||
record
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.status == AgentWorkerStatus::Failed)
|
||||
);
|
||||
}
|
||||
|
||||
fn init_subagent_git_repo() -> tempfile::TempDir {
|
||||
let dir = tempdir().expect("tempdir");
|
||||
|
||||
@@ -531,7 +641,7 @@ async fn session_projection_exposes_forked_prefix_cache_contract() {
|
||||
snapshot.fork_context = true;
|
||||
|
||||
let ctx = ToolContext::new(".");
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx).await;
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
|
||||
|
||||
assert_eq!(projection.name, "fanout_review");
|
||||
assert_eq!(projection.context_mode, "forked");
|
||||
@@ -571,7 +681,7 @@ async fn terminal_session_projection_prefers_full_transcript_handle() {
|
||||
)
|
||||
};
|
||||
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx).await;
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
|
||||
|
||||
assert_eq!(projection.transcript_handle, full_handle);
|
||||
assert_eq!(projection.transcript_handle.name, "full_transcript");
|
||||
@@ -591,7 +701,7 @@ async fn interrupted_projection_exposes_checkpoint_metadata_and_messages() {
|
||||
snapshot.checkpoint = Some(checkpoint.clone());
|
||||
|
||||
let ctx = ToolContext::new(".");
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx).await;
|
||||
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
|
||||
|
||||
assert_eq!(projection.status, "interrupted");
|
||||
assert!(projection.terminal);
|
||||
@@ -1464,6 +1574,90 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() {
|
||||
assert!(second_request.contains("Please continue with the prior checkpoint."));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn model_wait_heartbeat_prevents_stale_cleanup_during_api_call() {
|
||||
let tmp = tempdir().expect("tempdir");
|
||||
let manager = Arc::new(RwLock::new(
|
||||
SubAgentManager::new(tmp.path().to_path_buf(), 2)
|
||||
.with_running_heartbeat_timeout(Duration::from_millis(250)),
|
||||
));
|
||||
let agent_id = "agent_model_wait_heartbeat".to_string();
|
||||
let (task_input_tx, task_input_rx) = mpsc::unbounded_channel();
|
||||
let agent = SubAgent::new(
|
||||
agent_id.clone(),
|
||||
SubAgentType::General,
|
||||
"Wait visibly".to_string(),
|
||||
make_assignment(),
|
||||
"deepseek-v4-flash".to_string(),
|
||||
Some("Blue".to_string()),
|
||||
Some(vec![]),
|
||||
task_input_tx,
|
||||
tmp.path().to_path_buf(),
|
||||
"boot_test".to_string(),
|
||||
);
|
||||
{
|
||||
let mut guard = manager.write().await;
|
||||
guard.register_worker(make_worker_spec(&agent_id, tmp.path().to_path_buf()));
|
||||
guard.agents.insert(agent_id.clone(), agent);
|
||||
}
|
||||
|
||||
let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await;
|
||||
let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new());
|
||||
let mut runtime = stub_runtime().with_step_api_timeout(Duration::from_secs(2));
|
||||
runtime.client = client;
|
||||
runtime.manager = Arc::clone(&manager);
|
||||
runtime.context = ToolContext::new(tmp.path());
|
||||
runtime.mailbox = Some(mailbox);
|
||||
|
||||
let task = SubAgentTask {
|
||||
manager_handle: Arc::clone(&manager),
|
||||
runtime: runtime.clone(),
|
||||
agent_id: agent_id.clone(),
|
||||
agent_type: SubAgentType::General,
|
||||
prompt: "Wait visibly".to_string(),
|
||||
assignment: make_assignment(),
|
||||
allowed_tools: Some(vec![]),
|
||||
fork_context: false,
|
||||
started_at: Instant::now(),
|
||||
max_steps: 1,
|
||||
input_rx: task_input_rx,
|
||||
launch_gate: None,
|
||||
};
|
||||
let handle = tokio::spawn(run_subagent_task(task));
|
||||
|
||||
let heartbeat = tokio::time::timeout(Duration::from_secs(2), async {
|
||||
loop {
|
||||
let envelope = mailbox_rx.recv().await?;
|
||||
if let MailboxMessage::Progress { agent_id, status } = envelope.message
|
||||
&& agent_id == "agent_model_wait_heartbeat"
|
||||
&& status.contains(SUBAGENT_MODEL_WAIT_REASON)
|
||||
{
|
||||
return Some(status);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("model wait heartbeat should be published before completion")
|
||||
.expect("mailbox should stay open until heartbeat");
|
||||
assert_eq!(heartbeat, "step 1/1: waiting for model response");
|
||||
|
||||
{
|
||||
let mut guard = manager.write().await;
|
||||
assert_eq!(
|
||||
guard.cleanup(Duration::from_secs(60 * 60)),
|
||||
0,
|
||||
"fresh model-wait heartbeat must keep the running agent alive"
|
||||
);
|
||||
let record = guard
|
||||
.get_worker_record(&agent_id)
|
||||
.expect("worker record should track model wait");
|
||||
assert_eq!(record.status, AgentWorkerStatus::ModelWait);
|
||||
assert_eq!(record.latest_message.as_deref(), Some(heartbeat.as_str()));
|
||||
}
|
||||
|
||||
handle.await.expect("sub-agent task should finish");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_duplicate_session_name_error_names_conflicting_agent() {
|
||||
// #2656: the duplicate-name error must identify the conflicting agent so a
|
||||
|
||||
@@ -2948,6 +2948,17 @@ async fn run_event_loop(
|
||||
}
|
||||
}
|
||||
|
||||
if final_w == 0 || final_h == 0 {
|
||||
tracing::debug!(
|
||||
final_w,
|
||||
final_h,
|
||||
"zero-size Resize event ignored while terminal is hidden/minimized"
|
||||
);
|
||||
force_terminal_repaint = true;
|
||||
app.needs_redraw = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
// #582: commit the event-reported size to ratatui's
|
||||
// viewport explicitly before the redraw, instead of
|
||||
// relying on `crossterm::terminal::size()` which gets
|
||||
@@ -9095,12 +9106,6 @@ fn enable_windows_ime_console_mode() {
|
||||
/// across focus events and are only re-established by `resume_terminal`
|
||||
/// after a suspension, which always runs a separate path.
|
||||
///
|
||||
/// Note: calling this on every FocusGained event pushes one extra Kitty
|
||||
/// keyboard mode level onto the terminal's stack without a preceding pop.
|
||||
/// After N focus cycles the stack reaches depth N; at shutdown only one
|
||||
/// level is popped. On terminals with a finite stack this is benign because
|
||||
/// the terminal clears the stack on process exit. A future improvement is
|
||||
/// to pop-then-push here so the stack stays at depth ≤1.
|
||||
fn recover_terminal_modes<W: Write>(
|
||||
writer: &mut W,
|
||||
use_mouse_capture: bool,
|
||||
@@ -9109,6 +9114,7 @@ fn recover_terminal_modes<W: Write>(
|
||||
#[cfg(target_os = "windows")]
|
||||
enable_windows_ime_console_mode();
|
||||
|
||||
pop_keyboard_enhancement_flags(writer);
|
||||
push_keyboard_enhancement_flags(writer);
|
||||
if use_mouse_capture && let Err(err) = execute!(writer, EnableMouseCapture) {
|
||||
tracing::debug!(?err, "EnableMouseCapture ignored");
|
||||
|
||||
Reference in New Issue
Block a user