Files
codewhale/crates/tui/src/tools/subagent/tests.rs
T
Hunter B 5ca618d70a fix(subagents): make eval and queued steering nonblocking
Make agent_eval return a running projection by default so follow-up steering does not wait for child model calls. Keep checkpoint resume blocking by default unless block=false is explicit.

Teach /agent, /swarm, prompts, and docs to poll workers nonblocking and reserve block:true for deliberate terminal waits. Add Ctrl+S as a reliable queued-message send path before falling back to draft stash.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 13:31:21 -07:00

3941 lines
134 KiB
Rust

use super::*;
use axum::{Json, Router, routing::post};
use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::tempdir;
fn make_assignment() -> SubAgentAssignment {
SubAgentAssignment::new("prompt".to_string(), Some("worker".to_string()))
}
fn make_snapshot(status: SubAgentStatus) -> SubAgentResult {
SubAgentResult {
name: "agent_test".to_string(),
agent_id: "agent_test".to_string(),
context_mode: "fresh".to_string(),
fork_context: false,
workspace: None,
git_branch: None,
agent_type: SubAgentType::General,
assignment: make_assignment(),
model: "deepseek-v4-flash".to_string(),
nickname: None,
status,
result: None,
steps_taken: 0,
checkpoint: None,
duration_ms: 0,
from_prior_session: false,
}
}
fn make_worker_spec(worker_id: &str, workspace: PathBuf) -> AgentWorkerSpec {
AgentWorkerSpec {
worker_id: worker_id.to_string(),
run_id: worker_id.to_string(),
parent_run_id: None,
session_name: Some(worker_id.to_string()),
objective: "inspect the repo".to_string(),
role: Some("explorer".to_string()),
agent_type: SubAgentType::Explore,
model: "deepseek-v4-flash".to_string(),
workspace,
git_branch: None,
context_mode: "fresh".to_string(),
fork_context: false,
tool_profile: AgentWorkerToolProfile::Explicit(vec![
"read_file".to_string(),
"grep_files".to_string(),
]),
max_steps: 8,
spawn_depth: 1,
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
}
}
#[test]
fn headless_worker_record_tracks_lifecycle_without_tui_projection() {
let tmp = tempdir().expect("tempdir");
let mut manager = SubAgentManager::new(tmp.path().to_path_buf(), 4);
manager.register_worker(make_worker_spec(
"agent_worker_contract",
tmp.path().to_path_buf(),
));
manager.record_worker_event(
"agent_worker_contract",
AgentWorkerStatus::Queued,
Some(SUBAGENT_QUEUED_LAUNCH_REASON.to_string()),
None,
None,
);
manager.record_worker_progress(
"agent_worker_contract",
"step 1: requesting model response".to_string(),
);
manager.record_worker_progress(
"agent_worker_contract",
"step 1: running tool 'read_file'".to_string(),
);
let mut result = make_snapshot(SubAgentStatus::Completed);
result.agent_id = "agent_worker_contract".to_string();
result.name = "agent_worker_contract".to_string();
result.result = Some("worker summary".to_string());
result.steps_taken = 1;
manager.complete_worker_from_result("agent_worker_contract", &result);
let record = manager
.get_worker_record("agent_worker_contract")
.expect("worker record");
assert_eq!(record.status, AgentWorkerStatus::Completed);
assert_eq!(record.spec.run_id, "agent_worker_contract");
assert_eq!(record.actor_kind, "subagent");
assert_eq!(record.spec.agent_type, SubAgentType::Explore);
assert_eq!(
record.spec.tool_profile,
AgentWorkerToolProfile::Explicit(vec!["read_file".to_string(), "grep_files".to_string()])
);
assert_eq!(record.result_summary.as_deref(), Some("worker summary"));
assert_eq!(record.steps_taken, 1);
assert_eq!(record.follow_up.tool, "agent_eval");
assert_eq!(record.follow_up.agent_id.as_str(), "agent_worker_contract");
assert!(record.takeover.supported);
assert!(
record
.takeover
.instructions
.contains("agent_eval with agent_id")
);
assert_eq!(record.usage.status, "unknown");
assert_eq!(record.verification.status, "self_report_only");
assert!(
record
.artifacts
.iter()
.any(|artifact| artifact.kind == "transcript")
);
let statuses: Vec<_> = record.events.iter().map(|event| event.status).collect();
assert!(statuses.contains(&AgentWorkerStatus::Queued));
assert!(statuses.contains(&AgentWorkerStatus::ModelWait));
assert!(statuses.contains(&AgentWorkerStatus::RunningTool));
assert!(statuses.contains(&AgentWorkerStatus::Completed));
assert!(
record
.events
.iter()
.any(|event| event.tool_name.as_deref() == Some("read_file"))
);
}
#[test]
fn subagent_progress_displays_shell_tools_as_bash() {
assert_eq!(subagent_progress_tool_display_name("exec_shell"), "Bash");
assert_eq!(subagent_progress_tool_display_name("exec_wait"), "Bash");
assert_eq!(
subagent_progress_tool_display_name("task_shell_wait"),
"Bash"
);
assert_eq!(
subagent_progress_tool_display_name("read_file"),
"read_file"
);
}
#[test]
fn headless_worker_records_persist_with_subagent_state() {
let tmp = tempdir().expect("tempdir");
let state_path = tmp.path().join("subagents.v1.json");
let mut manager =
SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path.clone());
manager.register_worker(make_worker_spec(
"agent_persisted",
tmp.path().to_path_buf(),
));
let mut result = make_snapshot(SubAgentStatus::Failed("boom".to_string()));
result.agent_id = "agent_persisted".to_string();
result.name = "agent_persisted".to_string();
result.steps_taken = 3;
manager.complete_worker_from_result("agent_persisted", &result);
manager.persist_state().expect("persist state");
let mut loaded = SubAgentManager::new(tmp.path().to_path_buf(), 4).with_state_path(state_path);
loaded.load_state().expect("load state");
let record = loaded.get_worker_record("agent_persisted").expect("record");
assert_eq!(record.spec.run_id, "agent_persisted");
assert_eq!(record.follow_up.agent_id, "agent_persisted");
assert!(record.takeover.supported);
assert_eq!(record.status, AgentWorkerStatus::Failed);
assert_eq!(record.error.as_deref(), Some("boom"));
assert_eq!(record.steps_taken, 3);
assert!(
record
.events
.iter()
.any(|event| event.status == AgentWorkerStatus::Failed)
);
}
fn init_subagent_git_repo() -> tempfile::TempDir {
let dir = tempdir().expect("tempdir");
let init = Command::new("git")
.arg("init")
.current_dir(dir.path())
.output()
.expect("git init should run");
assert!(
init.status.success(),
"git init failed: {}",
String::from_utf8_lossy(&init.stderr)
);
let autocrlf = Command::new("git")
.args(["config", "core.autocrlf", "false"])
.current_dir(dir.path())
.output()
.expect("git config core.autocrlf should run");
assert!(
autocrlf.status.success(),
"git config core.autocrlf failed: {}",
String::from_utf8_lossy(&autocrlf.stderr)
);
let commit = Command::new("git")
.args([
"-c",
"user.name=codewhale Tests",
"-c",
"user.email=tests@example.com",
"-c",
"commit.gpgsign=false",
"commit",
"--allow-empty",
"-m",
"init",
])
.current_dir(dir.path())
.output()
.expect("git commit should run");
assert!(
commit.status.success(),
"git commit failed: {}",
String::from_utf8_lossy(&commit.stderr)
);
dir
}
fn git(repo: &Path, args: &[&str]) {
let output = Command::new("git")
.args(args)
.current_dir(repo)
.output()
.expect("git command should run");
assert!(
output.status.success(),
"git {args:?} failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
fn text_message(role: &str, text: &str) -> Message {
Message {
role: role.to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
}
}
fn make_checkpoint(agent_id: &str, steps_taken: u32, messages: Vec<Message>) -> SubAgentCheckpoint {
build_subagent_checkpoint(agent_id, "test_checkpoint", &messages, steps_taken, true)
}
fn message_text(message: &Message) -> &str {
match message.content.first() {
Some(ContentBlock::Text { text, .. }) => text.as_str(),
other => panic!("expected text content block, got {other:?}"),
}
}
async fn delayed_chat_client(
first_delay: Duration,
response_text: &str,
) -> (
DeepSeekClient,
Arc<AtomicUsize>,
Arc<std::sync::Mutex<Vec<Value>>>,
) {
let calls = Arc::new(AtomicUsize::new(0));
let bodies = Arc::new(std::sync::Mutex::new(Vec::new()));
let response_text = response_text.to_string();
let app = Router::new().route(
"/{*path}",
post({
let calls = Arc::clone(&calls);
let bodies = Arc::clone(&bodies);
move |Json(body): Json<Value>| {
let calls = Arc::clone(&calls);
let bodies = Arc::clone(&bodies);
let response_text = response_text.clone();
async move {
let attempt = calls.fetch_add(1, Ordering::SeqCst) + 1;
bodies
.lock()
.expect("request body recorder mutex poisoned")
.push(body);
if attempt == 1 {
tokio::time::sleep(first_delay).await;
}
Json(json!({
"id": format!("chatcmpl-test-{attempt}"),
"model": "deepseek-v4-flash",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": response_text
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": 1,
"completion_tokens": 1,
"total_tokens": 2
}
}))
}
}
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind fake chat server");
let addr = listener.local_addr().expect("fake chat server addr");
tokio::spawn(async move {
let _ = axum::serve(listener, app).await;
});
let config = crate::config::Config {
api_key: Some("test-key".to_string()),
base_url: Some(format!("http://{addr}/v1")),
..crate::config::Config::default()
};
let client = DeepSeekClient::new(&config).expect("fake chat client");
(client, calls, bodies)
}
fn estimate_tool_description_tokens_conservative(text: &str) -> usize {
text.chars().count().div_ceil(3)
}
#[test]
fn test_agent_type_from_str() {
assert_eq!(
SubAgentType::from_str("general"),
Some(SubAgentType::General)
);
assert_eq!(
SubAgentType::from_str("explore"),
Some(SubAgentType::Explore)
);
assert_eq!(SubAgentType::from_str("PLAN"), Some(SubAgentType::Plan));
assert_eq!(
SubAgentType::from_str("code-review"),
Some(SubAgentType::Review)
);
assert_eq!(
SubAgentType::from_str("worker"),
Some(SubAgentType::General)
);
assert_eq!(
SubAgentType::from_str("default"),
Some(SubAgentType::General)
);
assert_eq!(
SubAgentType::from_str("explorer"),
Some(SubAgentType::Explore)
);
assert_eq!(SubAgentType::from_str("awaiter"), Some(SubAgentType::Plan));
assert_eq!(
SubAgentType::from_str("tool-agent"),
Some(SubAgentType::ToolAgent)
);
assert_eq!(SubAgentType::from_str("fin"), Some(SubAgentType::ToolAgent));
assert_eq!(SubAgentType::from_str("invalid"), None);
}
#[test]
fn test_agent_type_implementer_aliases() {
// #404 — Implementer accepts the obvious aliases the model is
// likely to reach for when the user says "build this".
for alias in ["implementer", "implement", "implementation", "builder"] {
assert_eq!(
SubAgentType::from_str(alias),
Some(SubAgentType::Implementer),
"alias {alias} should resolve to Implementer"
);
}
// Case-insensitive.
assert_eq!(
SubAgentType::from_str("IMPLEMENTER"),
Some(SubAgentType::Implementer)
);
}
#[test]
fn test_agent_type_verifier_aliases() {
// #404 — Verifier accepts test/validate aliases distinct from
// Reviewer, which is for *grading* code rather than *running* it.
for alias in ["verifier", "verify", "verification", "validator", "tester"] {
assert_eq!(
SubAgentType::from_str(alias),
Some(SubAgentType::Verifier),
"alias {alias} should resolve to Verifier"
);
}
assert_eq!(
SubAgentType::from_str("VERIFY"),
Some(SubAgentType::Verifier)
);
}
#[test]
fn test_agent_type_round_trips_via_as_str() {
// Every type should serialize to a string that round-trips back
// through `from_str`. Catches missed variants when adding a new
// role.
for t in [
SubAgentType::General,
SubAgentType::Explore,
SubAgentType::Plan,
SubAgentType::Review,
SubAgentType::Implementer,
SubAgentType::Verifier,
SubAgentType::ToolAgent,
SubAgentType::Custom,
] {
let label = t.as_str();
let back = SubAgentType::from_str(label)
.unwrap_or_else(|| panic!("as_str label {label:?} doesn't round-trip via from_str"));
assert_eq!(back, t, "round-trip failed for {t:?} via {label:?}");
}
}
#[test]
fn test_implementer_and_verifier_have_distinct_prompts() {
// The whole point of adding the types is that they carry distinct
// posture. Defensive guard: catch the easy bug where copy-paste
// leaves two new variants with the same prompt as `General`.
let implementer = SubAgentType::Implementer.system_prompt();
let verifier = SubAgentType::Verifier.system_prompt();
let general = SubAgentType::General.system_prompt();
assert_ne!(
implementer, general,
"Implementer prompt must differ from General"
);
assert_ne!(
verifier, general,
"Verifier prompt must differ from General"
);
assert_ne!(
implementer, verifier,
"Implementer and Verifier must differ"
);
// Sanity: each prompt mentions the role's defining verb so the
// model has clear direction.
assert!(
implementer.to_lowercase().contains("implement")
|| implementer.to_lowercase().contains("write the code"),
"Implementer prompt should reference its role: {implementer}"
);
assert!(
verifier.to_lowercase().contains("verif")
|| verifier.to_lowercase().contains("test suite")
|| verifier.to_lowercase().contains("validation"),
"Verifier prompt should reference its role: {verifier}"
);
}
#[test]
fn test_agent_type_prompts_include_shared_output_contract_once() {
for (agent_type, marker) in [
(SubAgentType::General, "general-purpose sub-agent"),
(SubAgentType::Explore, "exploration sub-agent"),
(SubAgentType::Plan, "planning sub-agent"),
(SubAgentType::Review, "code review sub-agent"),
(SubAgentType::Implementer, "implementation sub-agent"),
(SubAgentType::Verifier, "verification sub-agent"),
(SubAgentType::ToolAgent, "tool execution sub-agent"),
(SubAgentType::Custom, "custom sub-agent"),
] {
let prompt = agent_type.system_prompt();
assert!(prompt.contains(marker));
assert_eq!(
prompt.matches("## Output contract (mandatory)").count(),
1,
"{agent_type:?} prompt should include the shared output contract exactly once"
);
assert!(prompt.contains("### SUMMARY") && prompt.contains("### BLOCKERS"));
}
}
#[test]
fn explore_prompt_orients_before_searching() {
let prompt = SubAgentType::Explore.system_prompt();
assert!(prompt.contains("role: `explore`"));
assert!(prompt.contains("AGENTS.md/README"));
assert!(prompt.contains("workspace/project root"));
assert!(prompt.contains("compressed reconnaissance"));
}
#[test]
fn agent_open_description_explains_fresh_vs_forked_context_and_trust_model() {
let tmp = tempdir().expect("tempdir");
let manager = new_shared_subagent_manager(tmp.path().to_path_buf(), 1);
let tool = AgentOpenTool::new(manager, stub_runtime());
let description = tool.description();
assert!(description.contains("fresh child with an independent prefill"));
assert!(description.contains("fork_context=true"));
assert!(description.contains("byte-identically"));
assert!(description.contains("DeepSeek can reuse its prefix cache"));
assert!(description.contains("Sub-agent results are self-reports"));
assert!(
estimate_tool_description_tokens_conservative(description) <= 1024,
"agent_open description exceeds the conservative 1024-token budget"
);
}
#[test]
fn new_session_tools_use_open_eval_close_names() {
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 1)));
assert_eq!(
AgentOpenTool::new(manager.clone(), stub_runtime()).name(),
"agent_open"
);
assert_eq!(AgentEvalTool::new(manager.clone()).name(), "agent_eval");
assert_eq!(
ToolAgentTool::new(manager.clone(), stub_runtime()).name(),
"tool_agent"
);
assert_eq!(AgentCloseTool::new(manager).name(), "agent_close");
}
#[test]
fn tool_agent_description_explains_fast_lane() {
let tmp = tempdir().expect("tempdir");
let manager = new_shared_subagent_manager(tmp.path().to_path_buf(), 1);
let tool = ToolAgentTool::new(manager, stub_runtime());
let description = tool.description();
assert!(description.contains("Fin"));
assert!(description.contains("Flash"));
assert!(description.contains("thinking forced off"));
assert!(description.contains("OCR"));
}
#[test]
fn test_implementer_allowed_tools_include_writes() {
// Implementer is the write-heavy role; the deprecated
// `allowed_tools()` advisory list should reflect that the role
// can write/edit/patch even if today's runtime grants full
// inheritance.
#[allow(deprecated)]
let tools = SubAgentType::Implementer.allowed_tools();
assert!(tools.contains(&"write_file"));
assert!(tools.contains(&"edit_file"));
assert!(tools.contains(&"apply_patch"));
}
#[test]
fn test_verifier_allowed_tools_include_test_runner_but_no_writes() {
// Verifier runs validation; it should not have write tools in
// its advisory list. The runtime will still gate writes through
// approval, but the advisory list signals intent.
#[allow(deprecated)]
let tools = SubAgentType::Verifier.allowed_tools();
assert!(tools.contains(&"run_tests"));
assert!(tools.contains(&"run_verifiers"));
assert!(tools.contains(&"diagnostics"));
assert!(!tools.contains(&"write_file"));
assert!(!tools.contains(&"apply_patch"));
}
#[test]
fn test_parse_spawn_request_accepts_message_and_agent_type_aliases() {
let input = json!({
"message": "Find references to Foo",
"agent_type": "explorer"
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert_eq!(parsed.prompt, "Find references to Foo");
assert_eq!(parsed.agent_type, SubAgentType::Explore);
assert_eq!(parsed.assignment.role.as_deref(), Some("explorer"));
}
#[test]
fn test_parse_spawn_request_accepts_objective_and_role_alias() {
let input = json!({
"objective": "Coordinate and wait",
"role": "awaiter"
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert_eq!(parsed.prompt, "Coordinate and wait");
assert_eq!(parsed.agent_type, SubAgentType::Plan);
assert_eq!(parsed.assignment.role.as_deref(), Some("awaiter"));
}
#[test]
fn test_parse_spawn_request_accepts_items_payload() {
let input = json!({
"items": [
{"type": "text", "text": "Analyze module"},
{"type": "mention", "name": "drive", "path": "app://drive"}
],
"agent_name": "explorer"
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert!(parsed.prompt.contains("Analyze module"));
assert!(parsed.prompt.contains("[mention:$drive](app://drive)"));
assert_eq!(parsed.agent_type, SubAgentType::Explore);
}
#[test]
fn test_parse_spawn_request_accepts_fork_context() {
let input = json!({
"prompt": "continue from here",
"fork_context": true
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert!(parsed.fork_context);
let input = json!({
"prompt": "continue from here",
"inherit_context": true
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert!(parsed.fork_context);
}
#[test]
fn test_parse_spawn_request_accepts_session_name_for_agent_open() {
let input = json!({
"name": "review.parser",
"prompt": "inspect parser",
"fork_context": true,
"max_depth": 0
});
let parsed = parse_spawn_request(&input).expect("open request should parse");
assert_eq!(parsed.session_name.as_deref(), Some("review.parser"));
assert!(parsed.fork_context);
assert_eq!(parsed.max_depth, Some(0));
}
#[test]
fn test_parse_spawn_request_accepts_tool_agent_aliases() {
let input = json!({
"prompt": "OCR this screenshot",
"agent_type": "tool-agent"
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert_eq!(parsed.agent_type, SubAgentType::ToolAgent);
assert_eq!(parsed.assignment.role.as_deref(), Some("tool_agent"));
}
#[test]
fn test_parse_spawn_request_rejects_invalid_session_name() {
let input = json!({
"name": "bad name",
"prompt": "inspect parser"
});
let err = parse_spawn_request(&input).expect_err("space in name should fail");
assert!(err.to_string().contains("name must not contain whitespace"));
}
#[test]
fn test_parse_spawn_request_rejects_out_of_range_max_depth() {
let input = json!({
"name": "review.parser",
"prompt": "inspect parser",
"max_depth": 4
});
let err = parse_spawn_request(&input).expect_err("max_depth should be capped at schema range");
assert!(
err.to_string()
.contains("max_depth must be between 0 and 3")
);
}
#[tokio::test]
async fn session_projection_exposes_forked_prefix_cache_contract() {
let mut snapshot = make_snapshot(SubAgentStatus::Running);
snapshot.name = "fanout_review".to_string();
snapshot.context_mode = "forked".to_string();
snapshot.fork_context = true;
let ctx = ToolContext::new(".");
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
assert_eq!(projection.name, "fanout_review");
assert_eq!(projection.context_mode, "forked");
assert_eq!(projection.run_id, "agent_test");
assert_eq!(projection.follow_up.tool, "agent_eval");
assert_eq!(projection.follow_up.agent_id, "agent_test");
assert!(projection.takeover.supported);
assert_eq!(projection.usage.status, "unknown");
assert_eq!(projection.verification.status, "self_report_only");
assert!(projection.fork_context);
assert_eq!(projection.prefix_cache.mode, "forked");
assert_eq!(
projection.prefix_cache.parent_prefix,
"preserved_byte_identical_when_available"
);
assert_eq!(projection.transcript_handle.kind, "var_handle");
assert_eq!(projection.transcript_handle.name, "transcript");
}
#[tokio::test]
async fn terminal_session_projection_prefers_full_transcript_handle() {
let mut snapshot = make_snapshot(SubAgentStatus::Completed);
snapshot.result = Some("done".to_string());
let ctx = ToolContext::new(".");
let full_handle = {
let mut store = ctx.runtime.handle_store.lock().await;
store.insert_json(
"agent:agent_test",
"full_transcript",
json!({
"kind": "subagent_full_transcript",
"agent_id": "agent_test",
"messages": [
{
"role": "assistant",
"content": [
{ "type": "text", "text": "complete child output" }
]
}
]
}),
)
};
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
assert_eq!(projection.transcript_handle, full_handle);
assert_eq!(projection.transcript_handle.name, "full_transcript");
}
#[tokio::test]
async fn interrupted_projection_exposes_checkpoint_metadata_and_messages() {
let mut snapshot = make_snapshot(SubAgentStatus::Interrupted(
"API call timed out after 10ms".to_string(),
));
let checkpoint = make_checkpoint(
&snapshot.agent_id,
1,
vec![text_message("user", "inspect checkpoint recovery")],
);
snapshot.steps_taken = checkpoint.steps_taken;
snapshot.checkpoint = Some(checkpoint.clone());
let ctx = ToolContext::new(".");
let projection = subagent_session_projection(snapshot, false, &ctx, None).await;
assert_eq!(projection.status, "interrupted");
assert!(projection.terminal);
assert!(projection.continuable);
assert_eq!(
projection
.checkpoint
.as_ref()
.expect("checkpoint projected")
.continuation_handle,
checkpoint.continuation_handle
);
assert_eq!(
projection
.snapshot
.checkpoint
.as_ref()
.map(|cp| cp.message_count),
Some(1)
);
assert_eq!(
projection
.checkpoint
.as_ref()
.and_then(|cp| cp.messages.first())
.map(message_text),
Some("inspect checkpoint recovery")
);
}
#[test]
fn test_delegate_defaults_to_fork_context() {
let input = with_default_fork_context(json!({ "prompt": "review current work" }), true);
let parsed = parse_spawn_request(&input).expect("delegate request should parse");
assert!(parsed.fork_context);
let input = with_default_fork_context(
json!({ "prompt": "fresh exploration", "fork_context": false }),
true,
);
let parsed = parse_spawn_request(&input).expect("delegate override should parse");
assert!(!parsed.fork_context);
}
#[test]
fn forked_subagent_messages_preserve_parent_prefix_then_append_task() {
let parent_system = SystemPrompt::Text("parent system".to_string());
let parent_message = Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: "parent turn".to_string(),
cache_control: None,
}],
};
let fork_context = SubAgentForkContext {
system: Some(parent_system.clone()),
messages: vec![parent_message.clone()],
structured_state_block: Some("## Fork State\n- Mode: `AGENT`".to_string()),
};
let assignment = SubAgentAssignment::new("inspect parser".to_string(), Some("worker".into()));
let messages = build_initial_subagent_messages(
"inspect parser",
&assignment,
&SubAgentType::General,
Some(&fork_context),
);
assert_eq!(
subagent_request_system_prompt("child system", Some(&fork_context)),
parent_system
);
assert_eq!(messages.first(), Some(&parent_message));
assert_eq!(messages.len(), 4);
assert_eq!(messages[1].role, "system");
assert!(message_text(&messages[1]).contains("<codewhale:fork_state>"));
assert_eq!(messages[2].role, "system");
assert!(message_text(&messages[2]).contains("<codewhale:subagent_context>"));
assert_eq!(messages[3].role, "user");
assert!(message_text(&messages[3]).contains("inspect parser"));
}
#[test]
fn fresh_subagent_messages_keep_existing_single_turn_shape() {
let assignment = SubAgentAssignment::new("list files".to_string(), None);
let messages =
build_initial_subagent_messages("list files", &assignment, &SubAgentType::Explore, None);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].role, "user");
assert!(message_text(&messages[0]).contains("list files"));
}
#[test]
fn test_parse_spawn_request_rejects_text_and_items_together() {
let input = json!({
"prompt": "Analyze module",
"items": [{"type": "text", "text": "dup"}]
});
let err = parse_spawn_request(&input).expect_err("text+items should fail");
assert!(err.to_string().contains("either prompt text or items"));
}
#[test]
fn test_parse_spawn_request_rejects_invalid_role() {
let input = json!({
"prompt": "do work",
"role": "unknown_role"
});
let err = parse_spawn_request(&input).expect_err("invalid role should fail");
assert!(err.to_string().contains("Invalid role alias"));
}
#[test]
fn test_parse_spawn_request_accepts_full_role_vocabulary() {
// Regression for #2649: roles that `SubAgentType::from_str` accepts must
// also pass the second `normalize_role_alias` validation pass instead of
// being rejected with a stale hint.
for (role, expected_type, expected_role) in [
("general", SubAgentType::General, "worker"),
("general-purpose", SubAgentType::General, "worker"),
("general_purpose", SubAgentType::General, "worker"),
("worker", SubAgentType::General, "worker"),
("default", SubAgentType::General, "default"),
("explore", SubAgentType::Explore, "explorer"),
("exploration", SubAgentType::Explore, "explorer"),
("explorer", SubAgentType::Explore, "explorer"),
("plan", SubAgentType::Plan, "awaiter"),
("planning", SubAgentType::Plan, "awaiter"),
("planner", SubAgentType::Plan, "awaiter"),
("awaiter", SubAgentType::Plan, "awaiter"),
("review", SubAgentType::Review, "reviewer"),
("code-review", SubAgentType::Review, "reviewer"),
("code_review", SubAgentType::Review, "reviewer"),
("reviewer", SubAgentType::Review, "reviewer"),
("implementer", SubAgentType::Implementer, "implementer"),
("implement", SubAgentType::Implementer, "implementer"),
("implementation", SubAgentType::Implementer, "implementer"),
("builder", SubAgentType::Implementer, "implementer"),
("verifier", SubAgentType::Verifier, "verifier"),
("verify", SubAgentType::Verifier, "verifier"),
("verification", SubAgentType::Verifier, "verifier"),
("validator", SubAgentType::Verifier, "verifier"),
("tester", SubAgentType::Verifier, "verifier"),
("tool-agent", SubAgentType::ToolAgent, "tool_agent"),
("tool_agent", SubAgentType::ToolAgent, "tool_agent"),
("toolagent", SubAgentType::ToolAgent, "tool_agent"),
("executor", SubAgentType::ToolAgent, "tool_agent"),
("execution", SubAgentType::ToolAgent, "tool_agent"),
("fin", SubAgentType::ToolAgent, "tool_agent"),
("custom", SubAgentType::Custom, "custom"),
] {
assert_eq!(
SubAgentType::from_str(role),
Some(expected_type.clone()),
"from_str should accept role alias {role:?}"
);
assert_eq!(
normalize_role_alias(role),
Some(expected_role),
"normalize_role_alias should accept role alias {role:?}"
);
let input = json!({ "prompt": "do work", "role": role });
let parsed = parse_spawn_request(&input)
.unwrap_or_else(|e| panic!("role {role:?} should parse, got {e}"));
assert_eq!(parsed.agent_type, expected_type, "type for role {role:?}");
assert_eq!(
parsed.assignment.role.as_deref(),
Some(expected_role),
"canonical role for {role:?}"
);
}
}
#[test]
fn test_invalid_role_error_lists_real_aliases() {
// The hint must enumerate the actually-accepted vocabulary (#2649).
let input = json!({ "prompt": "do work", "role": "nonsense" });
let err = parse_spawn_request(&input)
.expect_err("invalid role should fail")
.to_string();
assert!(err.contains("reviewer"), "hint should list reviewer: {err}");
assert!(err.contains("verifier"), "hint should list verifier: {err}");
assert!(err.contains("custom"), "hint should list custom: {err}");
assert!(
err.contains("general-purpose"),
"hint should list general-purpose: {err}"
);
assert!(
err.contains("code_review"),
"hint should list code_review: {err}"
);
assert!(
err.contains("toolagent"),
"hint should list toolagent: {err}"
);
assert!(
err.contains("execution"),
"hint should list execution: {err}"
);
}
fn schema_property_description<'a>(schema: &'a Value, property: &str) -> &'a str {
schema["properties"][property]["description"]
.as_str()
.unwrap_or_else(|| panic!("missing description for schema property {property:?}"))
}
#[test]
fn subagent_tool_schemas_advertise_real_type_and_role_vocabulary() {
let tmp = tempdir().expect("tempdir");
let manager = new_shared_subagent_manager(tmp.path().to_path_buf(), 1);
let agent_open_schema = AgentOpenTool::new(manager.clone(), stub_runtime()).input_schema();
let agent_spawn_schema = AgentSpawnTool::new(manager.clone(), stub_runtime()).input_schema();
let delegate_schema = DelegateToAgentTool::new(manager, stub_runtime()).input_schema();
for (schema, property) in [
(&agent_open_schema, "type"),
(&agent_spawn_schema, "type"),
(&delegate_schema, "agent_name"),
] {
let description = schema_property_description(schema, property);
for alias in [
"general",
"explore",
"plan",
"review",
"implementer",
"verifier",
"tool_agent",
"custom",
] {
assert!(
description.contains(alias),
"{property} description should list accepted type {alias:?}: {description}"
);
}
}
for (schema, property) in [
(&agent_open_schema, "role"),
(&agent_spawn_schema, "role"),
(&delegate_schema, "role"),
] {
let description = schema_property_description(schema, property);
for alias in [
"default",
"worker",
"explorer",
"awaiter",
"reviewer",
"implementer",
"verifier",
"tool_agent",
"custom",
] {
assert!(
description.contains(alias),
"{property} description should list accepted role {alias:?}: {description}"
);
}
}
}
#[test]
fn test_parse_spawn_request_rejects_conflicting_type_and_role() {
let input = json!({
"prompt": "inspect internals",
"type": "explore",
"role": "worker"
});
let err = parse_spawn_request(&input).expect_err("conflicting type+role should fail");
assert!(
err.to_string()
.contains("Conflicting type/agent_type and role/agent_role")
);
}
#[test]
fn test_parse_assign_request_accepts_aliases() {
let input = json!({
"id": "agent_1234",
"objective": "re-check failing tests",
"agent_role": "explorer",
"input": "focus on tests only",
"interrupt": false
});
let request = parse_assign_request(&input).expect("assign request should parse");
assert_eq!(request.agent_id, "agent_1234");
assert_eq!(request.objective.as_deref(), Some("re-check failing tests"));
assert_eq!(request.role.as_deref(), Some("explorer"));
assert_eq!(request.message.as_deref(), Some("focus on tests only"));
assert!(!request.interrupt);
}
#[test]
fn test_parse_assign_request_rejects_invalid_role() {
let input = json!({
"agent_id": "agent_1234",
"role": "unknown"
});
let err = parse_assign_request(&input).expect_err("invalid role should fail");
assert!(err.to_string().contains("Invalid role alias"));
}
#[test]
fn test_parse_assign_request_requires_update_fields() {
let input = json!({
"agent_id": "agent_1234"
});
let err = parse_assign_request(&input).expect_err("missing update fields should fail");
assert!(
err.to_string().contains(
"Provide at least one of objective, role/agent_role, message/input, or items"
)
);
}
#[test]
fn test_build_allowed_tools_independent_of_allow_shell() {
// v0.6.6: allow_shell no longer filters at the build_allowed_tools
// level — the registry builder controls shell-tool registration.
// Both calls return None (full inheritance) for a default General
// agent.
let with_shell = build_allowed_tools(&SubAgentType::General, None, true).unwrap();
let without_shell = build_allowed_tools(&SubAgentType::General, None, false).unwrap();
assert!(with_shell.is_none());
assert!(without_shell.is_none());
}
#[test]
fn test_allowed_tools_are_deduplicated() {
let tools = build_allowed_tools(
&SubAgentType::Custom,
Some(vec![
"read_file".to_string(),
"read_file".to_string(),
" ".to_string(),
"grep_files".to_string(),
]),
true,
)
.unwrap();
assert_eq!(
tools,
Some(vec!["read_file".to_string(), "grep_files".to_string()])
);
}
#[test]
fn test_custom_agent_requires_allowed_tools() {
let err = build_allowed_tools(&SubAgentType::Custom, None, true).unwrap_err();
assert!(err.to_string().contains("requires"));
}
#[test]
fn test_wait_mode_condition_any_and_all() {
let one_done = vec![
make_snapshot(SubAgentStatus::Running),
make_snapshot(SubAgentStatus::Completed),
];
let all_done = vec![
make_snapshot(SubAgentStatus::Completed),
make_snapshot(SubAgentStatus::Cancelled),
];
assert!(WaitMode::Any.condition_met(&one_done));
assert!(!WaitMode::All.condition_met(&one_done));
assert!(WaitMode::All.condition_met(&all_done));
}
#[test]
fn test_parse_wait_mode() {
assert_eq!(parse_wait_mode(&json!({})).unwrap(), WaitMode::Any);
assert_eq!(
parse_wait_mode(&json!({"wait_mode": "all"})).unwrap(),
WaitMode::All
);
assert_eq!(
parse_wait_mode(&json!({"wait_mode": "first"})).unwrap(),
WaitMode::Any
);
assert!(parse_wait_mode(&json!({"wait_mode": "invalid"})).is_err());
}
#[test]
fn test_parse_wait_ids_accepts_aliases() {
let ids = parse_wait_ids(&json!({
"ids": ["agent_a", "agent_b"],
"agent_id": "agent_c",
"id": "agent_a"
}));
assert_eq!(ids, vec!["agent_a", "agent_b", "agent_c"]);
}
#[test]
fn test_parse_wait_ids_empty_when_omitted() {
let ids = parse_wait_ids(&json!({}));
assert!(ids.is_empty());
}
#[test]
fn test_build_assignment_prompt_includes_metadata() {
let assignment = SubAgentAssignment::new(
"Inspect parser behavior".to_string(),
Some("explorer".to_string()),
);
let prompt = build_assignment_prompt(
"Inspect parser behavior",
&assignment,
&SubAgentType::Explore,
);
assert!(prompt.contains("Assignment metadata"));
assert!(prompt.contains("resolved_type: explore"));
assert!(prompt.contains("role: explorer"));
}
#[test]
fn subagent_auto_model_routes_unconfigured_assignments() {
let runtime = stub_runtime().with_auto_model(true);
assert_eq!(
fallback_subagent_assignment_route(&runtime, None, "implement the release fix").model,
"deepseek-v4-pro"
);
assert_eq!(
fallback_subagent_assignment_route(&runtime, None, "say hello").model,
"deepseek-v4-flash"
);
}
#[test]
fn subagent_auto_route_respects_explicit_or_role_model() {
let runtime = stub_runtime().with_auto_model(true);
assert_eq!(
fallback_subagent_assignment_route(
&runtime,
Some("deepseek-v4-flash".to_string()),
"implement the release fix"
)
.model,
"deepseek-v4-flash"
);
}
#[tokio::test]
async fn tool_agent_route_inherits_parent_model_with_thinking_off() {
let mut runtime = stub_runtime()
.with_auto_model(false)
.with_reasoning_effort(Some("max".to_string()), false);
runtime.model = "local-provider/tool-fast".to_string();
let route = resolve_subagent_assignment_route(
&runtime,
None,
"run OCR on this screenshot",
&SubAgentType::ToolAgent,
)
.await;
assert_eq!(route.model, "local-provider/tool-fast");
assert_eq!(route.reasoning_effort.as_deref(), Some("off"));
}
#[tokio::test]
async fn tool_agent_route_respects_explicit_model_with_thinking_off() {
let mut runtime = stub_runtime().with_auto_model(false);
runtime.model = "local-provider/tool-fast".to_string();
let route = resolve_subagent_assignment_route(
&runtime,
Some("deepseek-v4-pro".to_string()),
"run OCR on this screenshot",
&SubAgentType::ToolAgent,
)
.await;
assert_eq!(route.model, "deepseek-v4-pro");
assert_eq!(route.reasoning_effort.as_deref(), Some("off"));
}
#[test]
fn subagent_auto_reasoning_resolves_to_distinct_v4_tiers() {
let runtime = stub_runtime().with_reasoning_effort(Some("high".to_string()), true);
assert_eq!(
fallback_subagent_assignment_route(&runtime, None, "quick lookup").reasoning_effort,
Some("high".to_string())
);
assert_eq!(
fallback_subagent_assignment_route(&runtime, None, "debug this release failure")
.reasoning_effort,
Some("max".to_string())
);
}
#[test]
fn fixed_model_subagent_auto_reasoning_skips_flash_router() {
let runtime = stub_runtime().with_reasoning_effort(Some("high".to_string()), true);
assert!(
!should_use_subagent_flash_router(&runtime),
"fixed-model auto thinking should resolve locally without a hidden router request"
);
}
#[test]
fn auto_model_subagent_assignments_still_use_flash_router() {
let runtime = stub_runtime().with_auto_model(true);
assert!(
should_use_subagent_flash_router(&runtime),
"auto-model sub-agent assignments still need router guidance"
);
}
#[test]
fn subagent_router_prompt_frames_assignment_as_auto_routing() {
let runtime = stub_runtime()
.with_auto_model(true)
.with_reasoning_effort(Some("high".to_string()), true);
let prompt = subagent_router_prompt(&runtime, "inspect one file");
assert!(prompt.contains("Parent selected model mode: auto"));
assert!(prompt.contains("Parent selected thinking mode: auto"));
assert!(prompt.contains("inspect one file"));
}
#[test]
fn test_subagent_tool_registry_reports_unavailable_tools() {
let tmp = tempdir().expect("tempdir");
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(tmp.path().to_path_buf());
runtime.allow_shell = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::Explore,
Some(vec!["read_file".to_string(), "missing_tool".to_string()]),
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
assert_eq!(
registry.unavailable_allowed_tools(),
vec!["missing_tool".to_string()]
);
}
#[test]
fn test_review_agent_tools_exclude_agent_spawn() {
let tmp = tempdir().expect("tempdir");
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(tmp.path().to_path_buf());
// None = full parent tool inheritance (the default for builtin types).
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::Review,
None,
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let tools = registry.tools_for_model(&SubAgentType::Review);
let names: Vec<_> = tools.iter().map(|t| t.name.as_str()).collect();
assert!(
!names.contains(&"agent_spawn"),
"Review agent must not have agent_spawn; tools: {names:?}"
);
}
#[tokio::test]
async fn test_wait_for_result_reports_timeout_when_still_running() {
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 2)));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
"test_agent_1".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
let agent_id = agent.id.clone();
let snapshot = agent.snapshot();
{
let mut guard = manager.write().await;
guard.register_worker(make_worker_spec(&agent_id, PathBuf::from(".")));
guard.complete_worker_from_result(&agent_id, &snapshot);
guard.agents.insert(agent_id.clone(), agent);
}
let (snapshot, timed_out) = wait_for_result(&manager, &agent_id, Duration::from_millis(10))
.await
.expect("wait_for_result should succeed");
assert!(timed_out);
assert_eq!(snapshot.status, SubAgentStatus::Running);
}
// Regression for #1738: agent_eval on a terminated session must not
// hard-fail with "not running" when a follow-up message is supplied. The
// parent still needs the projection (and its transcript_handle) to recover
// the child's full output.
#[tokio::test]
async fn agent_eval_on_completed_session_returns_full_projection_not_running_error() {
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 1)));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_2".to_string(),
SubAgentType::Explore,
"analyze 14 issues".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
let full_output = "Per-issue analysis:\n".to_string() + &"detail line\n".repeat(400);
agent.status = SubAgentStatus::Completed;
agent.result = Some(full_output.clone());
let agent_id = agent.id.clone();
let snapshot = agent.snapshot();
{
let mut guard = manager.write().await;
guard.register_worker(make_worker_spec(&agent_id, PathBuf::from(".")));
guard.complete_worker_from_result(&agent_id, &snapshot);
guard.agents.insert(agent_id.clone(), agent);
}
let ctx = ToolContext::new(".");
let tool = AgentEvalTool::new(manager.clone());
let result = tool
.execute(
json!({
"agent_id": agent_id,
"message": "give me the full per-issue breakdown",
"block": false
}),
&ctx,
)
.await
.expect("agent_eval on a completed session must not error");
let meta = result.metadata.expect("metadata present");
assert_eq!(meta["terminal"], json!(true));
assert_eq!(meta["message_delivery"]["delivered"], json!(false));
let projection: SubAgentSessionProjection =
serde_json::from_str(&result.content).expect("projection deserializes");
assert_eq!(projection.status, "completed");
assert_eq!(projection.run_id, "test_agent_2");
let worker_record = projection.worker_record.as_ref().expect("worker record");
let delivery = worker_record
.follow_up
.latest_delivery
.as_ref()
.expect("delivery receipt");
assert!(!delivery.delivered);
assert_eq!(
delivery.reason.as_deref(),
Some("session already terminated; follow-up not delivered")
);
assert_eq!(
delivery.message_preview.as_deref(),
Some("give me the full per-issue breakdown")
);
assert_eq!(projection.transcript_handle.kind, "var_handle");
// The full, untruncated child output survives in the snapshot the
// transcript_handle points at.
assert_eq!(
projection.snapshot.result.as_deref(),
Some(full_output.as_str())
);
}
#[tokio::test]
async fn agent_eval_resolves_session_via_agent_name_alias() {
// #2650: `agent_name` is accepted as an alias for `name`/session name.
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 1)));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_named".to_string(),
SubAgentType::Explore,
"scan".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.session_name = "researcher".to_string();
agent.status = SubAgentStatus::Completed;
agent.result = Some("done".to_string());
let agent_id = agent.id.clone();
{
let mut guard = manager.write().await;
guard.agents.insert(agent_id.clone(), agent);
}
let ctx = ToolContext::new(".");
let tool = AgentEvalTool::new(manager.clone());
let result = tool
.execute(json!({ "agent_name": "researcher", "block": false }), &ctx)
.await
.expect("agent_name alias must resolve the session");
let projection: SubAgentSessionProjection =
serde_json::from_str(&result.content).expect("projection deserializes");
assert_eq!(projection.status, "completed");
}
#[tokio::test]
async fn agent_eval_follow_up_defaults_to_nonblocking_projection() {
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 1)));
let (input_tx, mut input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
"test_agent_running_eval".to_string(),
SubAgentType::Explore,
"map docs".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
let agent_id = agent.id.clone();
{
let mut guard = manager.write().await;
guard.agents.insert(agent_id.clone(), agent);
}
let ctx = ToolContext::new(".");
let tool = AgentEvalTool::new(manager.clone());
let result = tokio::time::timeout(
Duration::from_millis(100),
tool.execute(
json!({
"agent_id": agent_id,
"message": "please prioritize the newest user input"
}),
&ctx,
),
)
.await
.expect("agent_eval should not wait for a running child by default")
.expect("agent_eval should return a projection");
let meta = result.metadata.expect("metadata present");
assert_eq!(meta["terminal"], json!(false));
assert_eq!(meta["timed_out"], json!(false));
assert_eq!(meta["message_delivery"]["delivered"], json!(true));
let delivered = tokio::time::timeout(Duration::from_millis(100), input_rx.recv())
.await
.expect("follow-up should be delivered without waiting")
.expect("follow-up message should exist");
assert_eq!(delivered.text, "please prioritize the newest user input");
assert!(!delivered.interrupt);
let projection: SubAgentSessionProjection =
serde_json::from_str(&result.content).expect("projection deserializes");
assert_eq!(projection.status, "running");
assert!(!projection.terminal);
}
#[tokio::test]
async fn api_timeout_preserves_checkpoint_and_agent_eval_continues_from_it() {
let tmp = tempdir().expect("tempdir");
let manager = Arc::new(RwLock::new(SubAgentManager::new(
tmp.path().to_path_buf(),
2,
)));
let agent_id = "agent_checkpoint_timeout".to_string();
let (task_input_tx, task_input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
agent_id.clone(),
SubAgentType::General,
"Inspect checkpoint behavior".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec![]),
task_input_tx,
tmp.path().to_path_buf(),
"boot_test".to_string(),
);
manager.write().await.agents.insert(agent_id.clone(), agent);
let (client, calls, bodies) =
delayed_chat_client(Duration::from_millis(80), "resumed answer").await;
let mut runtime = stub_runtime().with_step_api_timeout(Duration::from_millis(50));
runtime.client = client;
runtime.manager = Arc::clone(&manager);
runtime.context = ToolContext::new(tmp.path());
let (mailbox, mut mailbox_rx) =
crate::tools::subagent::mailbox::Mailbox::new(tokio_util::sync::CancellationToken::new());
runtime.mailbox = Some(mailbox);
let task = SubAgentTask {
manager_handle: Arc::clone(&manager),
runtime: runtime.clone(),
agent_id: agent_id.clone(),
agent_type: SubAgentType::General,
prompt: "Inspect checkpoint behavior".to_string(),
assignment: make_assignment(),
allowed_tools: Some(vec![]),
fork_context: false,
started_at: Instant::now(),
max_steps: 3,
input_rx: task_input_rx,
launch_gate: None,
};
let task_handle = tokio::spawn(run_subagent_task(task));
let interrupted = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let snapshot = {
let manager = manager.read().await;
manager
.get_result(&agent_id)
.expect("agent should stay registered")
};
if matches!(snapshot.status, SubAgentStatus::Interrupted(_)) {
return snapshot;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("agent should become interrupted after API timeout");
let checkpoint = interrupted
.checkpoint
.as_ref()
.expect("timeout should preserve checkpoint");
assert!(checkpoint.continuable);
assert_eq!(checkpoint.steps_taken, 1);
assert!(
checkpoint
.messages
.iter()
.any(|message| message_text(message).contains("Inspect checkpoint behavior")),
"checkpoint should preserve local child prompt: {checkpoint:?}"
);
tokio::time::timeout(Duration::from_secs(2), async {
loop {
if calls.load(Ordering::SeqCst) >= 1 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("first timed-out API attempt should reach the test server");
let interrupted_envelope = tokio::time::timeout(Duration::from_secs(2), async {
loop {
for env in mailbox_rx.drain() {
if let MailboxMessage::Interrupted {
agent_id: id,
reason,
} = env.message
{
return (id, reason);
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("API timeout should publish an Interrupted mailbox lifecycle event");
assert_eq!(interrupted_envelope.0, agent_id);
assert!(
interrupted_envelope.1.contains("API call timed out"),
"reason should carry the timeout context: {}",
interrupted_envelope.1
);
let ctx = runtime.context.clone();
let tool = AgentEvalTool::new(Arc::clone(&manager));
let result = tool
.execute(json!({ "agent_id": agent_id, "block": false }), &ctx)
.await
.expect("agent_eval should project interrupted checkpoint");
let projection: SubAgentSessionProjection =
serde_json::from_str(&result.content).expect("projection deserializes");
assert_eq!(projection.status, "interrupted");
assert!(projection.continuable);
assert!(projection.checkpoint.is_some());
let result = tool
.execute(
json!({
"agent_id": agent_id,
"continue": true,
"message": "Please continue with the prior checkpoint.",
"timeout_ms": 2000
}),
&ctx,
)
.await
.expect("agent_eval should continue checkpointed interrupted session");
let meta = result.metadata.expect("metadata present");
assert_eq!(
meta["message_delivery"]["continued_from_checkpoint"],
json!(true)
);
let projection: SubAgentSessionProjection =
serde_json::from_str(&result.content).expect("projection deserializes");
assert_eq!(projection.status, "completed");
assert_eq!(
projection.snapshot.result.as_deref(),
Some("resumed answer")
);
assert!(
projection
.checkpoint
.as_ref()
.expect("completed projection keeps latest checkpoint")
.messages
.iter()
.any(|message| message_text(message)
.contains("Please continue with the prior checkpoint.")),
"continuation instruction should be part of resumed transcript"
);
task_handle.await.expect("sub-agent task should finish");
assert!(
calls.load(Ordering::SeqCst) >= 2,
"continuation should make a second API request"
);
let bodies = bodies
.lock()
.expect("request body recorder mutex poisoned")
.clone();
let second_request = serde_json::to_string(&bodies[1]).expect("second request body serializes");
assert!(second_request.contains("Inspect checkpoint behavior"));
assert!(second_request.contains("Please continue with the prior checkpoint."));
}
#[tokio::test]
async fn model_wait_heartbeat_prevents_stale_cleanup_during_api_call() {
let tmp = tempdir().expect("tempdir");
let manager = Arc::new(RwLock::new(
SubAgentManager::new(tmp.path().to_path_buf(), 2)
.with_running_heartbeat_timeout(Duration::from_millis(250)),
));
let agent_id = "agent_model_wait_heartbeat".to_string();
let (task_input_tx, task_input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
agent_id.clone(),
SubAgentType::General,
"Wait visibly".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec![]),
task_input_tx,
tmp.path().to_path_buf(),
"boot_test".to_string(),
);
{
let mut guard = manager.write().await;
guard.register_worker(make_worker_spec(&agent_id, tmp.path().to_path_buf()));
guard.agents.insert(agent_id.clone(), agent);
}
let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await;
let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new());
let mut runtime = stub_runtime().with_step_api_timeout(Duration::from_secs(2));
runtime.client = client;
runtime.manager = Arc::clone(&manager);
runtime.context = ToolContext::new(tmp.path());
runtime.mailbox = Some(mailbox);
let task = SubAgentTask {
manager_handle: Arc::clone(&manager),
runtime: runtime.clone(),
agent_id: agent_id.clone(),
agent_type: SubAgentType::General,
prompt: "Wait visibly".to_string(),
assignment: make_assignment(),
allowed_tools: Some(vec![]),
fork_context: false,
started_at: Instant::now(),
max_steps: 1,
input_rx: task_input_rx,
launch_gate: None,
};
let handle = tokio::spawn(run_subagent_task(task));
let heartbeat = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let envelope = mailbox_rx.recv().await?;
if let MailboxMessage::Progress { agent_id, status } = envelope.message
&& agent_id == "agent_model_wait_heartbeat"
&& status.contains(SUBAGENT_MODEL_WAIT_REASON)
{
return Some(status);
}
}
})
.await
.expect("model wait heartbeat should be published before completion")
.expect("mailbox should stay open until heartbeat");
assert_eq!(heartbeat, "step 1/1: waiting for model response");
{
let mut guard = manager.write().await;
assert_eq!(
guard.cleanup(Duration::from_secs(60 * 60)),
0,
"fresh model-wait heartbeat must keep the running agent alive"
);
let record = guard
.get_worker_record(&agent_id)
.expect("worker record should track model wait");
assert_eq!(record.status, AgentWorkerStatus::ModelWait);
assert_eq!(record.latest_message.as_deref(), Some(heartbeat.as_str()));
}
handle.await.expect("sub-agent task should finish");
}
#[tokio::test]
async fn spawn_duplicate_session_name_error_names_conflicting_agent() {
// #2656: the duplicate-name error must identify the conflicting agent so a
// model can recover deterministically (reuse the id, or pick a new name).
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 5)));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut existing = SubAgent::new(
"test_agent_existing".to_string(),
SubAgentType::Explore,
"scan".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
existing.session_name = "researcher".to_string();
existing.status = SubAgentStatus::Running;
let existing_id = existing.id.clone();
{
let mut guard = manager.write().await;
guard.agents.insert(existing_id.clone(), existing);
}
let err = {
let mut guard = manager.write().await;
guard
.spawn_background_with_assignment_options(
manager.clone(),
stub_runtime(),
SubAgentType::Explore,
"new work".to_string(),
make_assignment(),
Some(vec!["read_file".to_string()]),
SubAgentSpawnOptions {
name: Some("researcher".to_string()),
..Default::default()
},
)
.expect_err("duplicate session name must error")
};
let msg = err.to_string();
assert!(
msg.contains(&existing_id),
"names the conflicting agent_id: {msg}"
);
assert!(
msg.contains("running"),
"includes the conflicting status: {msg}"
);
// #3020: elapsed time lets the parent distinguish a live worker from a
// stale earlier spawn.
assert!(
msg.contains("started ") && msg.contains(" ago"),
"includes elapsed time since spawn: {msg}"
);
}
#[tokio::test]
async fn test_running_count_counts_only_agents_with_live_task_handles() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_3".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Running;
let handle = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
});
agent.task_handle = Some(handle);
let agent_id = agent.id.clone();
manager.agents.insert(agent.id.clone(), agent);
assert_eq!(manager.running_count(), 1);
manager
.agents
.get_mut(&agent_id)
.and_then(|agent| agent.task_handle.take())
.expect("live task handle")
.abort();
}
#[test]
fn test_running_count_ignores_running_status_without_task_handle() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_4".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Running;
manager.agents.insert(agent.id.clone(), agent);
assert_eq!(manager.running_count(), 0);
}
#[tokio::test]
async fn test_running_count_counts_running_agents_until_status_reconciles() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_5".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Running;
let finished_handle = tokio::spawn(async {});
while !finished_handle.is_finished() {
tokio::task::yield_now().await;
}
agent.task_handle = Some(finished_handle);
manager.agents.insert(agent.id.clone(), agent);
assert_eq!(manager.running_count(), 1);
}
#[tokio::test]
async fn cleanup_auto_cancels_stale_running_agent_and_releases_slot() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1)
.with_running_heartbeat_timeout(Duration::from_millis(1));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_stale".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.task_handle = Some(tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
}));
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
tokio::time::sleep(Duration::from_millis(5)).await;
assert_eq!(
manager.running_count(),
0,
"stale running agents must not keep the concurrency slot occupied"
);
assert_eq!(manager.cleanup(Duration::from_secs(60 * 60)), 1);
let snapshot = manager
.get_result(&agent_id)
.expect("agent should remain inspectable");
assert_eq!(snapshot.status, SubAgentStatus::Cancelled);
assert_eq!(manager.running_count(), 0);
assert!(
snapshot
.result
.as_deref()
.unwrap_or_default()
.contains("Auto-cancelled")
);
}
#[tokio::test]
async fn cleanup_keeps_recent_running_agent() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1)
.with_running_heartbeat_timeout(Duration::from_secs(300));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_recent".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.last_activity_at = Instant::now();
agent.task_handle = Some(tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
}));
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
assert_eq!(manager.running_count(), 1);
assert_eq!(manager.cleanup(Duration::from_secs(60 * 60)), 0);
assert_eq!(
manager.get_result(&agent_id).expect("agent").status,
SubAgentStatus::Running
);
manager
.agents
.get_mut(&agent_id)
.and_then(|agent| agent.task_handle.take())
.expect("live task handle")
.abort();
}
#[tokio::test]
async fn touch_refreshes_stale_running_agent_heartbeat() {
// Use a heartbeat timeout that is comfortably larger than the synchronous
// work between `touch()` and the `cleanup()` assertion below. With a 1ms
// timeout the test was flaky on loaded CI runners (notably Windows, whose
// scheduler can deschedule this thread for >1ms): the just-touched agent
// would tip back over the staleness threshold before `cleanup()` ran and
// get reaped, so `cleanup()` returned 1 instead of 0. A 50ms timeout keeps
// the staleness logic exercised while removing the timing race.
let mut manager = SubAgentManager::new(PathBuf::from("."), 1)
.with_running_heartbeat_timeout(Duration::from_millis(50));
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_touched".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.task_handle = Some(tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
}));
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
// Sleep well past the 50ms heartbeat timeout so the agent is reliably stale
// even if the timer fires early under coarse OS timer granularity.
tokio::time::sleep(Duration::from_millis(150)).await;
assert_eq!(manager.running_count(), 0);
assert!(manager.touch(&agent_id));
assert_eq!(manager.running_count(), 1);
assert_eq!(manager.cleanup(Duration::from_secs(60 * 60)), 0);
manager
.agents
.get_mut(&agent_id)
.and_then(|agent| agent.task_handle.take())
.expect("live task handle")
.abort();
}
#[test]
fn test_assign_updates_running_agent_and_sends_message() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 2);
let (input_tx, mut input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
"test_agent_6".to_string(),
SubAgentType::General,
"work".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
let snapshot = manager
.assign(
&agent_id,
Some("Re-check module boundaries".to_string()),
Some("explorer".to_string()),
None,
true,
)
.expect("assignment should succeed");
assert_eq!(snapshot.assignment.objective, "Re-check module boundaries");
assert_eq!(snapshot.assignment.role.as_deref(), Some("explorer"));
let dispatched = input_rx
.try_recv()
.expect("running agent should receive assignment update");
assert!(dispatched.interrupt);
assert!(dispatched.text.contains("Assignment updated"));
assert!(dispatched.text.contains("objective"));
}
#[test]
fn test_assign_rejects_message_for_non_running_agent() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_7".to_string(),
SubAgentType::Explore,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Completed;
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
let err = manager
.assign(&agent_id, None, None, Some("keep going".to_string()), true)
.expect_err("non-running agent cannot receive assignment message");
assert!(err.to_string().contains("is not running"));
}
#[test]
fn test_assign_updates_non_running_metadata_without_message() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 1);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
"test_agent_8".to_string(),
SubAgentType::Plan,
"prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Completed;
let agent_id = agent.id.clone();
manager.agents.insert(agent_id.clone(), agent);
let snapshot = manager
.assign(
&agent_id,
Some("Draft retry plan".to_string()),
Some("awaiter".to_string()),
None,
true,
)
.expect("metadata update should succeed");
assert_eq!(snapshot.assignment.objective, "Draft retry plan");
assert_eq!(snapshot.assignment.role.as_deref(), Some("awaiter"));
}
#[test]
fn test_persist_and_reload_marks_running_agent_as_interrupted() {
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path().to_path_buf();
let state_path = default_state_path(tmp.path());
let mut manager = SubAgentManager::new(workspace.clone(), 2).with_state_path(state_path);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let running = SubAgent::new(
"test_agent_9_running".to_string(),
SubAgentType::General,
"work".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
let running_id = running.id.clone();
manager.agents.insert(running_id.clone(), running);
manager.persist_state().expect("persist state");
let mut reloaded =
SubAgentManager::new(workspace, 2).with_state_path(default_state_path(tmp.path()));
reloaded.load_state().expect("load state");
let snapshot = reloaded
.get_result(&running_id)
.expect("reloaded agent should exist");
assert!(matches!(
snapshot.status,
SubAgentStatus::Interrupted(ref message)
if message.contains(SUBAGENT_RESTART_REASON)
));
}
#[test]
fn persist_and_reload_preserves_checkpoint_for_interrupted_running_agent() {
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path().to_path_buf();
let state_path = default_state_path(tmp.path());
let mut manager = SubAgentManager::new(workspace.clone(), 2).with_state_path(state_path);
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut running = SubAgent::new(
"test_agent_checkpoint_reload".to_string(),
SubAgentType::General,
"work".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
Some("Blue".to_string()),
Some(vec!["read_file".to_string()]),
input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
running.checkpoint = Some(make_checkpoint(
&running.id,
2,
vec![
text_message("user", "initial task"),
text_message("assistant", "partial progress"),
],
));
let running_id = running.id.clone();
manager.agents.insert(running_id.clone(), running);
manager.persist_state().expect("persist state");
let mut reloaded =
SubAgentManager::new(workspace, 2).with_state_path(default_state_path(tmp.path()));
reloaded.load_state().expect("load state");
let snapshot = reloaded
.get_result(&running_id)
.expect("reloaded agent should exist");
assert!(matches!(snapshot.status, SubAgentStatus::Interrupted(_)));
let checkpoint = snapshot.checkpoint.expect("checkpoint should reload");
assert!(checkpoint.continuable);
assert_eq!(checkpoint.steps_taken, 2);
assert_eq!(checkpoint.messages.len(), 2);
assert_eq!(message_text(&checkpoint.messages[1]), "partial progress");
}
#[test]
fn test_interrupted_status_name_and_summary() {
let snapshot = make_snapshot(SubAgentStatus::Interrupted(
SUBAGENT_RESTART_REASON.to_string(),
));
assert_eq!(subagent_status_name(&snapshot.status), "interrupted");
assert!(summarize_subagent_result(&snapshot).contains(SUBAGENT_RESTART_REASON));
}
// === v0.6.6 — sub-agent authority unification ===
#[test]
fn build_allowed_tools_general_returns_none_for_full_inheritance() {
// Default behavior: General agent with no explicit list inherits the
// parent's full registry (None signals no narrowing).
let result = build_allowed_tools(&SubAgentType::General, None, true).unwrap();
assert!(
result.is_none(),
"General with no explicit_tools should default to full inheritance (None), got {result:?}"
);
}
#[test]
fn build_allowed_tools_explore_returns_none_for_full_inheritance() {
// Per-type allowlists are now advisory — Explore also gets the full
// surface unless an explicit list is passed.
let result = build_allowed_tools(&SubAgentType::Explore, None, true).unwrap();
assert!(
result.is_none(),
"Explore with no explicit_tools should default to full inheritance"
);
}
#[test]
fn build_allowed_tools_custom_requires_explicit_list() {
// Custom is the one type that REQUIRES explicit allowed_tools.
let err = build_allowed_tools(&SubAgentType::Custom, None, true).unwrap_err();
assert!(
err.to_string().contains("Custom sub-agent requires"),
"got: {err}"
);
}
#[test]
fn build_allowed_tools_explicit_list_returned_as_some() {
let explicit = vec!["read_file".to_string(), "list_dir".to_string()];
let result = build_allowed_tools(&SubAgentType::Custom, Some(explicit.clone()), true).unwrap();
assert_eq!(result, Some(explicit));
}
#[test]
fn build_allowed_tools_explicit_list_dedupes_and_trims() {
let explicit = vec![
"read_file".to_string(),
" read_file ".to_string(), // trim + dedupe
"list_dir".to_string(),
"".to_string(), // skip empty
];
let result = build_allowed_tools(&SubAgentType::Custom, Some(explicit), true).unwrap();
assert_eq!(
result,
Some(vec!["read_file".to_string(), "list_dir".to_string()])
);
}
#[test]
fn parse_spawn_request_extracts_cwd_when_present() {
let input = json!({
"prompt": "build feature A",
"cwd": ".worktrees/feature-a"
});
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert_eq!(
parsed.cwd.as_ref().map(|p| p.to_string_lossy().to_string()),
Some(".worktrees/feature-a".to_string())
);
}
#[test]
fn parse_spawn_request_cwd_absent_yields_none() {
let input = json!({ "prompt": "no cwd" });
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert!(parsed.cwd.is_none());
}
#[test]
fn parse_spawn_request_cwd_empty_string_yields_none() {
let input = json!({ "prompt": "empty cwd", "cwd": " " });
let parsed = parse_spawn_request(&input).expect("spawn request should parse");
assert!(parsed.cwd.is_none(), "whitespace-only cwd should be None");
}
#[test]
fn build_subagent_system_prompt_appends_role_when_set() {
let assignment = SubAgentAssignment::new("p".to_string(), Some("worker".to_string()));
let prompt = build_subagent_system_prompt(&SubAgentType::General, &assignment);
assert!(
prompt.ends_with("You are operating in the role of `worker`."),
"expected role line at end, got: {}",
&prompt[prompt.len().saturating_sub(80)..]
);
}
#[test]
fn build_subagent_system_prompt_skips_role_when_none() {
let assignment = SubAgentAssignment::new("p".to_string(), None);
let prompt = build_subagent_system_prompt(&SubAgentType::General, &assignment);
assert!(!prompt.contains("You are operating in the role of"));
}
#[test]
fn build_subagent_system_prompt_skips_role_when_blank() {
let assignment = SubAgentAssignment::new("p".to_string(), Some(" ".to_string()));
let prompt = build_subagent_system_prompt(&SubAgentType::General, &assignment);
assert!(!prompt.contains("You are operating in the role of"));
}
#[test]
fn subagent_done_sentinel_format_is_well_formed() {
let res = make_snapshot(SubAgentStatus::Completed);
let sentinel = subagent_done_sentinel("agent_xyz", &res);
assert!(sentinel.starts_with("<codewhale:subagent.done>"));
assert!(sentinel.ends_with("</codewhale:subagent.done>"));
// The inner JSON parses and carries the expected fields.
let inner = sentinel
.trim_start_matches("<codewhale:subagent.done>")
.trim_end_matches("</codewhale:subagent.done>");
let parsed: serde_json::Value = serde_json::from_str(inner).expect("inner JSON parses");
assert_eq!(parsed["agent_id"], "agent_xyz");
assert_eq!(parsed["status"], "completed");
assert_eq!(parsed["agent_type"], "general");
assert_eq!(parsed["summary_location"], "previous_line");
assert_eq!(parsed["details"], "agent_eval");
// Self-describing completion fields (#2658): a short result is complete, so
// the parent should trust the previous-line summary.
assert_eq!(parsed["result_clipped"], false);
assert_eq!(parsed["summary_complete"], true);
assert_eq!(parsed["next_action"], "use_summary");
assert!(parsed.get("summary").is_none());
assert!(parsed.get("duration_ms").is_none());
assert!(parsed.get("steps").is_none());
}
#[test]
fn subagent_done_sentinel_flags_clipped_result() {
let mut res = make_snapshot(SubAgentStatus::Completed);
res.result = Some("x".repeat(SUBAGENT_SUMMARY_PREVIEW_MAX + 1));
let sentinel = subagent_done_sentinel("agent_big", &res);
let inner = sentinel
.trim_start_matches("<codewhale:subagent.done>")
.trim_end_matches("</codewhale:subagent.done>");
let parsed: serde_json::Value = serde_json::from_str(inner).expect("inner JSON parses");
assert_eq!(parsed["result_clipped"], true);
assert_eq!(parsed["summary_complete"], false);
assert_eq!(parsed["next_action"], "call_agent_eval");
}
#[test]
fn subagent_failed_sentinel_format_is_well_formed() {
let sentinel = subagent_failed_sentinel("agent_zzz", "boom");
let inner = sentinel
.trim_start_matches("<codewhale:subagent.done>")
.trim_end_matches("</codewhale:subagent.done>");
let parsed: serde_json::Value = serde_json::from_str(inner).expect("inner JSON parses");
assert_eq!(parsed["agent_id"], "agent_zzz");
assert_eq!(parsed["status"], "failed");
assert_eq!(parsed["error_location"], "previous_line");
assert_eq!(parsed["details"], "agent_eval");
assert_eq!(parsed["next_action"], "call_agent_eval");
// Stays lean — the error text lives on the previous line, not the sentinel.
assert!(parsed.get("error").is_none());
}
#[test]
fn annotate_child_model_error_adds_actionable_hint() {
// #2653: a bare provider 403 becomes actionable by naming the model and the
// recovery path, while unrelated errors pass through unchanged.
let auth = annotate_child_model_error("403 Forbidden", "kimi-k2");
assert!(auth.contains("kimi-k2"), "names the model: {auth}");
assert!(
auth.contains("agent_open"),
"names the recovery path: {auth}"
);
assert!(
auth.contains("403 Forbidden"),
"preserves the original: {auth}"
);
let unrelated = annotate_child_model_error("connection reset by peer", "kimi-k2");
assert_eq!(unrelated, "connection reset by peer");
// #3020: provider rejections that classify as Internal (not
// Authorization/State) still get the hint via raw-text matching.
let not_exist = annotate_child_model_error("Model Not Exist", "kimi-k2");
assert!(
not_exist.contains("retry agent_open"),
"DeepSeek-style rejection gets the hint: {not_exist}"
);
let openai_style = annotate_child_model_error(
"The model `gpt-5.5-nano` does not exist or you do not have access to it.",
"gpt-5.5-nano",
);
assert!(
openai_style.contains("retry agent_open"),
"OpenAI-style rejection gets the hint: {openai_style}"
);
}
#[test]
fn subagent_runtime_default_max_depth_is_three() {
// Sanity-check the constant — bumping it without a test means stale docs.
assert_eq!(DEFAULT_MAX_SPAWN_DEPTH, 3);
}
#[test]
fn would_exceed_depth_at_boundary() {
// depth=2, max=3 → next spawn (depth 3) is allowed (allow-equal).
// depth=3, max=3 → next spawn (depth 4) exceeds.
let runtime = stub_runtime();
let mut at_max = runtime.clone();
at_max.spawn_depth = 3;
at_max.max_spawn_depth = 3;
assert!(
at_max.would_exceed_depth(),
"depth 3 + max 3 → next would be 4, exceeds"
);
let mut below_max = runtime;
below_max.spawn_depth = 2;
below_max.max_spawn_depth = 3;
assert!(
!below_max.would_exceed_depth(),
"depth 2 + max 3 → next is 3, allowed"
);
}
#[test]
fn child_runtime_increments_depth_and_preserves_auto_approve() {
let mut parent = stub_runtime();
parent.spawn_depth = 1;
parent.context.auto_approve = false; // parent in suggest mode
let child = parent.child_runtime();
assert_eq!(child.spawn_depth, 2, "child depth = parent + 1");
assert_eq!(child.step_api_timeout, DEFAULT_STEP_API_TIMEOUT);
assert!(
!child.context.auto_approve,
"child must inherit parent approval state"
);
assert!(!parent.context.auto_approve);
parent.context.auto_approve = true;
let auto_child = parent.child_runtime();
assert!(
auto_child.context.auto_approve,
"auto-approved parents should still create auto-approved children"
);
}
#[test]
fn child_and_background_runtimes_preserve_step_api_timeout() {
let timeout = Duration::from_secs(7);
let parent = stub_runtime().with_step_api_timeout(timeout);
let child = parent.child_runtime();
assert_eq!(child.step_api_timeout, timeout);
let background = parent.background_runtime();
assert_eq!(background.step_api_timeout, timeout);
}
#[tokio::test]
async fn subagent_registry_blocks_approval_tools_without_parent_auto_approve() {
let mut runtime = stub_runtime();
runtime.context.auto_approve = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::General,
Some(vec!["exec_shell".to_string()]),
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let err = registry
.execute("agent_test", "exec_shell", json!({"command": "echo hi"}))
.await
.expect_err("approval-gated child tool should be blocked");
assert!(
err.to_string().contains("requires approval"),
"unexpected error: {err}"
);
}
#[tokio::test]
async fn implementer_delegation_allows_suggest_write_without_parent_auto_approve() {
// Issue #1828: implementer agents could not write files even when their
// whole job is to land code changes, because the registry blocked every
// approval-gated tool when the parent ran in `suggest` mode. The
// hardened gate (#1833) delegates `Suggest`-level tools (write_file,
// edit_file, apply_patch) to write-capable roles.
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path().to_path_buf();
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(workspace.clone());
runtime.context.auto_approve = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::Implementer,
None,
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let result = registry
.execute(
"agent_test",
"write_file",
json!({"path": "delegated.txt", "content": "hello"}),
)
.await
.expect("delegated write should be allowed for implementer");
let written = std::fs::read_to_string(workspace.join("delegated.txt"))
.expect("file should exist after delegated write");
assert_eq!(written, "hello");
assert!(
!result.contains("requires approval"),
"successful write should not look like an approval error: {result}"
);
}
#[tokio::test]
async fn general_delegation_still_blocks_suggest_write_without_parent_auto_approve() {
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path().to_path_buf();
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(workspace.clone());
runtime.context.auto_approve = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::General,
None,
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let err = registry
.execute(
"agent_test",
"write_file",
json!({"path": "general.txt", "content": "ok"}),
)
.await
.expect_err("general agent should not silently gain write permission");
let msg = err.to_string();
assert!(
msg.contains("not delegated to general sub-agents"),
"general writes should be rejected with a role-aware message: {msg}"
);
assert!(
!workspace.join("general.txt").exists(),
"general write must not land without parent auto-approve"
);
}
#[tokio::test]
async fn explore_role_still_blocks_suggest_writes_without_parent_auto_approve() {
// Read-only stances (explore, plan, review, verifier) must not gain
// write capabilities via delegation — otherwise a parent that asked
// for "just look at the code" could find files mutated behind its back.
let tmp = tempdir().expect("tempdir");
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(tmp.path().to_path_buf());
runtime.context.auto_approve = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::Explore,
None,
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let err = registry
.execute(
"agent_test",
"write_file",
json!({"path": "should_not_appear.txt", "content": "denied"}),
)
.await
.expect_err("explore agents must not write");
let msg = err.to_string();
assert!(
msg.contains("not delegated to explore sub-agents"),
"explore writes should be rejected with a role-aware message: {msg}"
);
assert!(
!tmp.path().join("should_not_appear.txt").exists(),
"file must not have been written"
);
}
#[tokio::test]
async fn delegated_write_role_still_blocks_required_tools() {
// Required-level tools (exec_shell, etc.) remain gated behind parent
// auto-approve regardless of role. Implementer can write files, but it
// still can't bypass shell approval just because it's a "write" role.
let tmp = tempdir().expect("tempdir");
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(tmp.path().to_path_buf());
runtime.context.auto_approve = false;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::Implementer,
Some(vec!["exec_shell".to_string()]),
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
let err = registry
.execute("agent_test", "exec_shell", json!({"command": "echo hi"}))
.await
.expect_err("Required-level shell must still need parent auto-approve");
assert!(
err.to_string().contains(
"cannot run inside this sub-agent unless the parent session is auto-approved"
),
"expected Required-level approval message, got: {err}"
);
}
#[tokio::test]
async fn auto_approved_parent_runs_required_tools_in_subagent() {
// Baseline: when the parent runtime IS auto-approved, every approval
// class is permitted (same as before the delegation hardening).
let tmp = tempdir().expect("tempdir");
let mut runtime = stub_runtime();
runtime.context = ToolContext::new(tmp.path().to_path_buf());
runtime.context.auto_approve = true;
let registry = SubAgentToolRegistry::new(
runtime,
SubAgentType::General,
None,
Arc::new(Mutex::new(TodoList::new())),
Arc::new(Mutex::new(PlanState::default())),
);
// Calling exec_shell with interactive=true is what we block via the
// separate terminal-takeover guard; pick the simpler write-file path
// to assert that approval gating is off when auto_approve is set.
registry
.execute(
"agent_test",
"write_file",
json!({"path": "auto.txt", "content": "auto"}),
)
.await
.expect("auto-approved parent should allow writes");
}
#[test]
fn subagent_request_budget_allows_large_write_file_arguments() {
assert_eq!(
SUBAGENT_RESPONSE_MAX_TOKENS, 16_384,
"non-streaming sub-agent tool calls need enough output budget for large write_file arguments"
);
}
#[test]
fn truncated_subagent_tool_calls_return_model_visible_errors() {
let tool_uses = vec![(
"toolu_write".to_string(),
"write_file".to_string(),
json!({"path": "report.md", "content": "partial"}),
)];
let results = truncated_response_tool_results(&tool_uses);
assert_eq!(results.len(), 1);
match &results[0] {
ContentBlock::ToolResult {
tool_use_id,
content,
is_error,
..
} => {
assert_eq!(tool_use_id, "toolu_write");
assert_eq!(is_error, &Some(true));
assert!(content.contains("truncated by max_tokens"));
assert!(content.contains("write_file"));
assert!(content.contains("smaller writes"));
}
other => panic!("expected tool error result, got {other:?}"),
}
}
#[test]
fn truncated_subagent_text_response_returns_model_visible_error() {
let results = truncated_response_text_retry_message();
assert_eq!(results.len(), 1);
match &results[0] {
ContentBlock::Text { text, .. } => {
assert!(text.contains("truncated by max_tokens"));
assert!(text.contains("No complete tool call was available"));
assert!(text.contains("Retry with a shorter response"));
}
other => panic!("expected text retry message, got {other:?}"),
}
}
#[test]
fn consecutive_truncated_subagent_responses_are_capped() {
let mut consecutive = 0;
for _ in 0..MAX_CONSECUTIVE_TRUNCATED_SUBAGENT_RESPONSES {
record_truncated_subagent_response(&mut consecutive).expect("within truncation cap");
}
let err = record_truncated_subagent_response(&mut consecutive)
.expect_err("one more truncation should stop the sub-agent");
assert!(err.to_string().contains("truncated by max_tokens"));
assert!(err.to_string().contains("consecutive"));
reset_truncated_subagent_responses(&mut consecutive);
record_truncated_subagent_response(&mut consecutive).expect("reset should allow recovery");
assert_eq!(consecutive, 1);
}
#[test]
fn child_cancellation_cascades_from_parent() {
let parent = stub_runtime();
let child = parent.child_runtime();
assert!(!child.cancel_token.is_cancelled());
parent.cancel_token.cancel();
assert!(
child.cancel_token.is_cancelled(),
"parent cancel() must propagate to child via child_token()"
);
}
#[test]
fn mailbox_propagates_through_child_runtime_chain() {
use crate::tools::subagent::mailbox::Mailbox;
let parent_token = CancellationToken::new();
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox);
let child = parent.child_runtime();
let grandchild = child.child_runtime();
assert!(parent.mailbox.is_some());
assert!(child.mailbox.is_some(), "child inherits parent mailbox");
assert!(
grandchild.mailbox.is_some(),
"grandchild inherits via the cloned Arc inside Mailbox"
);
}
#[test]
fn subagent_rejects_interactive_shell_terminal_takeover() {
let err = reject_subagent_terminal_takeover(
"exec_shell",
&serde_json::json!({
"command": "python3 -i",
"interactive": true
}),
)
.expect_err("sub-agents must not inherit the parent terminal");
let msg = err.to_string();
assert!(msg.contains("cannot use exec_shell with interactive=true"));
assert!(msg.contains("parent TUI terminal"));
reject_subagent_terminal_takeover(
"exec_shell",
&serde_json::json!({
"command": "cargo check",
"interactive": false
}),
)
.expect("non-interactive shell remains allowed");
reject_subagent_terminal_takeover(
"exec_shell",
&serde_json::json!({
"command": "cargo test",
"background": true
}),
)
.expect("background shell remains allowed");
}
#[tokio::test]
async fn mailbox_close_as_cancel_propagates_to_grandchild_runtime() {
use crate::tools::subagent::mailbox::Mailbox;
let parent_token = CancellationToken::new();
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox.clone());
let child = parent.child_runtime();
let grandchild = child.child_runtime();
assert!(!grandchild.cancel_token.is_cancelled());
// Close the mailbox via *any* clone — the original or the one stored on
// the runtime. Cancellation must reach all the way to the grandchild.
mailbox.close();
assert!(parent.cancel_token.is_cancelled());
assert!(child.cancel_token.is_cancelled());
assert!(
grandchild.cancel_token.is_cancelled(),
"close-as-cancel must propagate across max_spawn_depth=3"
);
}
#[tokio::test]
async fn mailbox_orders_messages_from_parent_and_child_runtimes() {
use crate::tools::subagent::mailbox::{Mailbox, MailboxMessage};
let parent_token = CancellationToken::new();
let (mailbox, mut rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox);
let child = parent.child_runtime();
// Interleave sends from both runtimes; sequence numbers stay monotonic.
parent
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("parent_a", "step 1"));
child
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("child_b", "step 1"));
parent
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("parent_a", "step 2"));
let drained = rx.drain();
assert_eq!(drained.len(), 3);
assert_eq!(drained[0].seq, 1);
assert_eq!(drained[1].seq, 2);
assert_eq!(drained[2].seq, 3);
// Verify ordering is preserved across publishers.
match (
&drained[0].message,
&drained[1].message,
&drained[2].message,
) {
(
MailboxMessage::Progress { agent_id: a, .. },
MailboxMessage::Progress { agent_id: b, .. },
MailboxMessage::Progress { agent_id: c, .. },
) => {
assert_eq!(a, "parent_a");
assert_eq!(b, "child_b");
assert_eq!(c, "parent_a");
}
other => panic!("unexpected message order: {other:?}"),
}
}
#[test]
fn persisted_empty_allowed_tools_loads_as_full_inheritance() {
// Backward-compat: a v0.6.5 session that persisted with an empty Vec
// (or a v0.6.6 session with no narrowing) should load as None on
// restart, meaning full inheritance.
let dir = tempdir().unwrap();
let state_path = dir.path().join("subagents.v1.json");
let payload = serde_json::json!({
"schema_version": SUBAGENT_STATE_SCHEMA_VERSION,
"agents": [{
"id": "agent_test",
"agent_type": "general",
"prompt": "p",
"assignment": { "objective": "p" },
"status": "Completed",
"result": null,
"steps_taken": 0,
"duration_ms": 0,
"allowed_tools": [],
"updated_at_ms": 0
}]
});
std::fs::write(&state_path, payload.to_string()).unwrap();
let mut manager = SubAgentManager::new(dir.path().to_path_buf(), 5).with_state_path(state_path);
manager.load_state().expect("load should succeed");
let agent = manager.agents.get("agent_test").expect("loaded agent");
assert!(
agent.allowed_tools.is_none(),
"empty Vec on disk → None (full inheritance)"
);
}
#[test]
fn persisted_non_empty_allowed_tools_loads_as_narrow() {
// Backward-compat the other way: a v0.6.5 session that persisted with
// an explicit narrow list keeps that list on reload.
let dir = tempdir().unwrap();
let state_path = dir.path().join("subagents.v1.json");
let payload = serde_json::json!({
"schema_version": SUBAGENT_STATE_SCHEMA_VERSION,
"agents": [{
"id": "agent_narrow",
"agent_type": "custom",
"prompt": "p",
"assignment": { "objective": "p" },
"status": "Completed",
"result": null,
"steps_taken": 0,
"duration_ms": 0,
"allowed_tools": ["read_file", "list_dir"],
"updated_at_ms": 0
}]
});
std::fs::write(&state_path, payload.to_string()).unwrap();
let mut manager = SubAgentManager::new(dir.path().to_path_buf(), 5).with_state_path(state_path);
manager.load_state().expect("load should succeed");
let agent = manager.agents.get("agent_narrow").expect("loaded agent");
assert_eq!(
agent.allowed_tools.as_deref(),
Some(&["read_file".to_string(), "list_dir".to_string()][..]),
"non-empty Vec → Some(list), narrow scope preserved"
);
}
/// Build a minimal `SubAgentRuntime` for tests that exercise pure runtime
/// helpers (depth, cancellation, child_runtime). Doesn't construct a real
/// HTTP client — calls that hit `runtime.client` would fail, but the
/// helpers we test here don't.
fn stub_runtime() -> SubAgentRuntime {
use tokio_util::sync::CancellationToken;
let workspace = std::env::temp_dir().join("codewhale-test-stub");
let context = ToolContext::new(workspace.clone());
SubAgentRuntime {
client: stub_client(),
model: "deepseek-v4-flash".to_string(),
auto_model: false,
reasoning_effort: None,
reasoning_effort_auto: false,
role_models: std::collections::HashMap::new(),
context,
allow_shell: true,
event_tx: None,
manager: new_shared_subagent_manager(workspace, 5),
spawn_depth: 0,
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
parent_completion_tx: None,
fork_context: None,
mcp_pool: None,
step_api_timeout: DEFAULT_STEP_API_TIMEOUT,
speech_output_dir: None,
}
}
/// A minimal stub client. Test helpers below only ever check struct fields
/// (depth, cancel_token, context); they don't call the network. We need a
/// *some* `DeepSeekClient` because `SubAgentRuntime.client` isn't
/// `Option<...>`. `Config::default()` is enough — `DeepSeekClient::new`
/// only validates that an API key field exists, not that the key works.
fn stub_runtime_for_provider(provider: &str) -> SubAgentRuntime {
let mut runtime = stub_runtime();
runtime.client = stub_client_for_provider(provider);
runtime
}
fn stub_client_for_provider(provider: &str) -> DeepSeekClient {
let _ = rustls::crypto::ring::default_provider().install_default();
let mut providers = crate::config::ProvidersConfig::default();
match provider {
"moonshot" => {
providers.moonshot = crate::config::ProviderConfig {
api_key: Some("test-key".to_string()),
..Default::default()
};
}
// Ollama is keyless (local runtime); extend per-provider as needed.
"ollama" => {}
other => panic!("extend stub_client_for_provider for provider {other}"),
}
let config = crate::config::Config {
api_key: Some("test-key".to_string()),
provider: Some(provider.to_string()),
providers: Some(providers),
..crate::config::Config::default()
};
DeepSeekClient::new(&config).expect("stub client should construct")
}
fn stub_client() -> DeepSeekClient {
let _ = rustls::crypto::ring::default_provider().install_default();
let config = crate::config::Config {
api_key: Some("test-key".to_string()),
..crate::config::Config::default()
};
DeepSeekClient::new(&config).expect("stub client should construct")
}
// ---- #405 session-boundary classification ----
//
// Each manager assigns a fresh session_boot_id; agents stamp the id at
// spawn time. After persist + reload by a *new* manager, those agents
// carry the prior boot id and are classified as `from_prior_session`.
// `agent_list` defaults to current-session only; `include_archived=true`
// surfaces the prior-session records with the flag set.
fn insert_prior_session_agent(
manager: &mut SubAgentManager,
id: &str,
status: SubAgentStatus,
boot_id: &str,
) {
let (input_tx, _input_rx) = mpsc::unbounded_channel();
let mut agent = SubAgent::new(
id.to_string(),
SubAgentType::General,
"old prompt".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
None,
None,
input_tx,
manager.workspace.clone(),
boot_id.to_string(),
);
agent.status = status;
agent.id = id.to_string();
manager.agents.insert(id.to_string(), agent);
}
#[test]
fn session_boot_ids_are_unique_per_manager() {
let a = SubAgentManager::new(PathBuf::from("."), 1);
let b = SubAgentManager::new(PathBuf::from("."), 1);
assert_ne!(a.session_boot_id(), b.session_boot_id());
}
#[test]
fn list_filtered_drops_prior_session_terminals_by_default() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 5);
let current_boot = manager.session_boot_id().to_string();
insert_prior_session_agent(
&mut manager,
"current_running",
SubAgentStatus::Running,
&current_boot,
);
insert_prior_session_agent(
&mut manager,
"prior_completed",
SubAgentStatus::Completed,
"boot_old_session",
);
insert_prior_session_agent(
&mut manager,
"prior_running",
SubAgentStatus::Running,
"boot_old_session",
);
let listed = manager.list_filtered(false);
let ids: Vec<&str> = listed.iter().map(|s| s.agent_id.as_str()).collect();
assert!(ids.contains(&"current_running"), "{ids:?}");
assert!(
ids.contains(&"prior_running"),
"still-running prior-session agents stay visible: {ids:?}"
);
assert!(
!ids.contains(&"prior_completed"),
"completed prior-session agents are hidden by default: {ids:?}"
);
let prior = listed
.iter()
.find(|s| s.agent_id == "prior_running")
.unwrap();
assert!(prior.from_prior_session);
let current = listed
.iter()
.find(|s| s.agent_id == "current_running")
.unwrap();
assert!(!current.from_prior_session);
}
#[test]
fn list_snapshots_refresh_git_branch_from_agent_workspace() {
let repo = init_subagent_git_repo();
git(repo.path(), &["checkout", "-b", "feature/agent-old"]);
let mut manager = SubAgentManager::new(repo.path().to_path_buf(), 5);
let current_boot = manager.session_boot_id().to_string();
insert_prior_session_agent(
&mut manager,
"current_running",
SubAgentStatus::Running,
&current_boot,
);
let listed = manager.list_filtered(false);
let agent = listed
.iter()
.find(|agent| agent.agent_id == "current_running")
.expect("current agent should be listed");
assert_eq!(agent.git_branch.as_deref(), Some("feature/agent-old"));
assert_eq!(agent.workspace.as_deref(), Some(repo.path()));
git(repo.path(), &["checkout", "-b", "feature/agent-new"]);
let refreshed = manager.list_filtered(false);
let agent = refreshed
.iter()
.find(|agent| agent.agent_id == "current_running")
.expect("current agent should still be listed");
assert_eq!(agent.git_branch.as_deref(), Some("feature/agent-new"));
}
#[test]
fn list_filtered_with_include_archived_returns_everything() {
let mut manager = SubAgentManager::new(PathBuf::from("."), 5);
let current_boot = manager.session_boot_id().to_string();
insert_prior_session_agent(
&mut manager,
"current_done",
SubAgentStatus::Completed,
&current_boot,
);
insert_prior_session_agent(
&mut manager,
"prior_done",
SubAgentStatus::Completed,
"boot_old",
);
insert_prior_session_agent(
&mut manager,
"prior_failed",
SubAgentStatus::Failed("boom".to_string()),
"boot_old",
);
let listed = manager.list_filtered(true);
assert_eq!(listed.len(), 3, "{listed:?}");
let prior = listed.iter().find(|s| s.agent_id == "prior_done").unwrap();
assert!(prior.from_prior_session);
let current = listed
.iter()
.find(|s| s.agent_id == "current_done")
.unwrap();
assert!(!current.from_prior_session);
}
#[test]
fn agents_with_empty_boot_id_classify_as_prior_session() {
// Records persisted before #405 land with an empty `session_boot_id`
// due to `#[serde(default)]`. The manager treats those the same as
// a non-matching id — i.e. prior session.
let mut manager = SubAgentManager::new(PathBuf::from("."), 5);
insert_prior_session_agent(&mut manager, "legacy", SubAgentStatus::Completed, "");
let listed_default = manager.list_filtered(false);
assert!(
listed_default.iter().all(|s| s.agent_id != "legacy"),
"legacy completed agents are hidden by default"
);
let listed_archived = manager.list_filtered(true);
let legacy = listed_archived
.iter()
.find(|s| s.agent_id == "legacy")
.unwrap();
assert!(legacy.from_prior_session);
}
#[test]
fn persist_round_trip_preserves_session_boot_id() {
let dir = tempdir().expect("tempdir");
let state_path = dir.path().join(SUBAGENT_STATE_FILE);
let original_boot;
{
let mut writer =
SubAgentManager::new(dir.path().to_path_buf(), 2).with_state_path(state_path.clone());
original_boot = writer.session_boot_id().to_string();
insert_prior_session_agent(
&mut writer,
"agent_persist",
SubAgentStatus::Completed,
&original_boot,
);
writer
.persist_state()
.expect("persist round-trip should write");
}
// A fresh manager comes up with a *different* boot id and reloads
// the persisted state; the agent should now be classified prior.
let mut reader =
SubAgentManager::new(dir.path().to_path_buf(), 2).with_state_path(state_path.clone());
reader.load_state().expect("reload should succeed");
assert_ne!(reader.session_boot_id(), original_boot);
let listed_default = reader.list_filtered(false);
assert!(
!listed_default.iter().any(|s| s.agent_id == "agent_persist"),
"completed prior-session agent hidden after reload: {listed_default:?}"
);
let listed_all = reader.list_filtered(true);
let snap = listed_all
.iter()
.find(|s| s.agent_id == "agent_persist")
.unwrap();
assert!(snap.from_prior_session);
}
// === Issue #756: parent-completion wakeup ===
//
// When a direct child of the engine finishes, `run_subagent_task` emits
// a `SubAgentCompletion` on the runtime's `parent_completion_tx`. The
// engine's turn loop drains that channel before deciding to end the turn.
// These tests cover the gating logic in `emit_parent_completion` so the
// parent isn't flooded with grandchild completions and so the function
// is safe when no channel is wired.
fn runtime_with_depth(
spawn_depth: u32,
parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
) -> SubAgentRuntime {
let mut rt = stub_runtime();
rt.spawn_depth = spawn_depth;
rt.parent_completion_tx = parent_completion_tx;
rt
}
#[test]
fn emit_parent_completion_fires_for_direct_child() {
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(1, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_abc", "summary line\n<sentinel/>");
assert!(sent, "depth=1 with channel wired should send");
let received = rx.try_recv().expect("channel should have one message");
assert_eq!(received.agent_id, "agent_abc");
assert_eq!(received.payload, "summary line\n<sentinel/>");
assert!(rx.try_recv().is_err(), "should be exactly one message");
}
#[test]
fn child_runtime_inherits_speech_output_dir() {
let output_dir = PathBuf::from("configured-speech-output");
let runtime = stub_runtime().with_speech_output_dir(Some(output_dir.clone()));
let child = runtime.child_runtime();
assert_eq!(child.speech_output_dir, Some(output_dir));
}
#[test]
fn emit_parent_completion_skips_grandchildren() {
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(2, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_grandchild", "ignored");
assert!(
!sent,
"depth=2 grandchild must not fire on the parent channel"
);
assert!(
rx.try_recv().is_err(),
"channel should remain empty for grandchildren"
);
}
#[test]
fn emit_parent_completion_skips_engine_self() {
// depth 0 is the engine itself — the engine never spawns a task at
// depth 0, but defend against accidental misuse.
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(0, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_root", "ignored");
assert!(
!sent,
"depth=0 must not fire (only depth=1 direct children)"
);
assert!(rx.try_recv().is_err());
}
#[test]
fn emit_parent_completion_no_channel_is_noop() {
let runtime = runtime_with_depth(1, None);
let sent = emit_parent_completion(&runtime, "agent_no_chan", "anything");
assert!(
!sent,
"missing channel should be a silent no-op, not a panic"
);
}
#[test]
fn emit_parent_completion_dropped_receiver_does_not_panic() {
let (tx, rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
drop(rx);
let runtime = runtime_with_depth(1, Some(tx));
// The send returns an error internally but we discard it — the
// caller's run_subagent_task does not care whether the engine is
// still listening (it might be shutting down).
let sent = emit_parent_completion(&runtime, "agent_orphan", "after-rx-drop");
assert!(
sent,
"we still attempt the send; the engine being gone is not our problem"
);
}
#[tokio::test]
async fn run_subagent_task_emits_parent_completion_before_terminal_update() {
let manager = Arc::new(RwLock::new(SubAgentManager::new(PathBuf::from("."), 2)));
let (task_input_tx, task_input_rx) = mpsc::unbounded_channel();
let agent_id = "agent_noop".to_string();
let mut agent = SubAgent::new(
agent_id.clone(),
SubAgentType::General,
"noop".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
None,
None,
task_input_tx,
PathBuf::from("."),
"boot_test".to_string(),
);
agent.status = SubAgentStatus::Running;
manager.write().await.agents.insert(agent_id.clone(), agent);
let (completion_tx, mut completion_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let mut runtime = runtime_with_depth(1, Some(completion_tx));
runtime.manager = Arc::clone(&manager);
let task = SubAgentTask {
manager_handle: manager.clone(),
runtime,
agent_id: agent_id.clone(),
agent_type: SubAgentType::General,
prompt: "no-op child run".to_string(),
assignment: make_assignment(),
allowed_tools: None,
fork_context: false,
started_at: Instant::now(),
max_steps: 0,
input_rx: task_input_rx,
launch_gate: None,
};
let manager_lock = manager.write().await;
let task_handle = tokio::spawn(run_subagent_task(task));
// While the manager write lock is held, completion can be emitted only if it
// is sent before the terminal-state manager update (the ordering fixed by
// issue #1961).
let completion = tokio::time::timeout(Duration::from_secs(1), completion_rx.recv())
.await
.expect("completion should be emitted while manager write lock is still held");
let completion = completion.expect("completion channel should remain open");
assert_eq!(completion.agent_id, agent_id);
drop(manager_lock);
task_handle
.await
.expect("run_subagent_task should complete after lock release");
let snapshot = {
let manager = manager.read().await;
manager
.get_result(&agent_id)
.expect("completed agent should be present")
};
assert_eq!(snapshot.status, SubAgentStatus::Completed);
}
#[test]
fn child_runtime_propagates_completion_tx_for_gating() {
// The channel is cloned through `child_runtime()` so descendants carry
// it. The gate at the send site (`spawn_depth == 1`) is what limits
// who actually fires — `child_runtime` simply must not strand it.
let (tx, _rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let parent = runtime_with_depth(0, Some(tx));
let child = parent.child_runtime();
assert_eq!(child.spawn_depth, 1, "child increments depth");
assert!(
child.parent_completion_tx.is_some(),
"child carries the wakeup channel forward"
);
}
#[test]
fn subagent_runtime_default_step_api_timeout_is_legacy_120s() {
// The legacy hardcoded constant is now the default field value so existing
// call sites and tests that construct a runtime without explicit timeout
// wiring keep their old behavior (#1806, #1808).
let runtime = stub_runtime();
assert_eq!(runtime.step_api_timeout, DEFAULT_STEP_API_TIMEOUT);
assert_eq!(
DEFAULT_STEP_API_TIMEOUT,
std::time::Duration::from_secs(crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS)
);
}
#[test]
fn with_step_api_timeout_overrides_runtime_field() {
let runtime = stub_runtime().with_step_api_timeout(std::time::Duration::from_secs(900));
assert_eq!(runtime.step_api_timeout.as_secs(), 900);
}
#[test]
fn child_runtime_preserves_step_api_timeout() {
// Real sub-agents spawn through `child_runtime()` / `background_runtime()`;
// forgetting to clone the timeout would silently drop the user's config
// override and resurrect the 120 s default for every child step.
let parent = stub_runtime().with_step_api_timeout(std::time::Duration::from_secs(900));
let child = parent.child_runtime();
let background = parent.background_runtime();
assert_eq!(
child.step_api_timeout.as_secs(),
900,
"child_runtime must preserve parent's per-step timeout"
);
assert_eq!(
background.step_api_timeout.as_secs(),
900,
"background_runtime (detached) must also preserve the parent's timeout"
);
}
#[test]
fn subagent_completion_payload_carries_existing_sentinel_format() {
// The payload format is the same one already documented in
// prompts/base.md: human summary on line 1, `<codewhale:subagent.done>`
// sentinel on line 2. This test pins the format so future refactors
// don't silently break the model's parsing contract.
let mut snap = make_snapshot(SubAgentStatus::Completed);
snap.result = Some("Found three errors.".to_string());
let summary = summarize_subagent_result(&snap);
let sentinel = subagent_done_sentinel("agent_test", &snap);
let payload = format!("{summary}\n{sentinel}");
let mut lines = payload.lines();
let first = lines.next().expect("first line is summary");
let second = lines.next().expect("second line is sentinel");
assert!(
!first.starts_with("<codewhale:subagent.done>"),
"summary should not be the sentinel itself"
);
assert!(
second.starts_with("<codewhale:subagent.done>"),
"second line is the sentinel"
);
assert!(second.ends_with("</codewhale:subagent.done>"));
assert!(
second.contains("\"agent_id\":\"agent_test\""),
"sentinel JSON includes agent_id"
);
assert!(
!second.contains("Found three errors."),
"sentinel should not duplicate the human summary line"
);
}
/// #2683 — Verify the model-facing tool catalog only advertises canonical
/// subagent tools and never exposes legacy superseded names.
#[test]
fn model_catalog_only_advertises_canonical_subagent_tools() {
use crate::tools::ToolRegistryBuilder;
let tmp = tempfile::tempdir().expect("tempdir");
let runtime = stub_runtime();
let manager = runtime.manager.clone();
let ctx = crate::tools::spec::ToolContext::new(tmp.path().to_path_buf());
let registry = ToolRegistryBuilder::new()
.with_subagent_tools(manager, runtime)
.build(ctx);
let api_names: Vec<String> = registry
.to_api_tools()
.into_iter()
.map(|t| t.name)
.collect();
// Canonical tools must be model-visible.
for canonical in ["agent_open", "agent_eval", "agent_close", "tool_agent"] {
assert!(
api_names.iter().any(|n| n == canonical),
"{canonical} should be in the model-facing catalog"
);
}
// Legacy/superseded names must NOT appear in the model catalog.
for legacy in [
"agent_spawn",
"agent_result",
"agent_cancel",
"resume_agent",
"agent_list",
"agent_send_input",
"agent_assign",
"agent_wait",
"delegate_to_agent",
] {
assert!(
api_names.iter().all(|n| n != legacy),
"{legacy} should be hidden from the model-facing catalog"
);
}
}
// ── #3018: provider-aware auto routing and model validation ─────────────────
#[tokio::test]
async fn auto_route_on_provider_without_cheap_tier_stays_on_parent_model() {
// AC: Ollama + auto-model must never build a request with a DeepSeek id;
// the routed model equals the session model for any prompt class.
let mut runtime = stub_runtime_for_provider("ollama").with_auto_model(true);
runtime.model = "qwen3:32b".to_string();
for prompt in ["hi", "please refactor the whole auth module for security"] {
let route =
resolve_subagent_assignment_route(&runtime, None, prompt, &SubAgentType::General).await;
assert_eq!(route.model, "qwen3:32b", "prompt {prompt:?}");
assert!(
!route.model.contains("deepseek"),
"no DeepSeek id may be fabricated: {route:?}"
);
}
}
#[test]
fn flash_router_gate_requires_cheap_tier() {
let deepseek = stub_runtime().with_auto_model(true);
assert!(
should_use_subagent_flash_router(&deepseek),
"DeepSeek keeps the network router"
);
let mut moonshot = stub_runtime_for_provider("moonshot").with_auto_model(true);
moonshot.model = "kimi-k2.6".to_string();
assert!(
!should_use_subagent_flash_router(&moonshot),
"providers without a cheap tier skip the network router"
);
}
#[test]
fn role_model_validation_accepts_provider_native_ids() {
// AC: [subagents] worker_model = "kimi-k2.5" on Moonshot must not fail
// with "Expected a DeepSeek model id".
let mut runtime = stub_runtime_for_provider("moonshot");
runtime
.role_models
.insert("worker".to_string(), "kimi-k2.5".to_string());
let model = configured_model_for_role_or_type(&runtime, Some("worker"), &SubAgentType::General)
.expect("provider-native id is accepted");
assert_eq!(model.as_deref(), Some("kimi-k2.5"));
}
#[test]
fn role_model_validation_stays_strict_on_official_deepseek() {
let mut runtime = stub_runtime();
runtime
.role_models
.insert("worker".to_string(), "kimi-k2.5".to_string());
let err = configured_model_for_role_or_type(&runtime, Some("worker"), &SubAgentType::General)
.expect_err("non-DeepSeek id is rejected on the official API");
let msg = err.to_string();
assert!(msg.contains("kimi-k2.5"), "names the bad id: {msg}");
assert!(
msg.contains("deepseek-v4-pro"),
"lists accepted ids from model_completion_names_for_provider: {msg}"
);
}
#[test]
fn normalize_requested_subagent_model_is_provider_aware() {
assert_eq!(
normalize_requested_subagent_model(
"kimi-k2.5",
"model",
crate::config::ApiProvider::Moonshot
)
.expect("Moonshot accepts its own ids"),
"kimi-k2.5"
);
assert_eq!(
normalize_requested_subagent_model(
"qwen3:32b",
"model",
crate::config::ApiProvider::Ollama
)
.expect("Ollama tags pass through"),
"qwen3:32b"
);
assert!(
normalize_requested_subagent_model(
"kimi-k2.5",
"model",
crate::config::ApiProvider::Deepseek
)
.is_err(),
"official DeepSeek API rejects foreign ids"
);
}
// ── #3030: step-counter formatting ──────────────────────────────────────────
#[test]
fn format_step_counter_hides_unbounded_sentinel() {
// DEFAULT_MAX_STEPS is u32::MAX, meaning "unbounded" — rendering the
// sentinel as a denominator produced "step 16/4294967295".
assert_eq!(format_step_counter(16, u32::MAX), "step 16");
}
#[test]
fn format_step_counter_keeps_concrete_budgets() {
assert_eq!(format_step_counter(3, 25), "step 3/25");
assert_eq!(format_step_counter(0, 1), "step 0/1");
}
// ── #3095: interactive fanout launch gate ────────────────────────────────────
#[test]
fn launch_gate_defaults_to_interactive_limit_capped_by_max_agents() {
let tmp = tempdir().expect("tempdir");
let manager = SubAgentManager::new(tmp.path().to_path_buf(), 10);
assert_eq!(
manager.launch_gate.available_permits(),
crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT
);
let small = SubAgentManager::new(tmp.path().to_path_buf(), 2);
assert_eq!(small.launch_gate.available_permits(), 2);
let custom =
SubAgentManager::new(tmp.path().to_path_buf(), 10).with_interactive_launch_limit(0);
assert_eq!(custom.launch_gate.available_permits(), 1, "clamps up to 1");
let oversized =
SubAgentManager::new(tmp.path().to_path_buf(), 3).with_interactive_launch_limit(99);
assert_eq!(
oversized.launch_gate.available_permits(),
3,
"clamps down to max_agents"
);
}
#[tokio::test]
async fn interactive_launch_gate_queues_extra_direct_children() {
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
let tmp = tempdir().expect("tempdir");
let manager = Arc::new(RwLock::new(SubAgentManager::new(
tmp.path().to_path_buf(),
4,
)));
let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(150), "done").await;
let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new());
let mut runtime = stub_runtime();
runtime.client = client;
runtime.manager = Arc::clone(&manager);
runtime.context = ToolContext::new(tmp.path());
runtime.mailbox = Some(mailbox);
let gate = Arc::new(Semaphore::new(1));
let spawn = |agent_id: &str, gate: Option<Arc<Semaphore>>| {
let (input_tx, input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
agent_id.to_string(),
SubAgentType::General,
"Answer".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
None,
Some(vec![]),
input_tx,
tmp.path().to_path_buf(),
"boot_test".to_string(),
);
let task = SubAgentTask {
manager_handle: Arc::clone(&manager),
runtime: runtime.clone(),
agent_id: agent_id.to_string(),
agent_type: SubAgentType::General,
prompt: "Answer".to_string(),
assignment: make_assignment(),
allowed_tools: Some(vec![]),
fork_context: false,
started_at: Instant::now(),
max_steps: 1,
input_rx,
launch_gate: gate,
};
(agent, task)
};
let (agent_a, task_a) = spawn("agent_gate_a", Some(Arc::clone(&gate)));
let (agent_b, task_b) = spawn("agent_gate_b", Some(Arc::clone(&gate)));
{
let mut mgr = manager.write().await;
mgr.agents.insert(agent_a.id.clone(), agent_a);
mgr.agents.insert(agent_b.id.clone(), agent_b);
}
tokio::spawn(run_subagent_task(task_a));
// Give the first task time to take the only permit before the second
// task tries; the second must then queue with a visible reason.
tokio::time::sleep(Duration::from_millis(30)).await;
tokio::spawn(run_subagent_task(task_b));
let mut messages = Vec::new();
let collected = tokio::time::timeout(Duration::from_secs(5), async {
let mut completed = 0;
while completed < 2 {
let Some(envelope) = mailbox_rx.recv().await else {
break;
};
if matches!(envelope.message, MailboxMessage::Completed { .. }) {
completed += 1;
}
messages.push(envelope.message);
}
})
.await;
assert!(collected.is_ok(), "both gated children should complete");
let queued_b = messages.iter().position(|m| {
matches!(
m,
MailboxMessage::Progress { agent_id, status }
if agent_id == "agent_gate_b" && status.contains("queued")
)
});
assert!(
queued_b.is_some(),
"second child must publish a visible queued reason: {messages:?}"
);
let completed_a = messages
.iter()
.position(
|m| matches!(m, MailboxMessage::Completed { agent_id, .. } if agent_id == "agent_gate_a"),
)
.expect("first child completes");
let started_b = messages
.iter()
.position(
|m| matches!(m, MailboxMessage::Started { agent_id, .. } if agent_id == "agent_gate_b"),
)
.expect("second child eventually starts");
assert!(
started_b > completed_a,
"queued child must not start until a permit frees: {messages:?}"
);
}
#[tokio::test]
async fn queued_interactive_child_heartbeat_prevents_stale_cleanup() {
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
let tmp = tempdir().expect("tempdir");
let manager = Arc::new(RwLock::new(
SubAgentManager::new(tmp.path().to_path_buf(), 4)
.with_running_heartbeat_timeout(Duration::from_millis(25)),
));
let (client, _calls, _bodies) = delayed_chat_client(Duration::from_millis(10), "done").await;
let (mailbox, mut mailbox_rx) = Mailbox::new(CancellationToken::new());
let mut runtime = stub_runtime();
runtime.client = client;
runtime.manager = Arc::clone(&manager);
runtime.context = ToolContext::new(tmp.path());
runtime.mailbox = Some(mailbox);
let gate = Arc::new(Semaphore::new(0));
let (input_tx, input_rx) = mpsc::unbounded_channel();
let agent = SubAgent::new(
"agent_gate_heartbeat".to_string(),
SubAgentType::General,
"Answer".to_string(),
make_assignment(),
"deepseek-v4-flash".to_string(),
None,
Some(vec![]),
input_tx,
tmp.path().to_path_buf(),
"boot_test".to_string(),
);
let task = SubAgentTask {
manager_handle: Arc::clone(&manager),
runtime: runtime.clone(),
agent_id: agent.id.clone(),
agent_type: SubAgentType::General,
prompt: "Answer".to_string(),
assignment: make_assignment(),
allowed_tools: Some(vec![]),
fork_context: false,
started_at: Instant::now(),
max_steps: 1,
input_rx,
launch_gate: Some(Arc::clone(&gate)),
};
let agent_id = agent.id.clone();
{
let mut mgr = manager.write().await;
mgr.agents.insert(agent.id.clone(), agent);
}
let handle = tokio::spawn(run_subagent_task(task));
let mut queued_progress = 0;
let collected = tokio::time::timeout(Duration::from_secs(2), async {
while queued_progress < 4 {
let Some(envelope) = mailbox_rx.recv().await else {
break;
};
if matches!(
envelope.message,
MailboxMessage::Progress { ref agent_id, ref status }
if agent_id == "agent_gate_heartbeat"
&& status == SUBAGENT_QUEUED_LAUNCH_REASON
) {
queued_progress += 1;
}
}
})
.await;
assert!(
collected.is_ok(),
"queued child should keep publishing queued heartbeats"
);
assert_eq!(queued_progress, 4);
{
let mut mgr = manager.write().await;
assert_eq!(
mgr.cleanup(Duration::from_secs(60 * 60)),
0,
"queued child with fresh heartbeat must not be auto-cancelled"
);
assert_eq!(
mgr.get_result(&agent_id).expect("agent").status,
SubAgentStatus::Running
);
}
gate.add_permits(1);
let completed = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let Some(envelope) = mailbox_rx.recv().await else {
return false;
};
if matches!(
envelope.message,
MailboxMessage::Completed { ref agent_id, .. }
if agent_id == "agent_gate_heartbeat"
) {
return true;
}
}
})
.await;
assert_eq!(
completed,
Ok(true),
"queued child should complete after release"
);
handle.await.expect("subagent task should join");
}