From 385a07f32a7aa23903e5aa19e0e03a791fc62bb3 Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Fri, 12 Jun 2026 00:49:13 +0000 Subject: [PATCH] feat(tui): queue interactive fanout launches behind a visible launch gate (#3095) Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- crates/tui/src/config.rs | 54 ++++++++++ crates/tui/src/core/engine.rs | 6 ++ crates/tui/src/main.rs | 1 + crates/tui/src/runtime_threads.rs | 1 + crates/tui/src/tools/subagent/mod.rs | 61 ++++++++++- crates/tui/src/tools/subagent/tests.rs | 140 +++++++++++++++++++++++++ crates/tui/src/tui/ui.rs | 1 + 7 files changed, 262 insertions(+), 2 deletions(-) diff --git a/crates/tui/src/config.rs b/crates/tui/src/config.rs index 140c8f80..c3f09d1e 100644 --- a/crates/tui/src/config.rs +++ b/crates/tui/src/config.rs @@ -20,6 +20,11 @@ use crate::hooks::HooksConfig; pub const DEFAULT_MAX_SUBAGENTS: usize = 10; pub const MAX_SUBAGENTS: usize = 20; +/// Default number of direct (depth-1) sub-agents that may execute +/// concurrently in an interactive session before further launches queue +/// for a slot (#3095). Deliberately lower than `DEFAULT_MAX_SUBAGENTS`, +/// which caps total live agents across the whole spawn tree. +pub const DEFAULT_INTERACTIVE_LAUNCH_LIMIT: usize = 4; /// Default per-step DeepSeek API timeout for sub-agent requests, in seconds. /// Matches the legacy hardcoded value so existing configs keep their old /// behavior when `[subagents] api_timeout_secs` is unset (#1806, #1808). @@ -1534,6 +1539,12 @@ pub struct SubagentsConfig { /// setting. Clamped to [1, MAX_SUBAGENTS]. #[serde(default)] pub max_concurrent: Option<usize>, + /// Number of direct (depth-1) sub-agents that may execute concurrently + /// before further interactive fanout launches queue for a slot (#3095). + /// Defaults to `DEFAULT_INTERACTIVE_LAUNCH_LIMIT` (4) and is clamped to + /// [1, max_subagents]. 
+ #[serde(default)] + pub interactive_max_launch: Option<usize>, /// Per-step DeepSeek API timeout for sub-agent requests, in seconds. The /// timeout wraps `client.create_message` so a stuck single step cannot /// pin the parent's parent-completion wakeup channel indefinitely. @@ -2922,6 +2933,20 @@ impl Config { .clamp(1, MAX_SUBAGENTS) } + /// Number of direct (depth-1) sub-agents that may execute concurrently + /// before further interactive fanout launches queue for a slot (#3095). + /// Reads `[subagents] interactive_max_launch`, defaults to + /// `DEFAULT_INTERACTIVE_LAUNCH_LIMIT`, and clamps to + /// `[1, max_subagents]`. + #[must_use] + pub fn interactive_launch_limit(&self) -> usize { + self.subagents + .as_ref() + .and_then(|cfg| cfg.interactive_max_launch) + .unwrap_or(DEFAULT_INTERACTIVE_LAUNCH_LIMIT) + .clamp(1, self.max_subagents()) + } + /// Resolved per-step DeepSeek API timeout for sub-agents, in seconds. /// /// Reads `[subagents] api_timeout_secs` and clamps to @@ -6811,6 +6836,35 @@ action = "session.compact" assert_eq!(DEFAULT_MAX_SUBAGENTS, 10); } + #[test] + fn interactive_launch_limit_defaults_and_clamps_to_max_subagents() { + assert_eq!( + Config::default().interactive_launch_limit(), + DEFAULT_INTERACTIVE_LAUNCH_LIMIT + ); + + let mut config = Config { + subagents: Some(SubagentsConfig { + interactive_max_launch: Some(50), + ..SubagentsConfig::default() + }), + ..Config::default() + }; + assert_eq!(config.interactive_launch_limit(), config.max_subagents()); + + config.subagents = Some(SubagentsConfig { + interactive_max_launch: Some(0), + ..SubagentsConfig::default() + }); + assert_eq!(config.interactive_launch_limit(), 1); + + config.subagents = Some(SubagentsConfig { + interactive_max_launch: Some(2), + ..SubagentsConfig::default() + }); + assert_eq!(config.interactive_launch_limit(), 2); + } + #[test] fn subagents_max_concurrent_overrides_top_level_cap() { let config = Config { diff --git a/crates/tui/src/core/engine.rs 
b/crates/tui/src/core/engine.rs index 6135cda8..023bc99d 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -278,6 +278,10 @@ pub struct EngineConfig { pub max_steps: u32, /// Maximum number of concurrently active subagents. pub max_subagents: usize, + /// Number of direct (depth-1) sub-agents that may execute concurrently + /// before further interactive fanout launches queue for a slot (#3095). + /// Resolved from `[subagents] interactive_max_launch`. + pub interactive_launch_limit: usize, /// Feature flags controlling tool availability. pub features: Features, /// Auto-compaction settings for long conversations. @@ -391,6 +395,7 @@ impl Default for EngineConfig { show_thinking: true, max_steps: 100, max_subagents: DEFAULT_MAX_SUBAGENTS, + interactive_launch_limit: crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT, features: Features::with_defaults(), compaction: CompactionConfig::default(), capacity: CapacityControllerConfig::default(), @@ -722,6 +727,7 @@ impl Engine { config.workspace.clone(), config.max_subagents, config.subagent_heartbeat_timeout, + config.interactive_launch_limit, ); let shell_manager = config .runtime_services diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index c642e152..7bc6fb85 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -5863,6 +5863,7 @@ async fn run_exec_agent( show_thinking: settings.show_thinking, max_steps: max_turns, max_subagents, + interactive_launch_limit: config.interactive_launch_limit(), features: config.features(), compaction, capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config), diff --git a/crates/tui/src/runtime_threads.rs b/crates/tui/src/runtime_threads.rs index 724c9d35..98c042cf 100644 --- a/crates/tui/src/runtime_threads.rs +++ b/crates/tui/src/runtime_threads.rs @@ -2036,6 +2036,7 @@ impl RuntimeThreadManager { show_thinking: settings.show_thinking, max_steps: 100, max_subagents: self.config.max_subagents().clamp(1, 
MAX_SUBAGENTS), + interactive_launch_limit: self.config.interactive_launch_limit(), features: self.config.features(), compaction, capacity: crate::core::capacity::CapacityControllerConfig::from_app_config( diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs index c3c6ece2..e62511c5 100644 --- a/crates/tui/src/tools/subagent/mod.rs +++ b/crates/tui/src/tools/subagent/mod.rs @@ -14,7 +14,7 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, Semaphore}; use anyhow::{Result, anyhow}; use async_trait::async_trait; @@ -1197,6 +1197,13 @@ pub struct SubAgentManager { /// agents whose `session_boot_id` doesn't match this value as /// "from prior session" so `agent_list` can hide them by default. current_session_boot_id: String, + /// Launch gate for direct (depth-1) interactive fanout (#3095). Each + /// permit is one actively executing direct child; further direct + /// children spawn immediately but queue for a permit before starting, + /// publishing a visible "queued" reason instead of bursting. Deeper + /// descendants bypass the gate so a permit-holding parent waiting on + /// its own children cannot deadlock the tree. + launch_gate: Arc<Semaphore>, } impl SubAgentManager { @@ -1215,9 +1222,20 @@ impl SubAgentManager { // Fresh boot id per manager. Used by #405 to classify // re-loaded persisted agents as "prior session". current_session_boot_id: format!("boot_{}", &Uuid::new_v4().to_string()[..12]), + launch_gate: Arc::new(Semaphore::new( + crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT.min(max_agents), + )), } } + /// Set the number of direct children that may execute concurrently + /// before further launches queue (#3095). Clamped to `1..=max_agents`. 
+ #[must_use] + pub fn with_interactive_launch_limit(mut self, limit: usize) -> Self { + self.launch_gate = Arc::new(Semaphore::new(limit.clamp(1, self.max_agents))); + self + } + /// Return the boot id this manager stamps on agents it spawns. /// Exposed for tests; internal callers use the field directly. #[cfg(test)] @@ -1540,6 +1558,7 @@ impl SubAgentManager { }); } + let launch_gate = (runtime.spawn_depth == 1).then(|| self.launch_gate.clone()); let task = SubAgentTask { manager_handle, runtime, @@ -1552,6 +1571,7 @@ impl SubAgentManager { started_at, max_steps, input_rx, + launch_gate, }; let handle = spawn_supervised( "subagent-task", @@ -1670,6 +1690,7 @@ impl SubAgentManager { if !agent.model.trim().is_empty() && agent.model != "unknown" { restart_runtime.model.clone_from(&agent.model); } + let launch_gate = (restart_runtime.spawn_depth == 1).then(|| self.launch_gate.clone()); let task = SubAgentTask { manager_handle, runtime: restart_runtime, @@ -1682,6 +1703,7 @@ impl SubAgentManager { started_at: restarted_at, max_steps: self.max_steps, input_rx, + launch_gate, }; let handle = spawn_supervised( "subagent-task-resume", @@ -2188,6 +2210,7 @@ pub fn new_shared_subagent_manager(workspace: PathBuf, max_agents: usize) -> Sha workspace, max_agents, Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS), + crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT, ) } @@ -2198,11 +2221,13 @@ pub fn new_shared_subagent_manager_with_timeout( workspace: PathBuf, max_agents: usize, running_heartbeat_timeout: Duration, + interactive_launch_limit: usize, ) -> SharedSubAgentManager { let max_agents = max_agents.clamp(1, MAX_SUBAGENTS); let state_path = default_state_path(&workspace); let mut manager = SubAgentManager::new(workspace, max_agents) .with_running_heartbeat_timeout(running_heartbeat_timeout) + .with_interactive_launch_limit(interactive_launch_limit) .with_state_path(state_path); if let Err(err) = manager.load_state() { // Routed through tracing 
instead of stderr — see comment in @@ -2505,7 +2530,7 @@ impl ToolSpec for AgentSpawnTool { fn description(&self) -> &'static str { concat!( - "Spawn a background sub-agent for a focused task. Returns an agent_id immediately; follow with agent_result to retrieve the final result. Default cap of 10 concurrent sub-agents (configurable via `[subagents].max_concurrent`); each is a full sub-agent loop, so cancel or wait if you hit the cap. For parallel one-shot LLM queries, just emit multiple tool calls in one turn — the dispatcher runs them in parallel.\n\n", + "Spawn a background sub-agent for a focused task. Returns an agent_id immediately; follow with agent_result to retrieve the final result. Default cap of 10 concurrent sub-agents (configurable via `[subagents].max_concurrent`); each is a full sub-agent loop, so cancel or wait if you hit the cap. Direct children beyond the interactive launch limit (default 4, `[subagents].interactive_max_launch`) queue for a slot instead of executing at once. For parallel one-shot LLM queries, just emit multiple tool calls in one turn — the dispatcher runs them in parallel.\n\n", "## Trust model: subagent results are self-reports, not verified facts\n\n", "`agent_result` returns the child's narrative summary of what happened. For operations with external side effects, the child's summary may be wrong. Re-verify before reporting success to the user:\n\n", "| Side effect | Re-verify with |\n|---|---|\n| URL claimed posted/written | `fetch_url` and check the response |\n| File claimed created | `read_file` or `list_dir` |\n| File claimed edited | `read_file` and check the change is present |\n| HTTP POST/PUT response | inspect status code and body |\n| Git operation | `git_status` / `git_diff` |\n| Test claimed passing | `run_tests` |\n| Process claimed started | `exec_shell` (e.g. 
`pgrep`, `lsof -i`) |\n\n", @@ -3855,10 +3880,42 @@ struct SubAgentTask { started_at: Instant, max_steps: u32, input_rx: mpsc::UnboundedReceiver, + /// Interactive launch gate (#3095). `Some` only for direct (depth-1) + /// children: the task acquires a permit before its first model step and + /// holds it until completion, so a fanout burst beyond the limit queues + /// with a visible reason instead of executing all at once. + launch_gate: Option<Arc<Semaphore>>, } #[allow(clippy::too_many_lines)] async fn run_subagent_task(task: SubAgentTask) { + // Interactive launch gate (#3095): direct children acquire a permit + // before their first model step so a fanout burst beyond the limit + // queues visibly instead of executing all at once. The permit is held + // for the lifetime of the task. Cancellation while queued is handled by + // `run_subagent`'s own first-step cancel check. + let mut _launch_permit = None; + if let Some(gate) = task.launch_gate.as_ref() { + match Arc::clone(gate).try_acquire_owned() { + Ok(permit) => _launch_permit = Some(permit), + Err(tokio::sync::TryAcquireError::NoPermits) => { + record_agent_progress( + &task.runtime, + &task.agent_id, + "queued: waiting for an interactive fanout slot".to_string(), + ); + if let Some(mb) = task.runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::progress( + &task.agent_id, + "queued: waiting for an interactive fanout slot".to_string(), + )); + } + _launch_permit = Arc::clone(gate).acquire_owned().await.ok(); + } + Err(tokio::sync::TryAcquireError::Closed) => {} + } + } + let result = run_subagent( &task.runtime, task.agent_id.clone(), diff --git a/crates/tui/src/tools/subagent/tests.rs b/crates/tui/src/tools/subagent/tests.rs index 2128aff7..5ba28f1b 100644 --- a/crates/tui/src/tools/subagent/tests.rs +++ b/crates/tui/src/tools/subagent/tests.rs @@ -1333,6 +1333,7 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() { started_at: Instant::now(), max_steps: 3, input_rx: task_input_rx, 
+ launch_gate: None, }; let task_handle = tokio::spawn(run_subagent_task(task)); @@ -3085,6 +3086,7 @@ async fn run_subagent_task_emits_parent_completion_before_terminal_update() { started_at: Instant::now(), max_steps: 0, input_rx: task_input_rx, + launch_gate: None, }; let manager_lock = manager.write().await; @@ -3364,3 +3366,141 @@ fn format_step_counter_keeps_concrete_budgets() { assert_eq!(format_step_counter(3, 25), "step 3/25"); assert_eq!(format_step_counter(0, 1), "step 0/1"); } + +// ── #3095: interactive fanout launch gate ──────────────────────────────────── + +#[test] +fn launch_gate_defaults_to_interactive_limit_capped_by_max_agents() { + let tmp = tempdir().expect("tempdir"); + let manager = SubAgentManager::new(tmp.path().to_path_buf(), 10); + assert_eq!( + manager.launch_gate.available_permits(), + crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT + ); + + let small = SubAgentManager::new(tmp.path().to_path_buf(), 2); + assert_eq!(small.launch_gate.available_permits(), 2); + + let custom = + SubAgentManager::new(tmp.path().to_path_buf(), 10).with_interactive_launch_limit(0); + assert_eq!(custom.launch_gate.available_permits(), 1, "clamps up to 1"); + + let oversized = + SubAgentManager::new(tmp.path().to_path_buf(), 3).with_interactive_launch_limit(99); + assert_eq!( + oversized.launch_gate.available_permits(), + 3, + "clamps down to max_agents" + ); +} + +#[tokio::test] +async fn interactive_launch_gate_queues_extra_direct_children() { + use tokio::sync::Semaphore; + use tokio_util::sync::CancellationToken; + + let tmp = tempdir().expect("tempdir"); + let manager = Arc::new(RwLock::new(SubAgentManager::new( + tmp.path().to_path_buf(), + 4, + ))); + + let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await; + let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new()); + let mut runtime = stub_runtime(); + runtime.client = client; + runtime.manager = Arc::clone(&manager); + runtime.context = 
ToolContext::new(tmp.path()); + runtime.mailbox = Some(mailbox); + + let gate = Arc::new(Semaphore::new(1)); + let spawn = |agent_id: &str, gate: Option<Arc<Semaphore>>| { + let (input_tx, input_rx) = mpsc::unbounded_channel(); + let agent = SubAgent::new( + agent_id.to_string(), + SubAgentType::General, + "Answer".to_string(), + make_assignment(), + "deepseek-v4-flash".to_string(), + None, + Some(vec![]), + input_tx, + tmp.path().to_path_buf(), + "boot_test".to_string(), + ); + let task = SubAgentTask { + manager_handle: Arc::clone(&manager), + runtime: runtime.clone(), + agent_id: agent_id.to_string(), + agent_type: SubAgentType::General, + prompt: "Answer".to_string(), + assignment: make_assignment(), + allowed_tools: Some(vec![]), + fork_context: false, + started_at: Instant::now(), + max_steps: 1, + input_rx, + launch_gate: gate, + }; + (agent, task) + }; + + let (agent_a, task_a) = spawn("agent_gate_a", Some(Arc::clone(&gate))); + let (agent_b, task_b) = spawn("agent_gate_b", Some(Arc::clone(&gate))); + { + let mut mgr = manager.write().await; + mgr.agents.insert(agent_a.id.clone(), agent_a); + mgr.agents.insert(agent_b.id.clone(), agent_b); + } + + tokio::spawn(run_subagent_task(task_a)); + // Give the first task time to take the only permit before the second + // task tries; the second must then queue with a visible reason. + tokio::time::sleep(Duration::from_millis(30)).await; + tokio::spawn(run_subagent_task(task_b)); + + let mut messages = Vec::new(); + let collected = tokio::time::timeout(Duration::from_secs(5), async { + let mut completed = 0; + while completed < 2 { + let Some(envelope) = mailbox_rx.recv().await else { + break; + }; + if matches!(envelope.message, MailboxMessage::Completed { .. 
}) { + completed += 1; + } + messages.push(envelope.message); + } + }) + .await; + assert!(collected.is_ok(), "both gated children should complete"); + + let queued_b = messages.iter().position(|m| { + matches!( + m, + MailboxMessage::Progress { agent_id, status } + if agent_id == "agent_gate_b" && status.contains("queued") + ) + }); + assert!( + queued_b.is_some(), + "second child must publish a visible queued reason: {messages:?}" + ); + + let completed_a = messages + .iter() + .position( + |m| matches!(m, MailboxMessage::Completed { agent_id, .. } if agent_id == "agent_gate_a"), + ) + .expect("first child completes"); + let started_b = messages + .iter() + .position( + |m| matches!(m, MailboxMessage::Started { agent_id, .. } if agent_id == "agent_gate_b"), + ) + .expect("second child eventually starts"); + assert!( + started_b > completed_a, + "queued child must not start until a permit frees: {messages:?}" + ); +} diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index 166e5714..426a88a5 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -882,6 +882,7 @@ fn build_engine_config(app: &App, config: &Config) -> EngineConfig { // human-noticeable; we trust the operator over a hard step cap. max_steps: u32::MAX, max_subagents: app.max_subagents, + interactive_launch_limit: config.interactive_launch_limit(), features: config.features(), compaction: app.compaction_config(), capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config),