feat(tui): queue interactive fanout launches behind a visible launch gate (#3095)

Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
Hunter Bown
2026-06-12 00:49:13 +00:00
committed by Hunter B
parent e617e3e618
commit 385a07f32a
7 changed files with 262 additions and 2 deletions
+54
View File
@@ -20,6 +20,11 @@ use crate::hooks::HooksConfig;
pub const DEFAULT_MAX_SUBAGENTS: usize = 10;
pub const MAX_SUBAGENTS: usize = 20;
/// Default number of direct (depth-1) sub-agents that may execute
/// concurrently in an interactive session before further launches queue
/// for a slot (#3095). Deliberately lower than `DEFAULT_MAX_SUBAGENTS`,
/// which caps total live agents across the whole spawn tree.
pub const DEFAULT_INTERACTIVE_LAUNCH_LIMIT: usize = 4;
/// Default per-step DeepSeek API timeout for sub-agent requests, in seconds.
/// Matches the legacy hardcoded value so existing configs keep their old
/// behavior when `[subagents] api_timeout_secs` is unset (#1806, #1808).
@@ -1534,6 +1539,12 @@ pub struct SubagentsConfig {
/// setting. Clamped to [1, MAX_SUBAGENTS].
#[serde(default)]
pub max_concurrent: Option<usize>,
/// Number of direct (depth-1) sub-agents that may execute concurrently
/// before further interactive fanout launches queue for a slot (#3095).
/// Defaults to `DEFAULT_INTERACTIVE_LAUNCH_LIMIT` (4) and is clamped to
/// [1, max_subagents].
#[serde(default)]
pub interactive_max_launch: Option<usize>,
/// Per-step DeepSeek API timeout for sub-agent requests, in seconds. The
/// timeout wraps `client.create_message` so a stuck single step cannot
/// pin the parent's parent-completion wakeup channel indefinitely.
@@ -2922,6 +2933,20 @@ impl Config {
.clamp(1, MAX_SUBAGENTS)
}
/// Number of direct (depth-1) sub-agents that may execute concurrently
/// before further interactive fanout launches queue for a slot (#3095).
/// Reads `[subagents] interactive_max_launch`, defaults to
/// `DEFAULT_INTERACTIVE_LAUNCH_LIMIT`, and clamps to
/// `[1, max_subagents]`.
#[must_use]
pub fn interactive_launch_limit(&self) -> usize {
self.subagents
.as_ref()
.and_then(|cfg| cfg.interactive_max_launch)
.unwrap_or(DEFAULT_INTERACTIVE_LAUNCH_LIMIT)
.clamp(1, self.max_subagents())
}
/// Resolved per-step DeepSeek API timeout for sub-agents, in seconds.
///
/// Reads `[subagents] api_timeout_secs` and clamps to
@@ -6811,6 +6836,35 @@ action = "session.compact"
assert_eq!(DEFAULT_MAX_SUBAGENTS, 10);
}
#[test]
fn interactive_launch_limit_defaults_and_clamps_to_max_subagents() {
assert_eq!(
Config::default().interactive_launch_limit(),
DEFAULT_INTERACTIVE_LAUNCH_LIMIT
);
let mut config = Config {
subagents: Some(SubagentsConfig {
interactive_max_launch: Some(50),
..SubagentsConfig::default()
}),
..Config::default()
};
assert_eq!(config.interactive_launch_limit(), config.max_subagents());
config.subagents = Some(SubagentsConfig {
interactive_max_launch: Some(0),
..SubagentsConfig::default()
});
assert_eq!(config.interactive_launch_limit(), 1);
config.subagents = Some(SubagentsConfig {
interactive_max_launch: Some(2),
..SubagentsConfig::default()
});
assert_eq!(config.interactive_launch_limit(), 2);
}
#[test]
fn subagents_max_concurrent_overrides_top_level_cap() {
let config = Config {
+6
View File
@@ -278,6 +278,10 @@ pub struct EngineConfig {
pub max_steps: u32,
/// Maximum number of concurrently active subagents.
pub max_subagents: usize,
/// Number of direct (depth-1) sub-agents that may execute concurrently
/// before further interactive fanout launches queue for a slot (#3095).
/// Resolved from `[subagents] interactive_max_launch`.
pub interactive_launch_limit: usize,
/// Feature flags controlling tool availability.
pub features: Features,
/// Auto-compaction settings for long conversations.
@@ -391,6 +395,7 @@ impl Default for EngineConfig {
show_thinking: true,
max_steps: 100,
max_subagents: DEFAULT_MAX_SUBAGENTS,
interactive_launch_limit: crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT,
features: Features::with_defaults(),
compaction: CompactionConfig::default(),
capacity: CapacityControllerConfig::default(),
@@ -722,6 +727,7 @@ impl Engine {
config.workspace.clone(),
config.max_subagents,
config.subagent_heartbeat_timeout,
config.interactive_launch_limit,
);
let shell_manager = config
.runtime_services
+1
View File
@@ -5863,6 +5863,7 @@ async fn run_exec_agent(
show_thinking: settings.show_thinking,
max_steps: max_turns,
max_subagents,
interactive_launch_limit: config.interactive_launch_limit(),
features: config.features(),
compaction,
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config),
+1
View File
@@ -2036,6 +2036,7 @@ impl RuntimeThreadManager {
show_thinking: settings.show_thinking,
max_steps: 100,
max_subagents: self.config.max_subagents().clamp(1, MAX_SUBAGENTS),
interactive_launch_limit: self.config.interactive_launch_limit(),
features: self.config.features(),
compaction,
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(
+59 -2
View File
@@ -14,7 +14,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, RwLock, Semaphore};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
@@ -1197,6 +1197,13 @@ pub struct SubAgentManager {
/// agents whose `session_boot_id` doesn't match this value as
/// "from prior session" so `agent_list` can hide them by default.
current_session_boot_id: String,
/// Launch gate for direct (depth-1) interactive fanout (#3095). Each
/// permit is one actively executing direct child; further direct
/// children spawn immediately but queue for a permit before starting,
/// publishing a visible "queued" reason instead of bursting. Deeper
/// descendants bypass the gate so a permit-holding parent waiting on
/// its own children cannot deadlock the tree.
launch_gate: Arc<Semaphore>,
}
impl SubAgentManager {
@@ -1215,9 +1222,20 @@ impl SubAgentManager {
// Fresh boot id per manager. Used by #405 to classify
// re-loaded persisted agents as "prior session".
current_session_boot_id: format!("boot_{}", &Uuid::new_v4().to_string()[..12]),
launch_gate: Arc::new(Semaphore::new(
crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT.min(max_agents),
)),
}
}
/// Set the number of direct children that may execute concurrently
/// before further launches queue (#3095). Clamped to `1..=max_agents`.
#[must_use]
pub fn with_interactive_launch_limit(mut self, limit: usize) -> Self {
self.launch_gate = Arc::new(Semaphore::new(limit.clamp(1, self.max_agents)));
self
}
/// Return the boot id this manager stamps on agents it spawns.
/// Exposed for tests; internal callers use the field directly.
#[cfg(test)]
@@ -1540,6 +1558,7 @@ impl SubAgentManager {
});
}
let launch_gate = (runtime.spawn_depth == 1).then(|| self.launch_gate.clone());
let task = SubAgentTask {
manager_handle,
runtime,
@@ -1552,6 +1571,7 @@ impl SubAgentManager {
started_at,
max_steps,
input_rx,
launch_gate,
};
let handle = spawn_supervised(
"subagent-task",
@@ -1670,6 +1690,7 @@ impl SubAgentManager {
if !agent.model.trim().is_empty() && agent.model != "unknown" {
restart_runtime.model.clone_from(&agent.model);
}
let launch_gate = (restart_runtime.spawn_depth == 1).then(|| self.launch_gate.clone());
let task = SubAgentTask {
manager_handle,
runtime: restart_runtime,
@@ -1682,6 +1703,7 @@ impl SubAgentManager {
started_at: restarted_at,
max_steps: self.max_steps,
input_rx,
launch_gate,
};
let handle = spawn_supervised(
"subagent-task-resume",
@@ -2188,6 +2210,7 @@ pub fn new_shared_subagent_manager(workspace: PathBuf, max_agents: usize) -> Sha
workspace,
max_agents,
Duration::from_secs(crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS),
crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT,
)
}
@@ -2198,11 +2221,13 @@ pub fn new_shared_subagent_manager_with_timeout(
workspace: PathBuf,
max_agents: usize,
running_heartbeat_timeout: Duration,
interactive_launch_limit: usize,
) -> SharedSubAgentManager {
let max_agents = max_agents.clamp(1, MAX_SUBAGENTS);
let state_path = default_state_path(&workspace);
let mut manager = SubAgentManager::new(workspace, max_agents)
.with_running_heartbeat_timeout(running_heartbeat_timeout)
.with_interactive_launch_limit(interactive_launch_limit)
.with_state_path(state_path);
if let Err(err) = manager.load_state() {
// Routed through tracing instead of stderr — see comment in
@@ -2505,7 +2530,7 @@ impl ToolSpec for AgentSpawnTool {
fn description(&self) -> &'static str {
concat!(
"Spawn a background sub-agent for a focused task. Returns an agent_id immediately; follow with agent_result to retrieve the final result. Default cap of 10 concurrent sub-agents (configurable via `[subagents].max_concurrent`); each is a full sub-agent loop, so cancel or wait if you hit the cap. For parallel one-shot LLM queries, just emit multiple tool calls in one turn — the dispatcher runs them in parallel.\n\n",
"Spawn a background sub-agent for a focused task. Returns an agent_id immediately; follow with agent_result to retrieve the final result. Default cap of 10 concurrent sub-agents (configurable via `[subagents].max_concurrent`); each is a full sub-agent loop, so cancel or wait if you hit the cap. Direct children beyond the interactive launch limit (default 4, `[subagents].interactive_max_launch`) queue for a slot instead of executing at once. For parallel one-shot LLM queries, just emit multiple tool calls in one turn — the dispatcher runs them in parallel.\n\n",
"## Trust model: subagent results are self-reports, not verified facts\n\n",
"`agent_result` returns the child's narrative summary of what happened. For operations with external side effects, the child's summary may be wrong. Re-verify before reporting success to the user:\n\n",
"| Side effect | Re-verify with |\n|---|---|\n| URL claimed posted/written | `fetch_url` and check the response |\n| File claimed created | `read_file` or `list_dir` |\n| File claimed edited | `read_file` and check the change is present |\n| HTTP POST/PUT response | inspect status code and body |\n| Git operation | `git_status` / `git_diff` |\n| Test claimed passing | `run_tests` |\n| Process claimed started | `exec_shell` (e.g. `pgrep`, `lsof -i`) |\n\n",
@@ -3855,10 +3880,42 @@ struct SubAgentTask {
started_at: Instant,
max_steps: u32,
input_rx: mpsc::UnboundedReceiver<SubAgentInput>,
/// Interactive launch gate (#3095). `Some` only for direct (depth-1)
/// children: the task acquires a permit before its first model step and
/// holds it until completion, so a fanout burst beyond the limit queues
/// with a visible reason instead of executing all at once.
launch_gate: Option<Arc<Semaphore>>,
}
#[allow(clippy::too_many_lines)]
async fn run_subagent_task(task: SubAgentTask) {
// Interactive launch gate (#3095): direct children acquire a permit
// before their first model step so a fanout burst beyond the limit
// queues visibly instead of executing all at once. The permit is held
// for the lifetime of the task. Cancellation while queued is handled by
// `run_subagent`'s own first-step cancel check.
let mut _launch_permit = None;
if let Some(gate) = task.launch_gate.as_ref() {
match Arc::clone(gate).try_acquire_owned() {
Ok(permit) => _launch_permit = Some(permit),
Err(tokio::sync::TryAcquireError::NoPermits) => {
record_agent_progress(
&task.runtime,
&task.agent_id,
"queued: waiting for an interactive fanout slot".to_string(),
);
if let Some(mb) = task.runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::progress(
&task.agent_id,
"queued: waiting for an interactive fanout slot".to_string(),
));
}
_launch_permit = Arc::clone(gate).acquire_owned().await.ok();
}
Err(tokio::sync::TryAcquireError::Closed) => {}
}
}
let result = run_subagent(
&task.runtime,
task.agent_id.clone(),
+140
View File
@@ -1333,6 +1333,7 @@ async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() {
started_at: Instant::now(),
max_steps: 3,
input_rx: task_input_rx,
launch_gate: None,
};
let task_handle = tokio::spawn(run_subagent_task(task));
@@ -3085,6 +3086,7 @@ async fn run_subagent_task_emits_parent_completion_before_terminal_update() {
started_at: Instant::now(),
max_steps: 0,
input_rx: task_input_rx,
launch_gate: None,
};
let manager_lock = manager.write().await;
@@ -3364,3 +3366,141 @@ fn format_step_counter_keeps_concrete_budgets() {
assert_eq!(format_step_counter(3, 25), "step 3/25");
assert_eq!(format_step_counter(0, 1), "step 0/1");
}
// ── #3095: interactive fanout launch gate ────────────────────────────────────
#[test]
fn launch_gate_defaults_to_interactive_limit_capped_by_max_agents() {
let tmp = tempdir().expect("tempdir");
let manager = SubAgentManager::new(tmp.path().to_path_buf(), 10);
assert_eq!(
manager.launch_gate.available_permits(),
crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT
);
let small = SubAgentManager::new(tmp.path().to_path_buf(), 2);
assert_eq!(small.launch_gate.available_permits(), 2);
let custom =
SubAgentManager::new(tmp.path().to_path_buf(), 10).with_interactive_launch_limit(0);
assert_eq!(custom.launch_gate.available_permits(), 1, "clamps up to 1");
let oversized =
SubAgentManager::new(tmp.path().to_path_buf(), 3).with_interactive_launch_limit(99);
assert_eq!(
oversized.launch_gate.available_permits(),
3,
"clamps down to max_agents"
);
}
#[tokio::test]
async fn interactive_launch_gate_queues_extra_direct_children() {
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
let tmp = tempdir().expect("tempdir");
let manager = Arc::new(RwLock::new(SubAgentManager::new(
tmp.path().to_path_buf(),
4,
)));
let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await;
let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new());
let mut runtime = stub_runtime();
runtime.client = client;
runtime.manager = Arc::clone(&manager);
runtime.context = ToolContext::new(tmp.path());
runtime.mailbox = Some(mailbox);
let gate = Arc::new(Semaphore::new(1));
let spawn = |agent_id: &str, gate: Option<Arc<Semaphore>>| {
let (input_tx, input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
agent_id.to_string(),
SubAgentType::General,
"Answer".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
None,
Some(vec![]),
input_tx,
tmp.path().to_path_buf(),
"boot_test".to_string(),
);
let task = SubAgentTask {
manager_handle: Arc::clone(&manager),
runtime: runtime.clone(),
agent_id: agent_id.to_string(),
agent_type: SubAgentType::General,
prompt: "Answer".to_string(),
assignment: make_assignment(),
allowed_tools: Some(vec![]),
fork_context: false,
started_at: Instant::now(),
max_steps: 1,
input_rx,
launch_gate: gate,
};
(agent, task)
};
let (agent_a, task_a) = spawn("agent_gate_a", Some(Arc::clone(&gate)));
let (agent_b, task_b) = spawn("agent_gate_b", Some(Arc::clone(&gate)));
{
let mut mgr = manager.write().await;
mgr.agents.insert(agent_a.id.clone(), agent_a);
mgr.agents.insert(agent_b.id.clone(), agent_b);
}
tokio::spawn(run_subagent_task(task_a));
// Give the first task time to take the only permit before the second
// task tries; the second must then queue with a visible reason.
tokio::time::sleep(Duration::from_millis(30)).await;
tokio::spawn(run_subagent_task(task_b));
let mut messages = Vec::new();
let collected = tokio::time::timeout(Duration::from_secs(5), async {
let mut completed = 0;
while completed < 2 {
let Some(envelope) = mailbox_rx.recv().await else {
break;
};
if matches!(envelope.message, MailboxMessage::Completed { .. }) {
completed += 1;
}
messages.push(envelope.message);
}
})
.await;
assert!(collected.is_ok(), "both gated children should complete");
let queued_b = messages.iter().position(|m| {
matches!(
m,
MailboxMessage::Progress { agent_id, status }
if agent_id == "agent_gate_b" && status.contains("queued")
)
});
assert!(
queued_b.is_some(),
"second child must publish a visible queued reason: {messages:?}"
);
let completed_a = messages
.iter()
.position(
|m| matches!(m, MailboxMessage::Completed { agent_id, .. } if agent_id == "agent_gate_a"),
)
.expect("first child completes");
let started_b = messages
.iter()
.position(
|m| matches!(m, MailboxMessage::Started { agent_id, .. } if agent_id == "agent_gate_b"),
)
.expect("second child eventually starts");
assert!(
started_b > completed_a,
"queued child must not start until a permit frees: {messages:?}"
);
}
+1
View File
@@ -882,6 +882,7 @@ fn build_engine_config(app: &App, config: &Config) -> EngineConfig {
// human-noticeable; we trust the operator over a hard step cap.
max_steps: u32::MAX,
max_subagents: app.max_subagents,
interactive_launch_limit: config.interactive_launch_limit(),
features: config.features(),
compaction: app.compaction_config(),
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config),