Files
codewhale/crates/tui/src/fleet/worker_runtime.rs
T
Hunter B e8b52ac57a feat(fleet): security/trust + headless-worker foundation; unify recursion depth
Lands the Agent Fleet security/trust boundary and the headless-worker bridge on
the v0.8.60 line, and collapses the sub-agent and fleet recursion model into a
single shared axis (Hunter steer: "not two moving targets").

Security & trust (#3165):
- FleetTrustLevel, FleetSecurityPolicy, FleetSecretRef (redacted), FleetWorkerAuth,
  FleetCapabilityGrant, FleetAlertEndpoint (redacted) in protocol.
- secrets: resolve_direct(key, source_hint) — fleet secret resolution, never logged.
- Host adapters refuse secret-bearing env keys; SSH uses SendEnv (no argv secrets).

Roles & delegation (#3167):
- fleet role -> SubAgentType mapping; reviewer/verifier default read-only.

Headless worker bridge (#3096/#3154, partial — still simulation, real spawn next):
- worker_runtime: FleetTaskSpec -> AgentWorkerSpec, status -> ledger events,
  exec hardening (mirrors #3027), parallel-safe read-only tool set (#2983).
- FleetManager carries an optional SharedSubAgentManager + exec config.

Recursion depth — ONE axis:
- codewhale_config now owns DEFAULT_SPAWN_DEPTH (3) + MAX_SPAWN_DEPTH_CEILING (3).
- sub-agent DEFAULT_MAX_SPAWN_DEPTH and the fleet clamp both source these consts.
- fleet default raised 1 -> 3 to match standalone sub-agents; root runs at depth 0,
  budget gates child delegation. End-to-end test proves a depth-0 fleet worker
  reaches 3 nested levels (afford >= 3).

Dogfood scaffolding (#3166, partial): docs/examples/fleet-dogfood.toml.

Tests green: codewhale-config fleet, codewhale-tui fleet (58), subagent max_depth;
cargo fmt + git diff --check clean; cargo check --workspace ok.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 01:10:30 -07:00

627 lines
22 KiB
Rust

//! Fleet worker runtime — bridges fleet task specs to headless sub-agent execution.
//!
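//! This module is the bridge meant to make fleet workers real: instead of
//! simulating task completion, each fleet worker spawns a headless sub-agent
//! that runs the task instructions and streams progress back into the fleet
//! ledger. The helpers here are pure spec/status conversions; the spawn itself
//! happens outside this module.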
//!
//! Architecture:
//! - `FleetTaskSpec` + `FleetWorkerSpec` → `AgentWorkerSpec`
//! - `SubAgentManager::register_worker()` tracks the worker
//! - Sub-agent spawn happens through the existing `agent_open` machinery
//! - Mailbox events stream into fleet ledger as `FleetWorkerEventPayload`
//! - `FleetWorkerInspection` reads both ledger state and sub-agent worker records
#![allow(dead_code)]
use codewhale_protocol::fleet::{
FleetHostSpec, FleetTaskSpec, FleetTaskWorkerProfile, FleetWorkerEventPayload, FleetWorkerSpec,
};
use super::host::FleetHostKind;
use crate::tools::subagent::{
AgentWorkerSpec, AgentWorkerStatus, AgentWorkerToolProfile, SubAgentType,
};
/// Map a fleet worker spec's host to the [`FleetHostKind`] used for it.
///
/// `Local` and `Docker` hosts both map to [`FleetHostKind::LocalProcess`];
/// `Ssh` hosts map to [`FleetHostKind::Ssh`].
pub fn fleet_host_kind_for_spec(spec: &FleetWorkerSpec) -> FleetHostKind {
    match &spec.host {
        FleetHostSpec::Ssh { .. } => FleetHostKind::Ssh,
        // Docker has no host kind of its own here; it is treated as local-ish.
        FleetHostSpec::Local | FleetHostSpec::Docker { .. } => FleetHostKind::LocalProcess,
    }
}
/// Return the short, static display label for a [`FleetHostKind`].
///
/// `LocalProcess` is labelled `"local"` and `Ssh` is labelled `"ssh"`.
pub fn fleet_host_kind_label(kind: FleetHostKind) -> &'static str {
    // No wildcard arm: adding a host kind stops compiling until it gets a label.
    match kind {
        FleetHostKind::Ssh => "ssh",
        FleetHostKind::LocalProcess => "local",
    }
}
/// Build the sub-agent [`AgentWorkerSpec`] for one fleet task.
///
/// - `objective` is the full prompt rendered by `fleet_task_prompt` (task
///   name, objective/description, instructions, context, input files).
/// - The task's `worker.role` is copied into `role` and also mapped to a
///   [`SubAgentType`]; a missing worker profile or role maps to `General`.
/// - A non-empty `worker.tools` list becomes an explicit tool profile;
///   otherwise the profile is inherited.
/// - `max_steps` is `budget.max_tool_calls`, or `u32::MAX` when no budget or
///   no tool-call limit is set.
/// - The worker starts at `spawn_depth` 0 with the `max_spawn_depth` of a
///   default `FleetExecConfig`.
///
/// `_worker_spec` is currently unused.
pub fn fleet_task_to_worker_spec(
    worker_id: &str,
    run_id: &str,
    task_spec: &FleetTaskSpec,
    _worker_spec: &FleetWorkerSpec,
    model: &str,
    workspace: &std::path::Path,
) -> AgentWorkerSpec {
    // The optional worker profile drives role, agent type, and tool profile.
    let profile = task_spec.worker.as_ref();
    let role_name = profile.and_then(|w| w.role.as_deref());

    let step_budget = task_spec
        .budget
        .as_ref()
        .and_then(|b| b.max_tool_calls)
        .unwrap_or(u32::MAX);
    let default_depth_budget = codewhale_config::FleetExecConfig::default().max_spawn_depth;

    AgentWorkerSpec {
        worker_id: worker_id.to_owned(),
        run_id: run_id.to_owned(),
        parent_run_id: None,
        session_name: Some(format!("fleet-{}-{}", worker_id, task_spec.id)),
        objective: fleet_task_prompt(task_spec),
        role: profile.and_then(|w| w.role.clone()),
        agent_type: fleet_role_to_agent_type(role_name),
        model: model.to_owned(),
        workspace: workspace.to_owned(),
        git_branch: None,
        context_mode: String::from("fresh"),
        fork_context: false,
        tool_profile: fleet_tool_profile(profile),
        max_steps: step_budget,
        spawn_depth: 0,
        max_spawn_depth: default_depth_budget,
    }
}
/// Render the prompt text used as the sub-agent's objective.
///
/// Sections, in order:
/// 1. `Fleet task: <name>`
/// 2. `Objective:` — the task's `objective`, or its `description` when no
///    objective is set; omitted when both are absent.
/// 3. `Instructions:` — always present.
/// 4. `Context:` — one `- ` bullet per entry; omitted when empty.
/// 5. `Input files:` — one `- ` bullet per path; omitted when empty.
fn fleet_task_prompt(task_spec: &FleetTaskSpec) -> String {
    /// Append a single `- <text>` bullet line to `out`.
    fn push_bullet(out: &mut String, text: &str) {
        out.push_str("- ");
        out.push_str(text);
        out.push('\n');
    }

    let mut prompt = String::from("Fleet task: ");
    prompt.push_str(&task_spec.name);

    // `objective` wins; `description` is only a fallback.
    let summary = task_spec
        .objective
        .as_deref()
        .or(task_spec.description.as_deref());
    if let Some(text) = summary {
        prompt.push_str("\n\nObjective:\n");
        prompt.push_str(text);
    }

    prompt.push_str("\n\nInstructions:\n");
    prompt.push_str(&task_spec.instructions);

    if !task_spec.context.is_empty() {
        prompt.push_str("\n\nContext:\n");
        for item in &task_spec.context {
            push_bullet(&mut prompt, item);
        }
    }

    if !task_spec.input_files.is_empty() {
        // Only one leading newline here: after a context list (whose last
        // bullet ends in '\n') this leaves a blank line, but with no context
        // the header starts on the line right after the instructions.
        prompt.push_str("\nInput files:\n");
        for path in &task_spec.input_files {
            push_bullet(&mut prompt, &path.display().to_string());
        }
    }

    prompt
}
/// Translate a fleet role name into a [`SubAgentType`].
///
/// A missing role and the `"general"` role both map to `General`. Role names
/// without a dedicated mapping are handed to `SubAgentType::from_str`; if that
/// does not produce a type, the result is `General` as well.
fn fleet_role_to_agent_type(role: Option<&str>) -> SubAgentType {
    role.map_or(SubAgentType::General, |name| match name {
        "smoke-runner" | "read-only" => SubAgentType::ToolAgent,
        "reviewer" => SubAgentType::Review,
        "builder" => SubAgentType::Implementer,
        "verifier" | "tester" => SubAgentType::Verifier,
        "planner" => SubAgentType::Plan,
        "explorer" => SubAgentType::Explore,
        "general" => SubAgentType::General,
        // Not a fleet role name: it may still be a sub-agent type name.
        other => SubAgentType::from_str(other).unwrap_or(SubAgentType::General),
    })
}
/// Derive the sub-agent tool profile from a fleet worker profile.
///
/// A profile with a non-empty `tools` list yields `Explicit` with a copy of
/// that list. A missing profile or an empty list yields `Inherited`.
fn fleet_tool_profile(profile: Option<&FleetTaskWorkerProfile>) -> AgentWorkerToolProfile {
    profile
        .map(|p| &p.tools)
        .filter(|tools| !tools.is_empty())
        .map_or(AgentWorkerToolProfile::Inherited, |tools| {
            AgentWorkerToolProfile::Explicit(tools.clone())
        })
}
/// Wrap an artifact kind and path into a `FleetArtifactRef`.
///
/// The path is stored exactly as given; this function does not place it under
/// any particular directory. `checksum`, `mime_type`, and `size_bytes` are
/// left unset. `_run_id` and `_worker_id` are currently unused.
pub fn fleet_artifact_ref(
    _run_id: &str,
    _worker_id: &str,
    kind: codewhale_protocol::fleet::FleetArtifactKind,
    path: std::path::PathBuf,
) -> codewhale_protocol::fleet::FleetArtifactRef {
    codewhale_protocol::fleet::FleetArtifactRef {
        kind,
        path,
        // Content metadata is not computed here.
        size_bytes: None,
        mime_type: None,
        checksum: None,
    }
}
/// Convert a sub-agent [`AgentWorkerStatus`] into the matching
/// [`FleetWorkerEventPayload`].
///
/// - `message` becomes the summary on `Completed`, the reason on `Failed`
///   (`"unknown error"` when absent), and the signal on `Interrupted`.
/// - `tool_name` names the tool on `RunningTool` (`"unknown"` when absent).
/// - `Completed` always reports exit code 0; `Failed` is always reported as
///   not recoverable.
pub fn agent_status_to_fleet_event(
    status: AgentWorkerStatus,
    message: Option<&str>,
    tool_name: Option<&str>,
) -> FleetWorkerEventPayload {
    match status {
        AgentWorkerStatus::Queued => FleetWorkerEventPayload::Queued,
        AgentWorkerStatus::Starting => FleetWorkerEventPayload::Starting,
        AgentWorkerStatus::Running => FleetWorkerEventPayload::Running,
        // NOTE(review): waiting-for-user is reported as a model wait —
        // confirm that is the intended fleet-side state.
        AgentWorkerStatus::WaitingForUser | AgentWorkerStatus::ModelWait => {
            FleetWorkerEventPayload::ModelWait { model: None }
        }
        AgentWorkerStatus::RunningTool => FleetWorkerEventPayload::RunningTool {
            tool: tool_name.unwrap_or("unknown").to_owned(),
            call_id: None,
        },
        AgentWorkerStatus::Completed => FleetWorkerEventPayload::Completed {
            exit_code: Some(0),
            summary: message.map(String::from),
        },
        AgentWorkerStatus::Failed => FleetWorkerEventPayload::Failed {
            reason: message.unwrap_or("unknown error").to_owned(),
            recoverable: false,
        },
        AgentWorkerStatus::Cancelled => FleetWorkerEventPayload::Cancelled { cancelled_by: None },
        AgentWorkerStatus::Interrupted => FleetWorkerEventPayload::Interrupted {
            signal: message.map(String::from),
        },
    }
}
/// Apply fleet exec hardening to a worker spec and return the hardened spec
/// (#3027).
///
/// In order:
/// 1. `max_steps` is capped at `exec.max_turns`, unless `max_turns` is `0` or
///    `u32::MAX` (both leave `max_steps` untouched).
/// 2. `max_spawn_depth` is overwritten with `exec.max_spawn_depth`, clamped to
///    `MAX_SPAWN_DEPTH_CEILING`; the spec's previous value is not consulted.
/// 3. If either tool list in `exec` is non-empty, the tool profile is passed
///    through `filter_tool_profile`.
/// 4. A non-empty `append_system_prompt` is appended to the objective under a
///    `[Policy]` heading.
pub fn apply_exec_hardening(
    mut spec: AgentWorkerSpec,
    exec: &codewhale_config::FleetExecConfig,
) -> AgentWorkerSpec {
    let turn_cap_active = exec.max_turns > 0 && exec.max_turns != u32::MAX;
    if turn_cap_active {
        spec.max_steps = std::cmp::min(spec.max_steps, exec.max_turns);
    }

    spec.max_spawn_depth = std::cmp::min(
        exec.max_spawn_depth,
        codewhale_config::MAX_SPAWN_DEPTH_CEILING,
    );

    let has_tool_policy = !(exec.allowed_tools.is_empty() && exec.disallowed_tools.is_empty());
    if has_tool_policy {
        spec.tool_profile = filter_tool_profile(&spec.tool_profile, exec);
    }

    if !exec.append_system_prompt.is_empty() {
        spec.objective.push_str("\n\n[Policy]\n");
        spec.objective.push_str(&exec.append_system_prompt);
    }

    spec
}
/// Restrict a tool profile to what the exec config permits.
///
/// For an `Explicit` profile, a tool is kept only if it is on `allowed_tools`
/// (when that list is non-empty) and is not on `disallowed_tools`; the
/// original order is preserved. An `Inherited` profile is returned unchanged.
fn filter_tool_profile(
    profile: &AgentWorkerToolProfile,
    exec: &codewhale_config::FleetExecConfig,
) -> AgentWorkerToolProfile {
    match profile {
        // Nothing to filter at spec time: the concrete tool list is not known
        // here. The sub-agent spawn path is expected to apply the filtering.
        AgentWorkerToolProfile::Inherited => AgentWorkerToolProfile::Inherited,
        AgentWorkerToolProfile::Explicit(tools) => {
            let kept: Vec<String> = tools
                .iter()
                .filter(|tool| {
                    // An empty allowlist means "no allowlist", not "allow nothing".
                    let on_allowlist =
                        exec.allowed_tools.is_empty() || exec.allowed_tools.contains(tool);
                    // The denylist always wins over the allowlist.
                    on_allowlist && !exec.disallowed_tools.contains(tool)
                })
                .cloned()
                .collect();
            AgentWorkerToolProfile::Explicit(kept)
        }
    }
}
/// Report whether a tool is on the read-only list considered safe for
/// conservative parallel batching (#2983).
///
/// The match is exact and case-sensitive; any name not on the list, including
/// the empty string, returns `false`.
pub fn is_parallel_safe_read_only_tool(tool_name: &str) -> bool {
    const PARALLEL_SAFE_TOOLS: &[&str] = &[
        "read_file",
        "grep_files",
        "file_search",
        "list_dir",
        "git_status",
        "git_diff",
        "git_log",
        "git_show",
        "git_blame",
        "fetch_url",
        "web_search",
        "tool_search_tool_regex",
        "tool_search_tool_bm25",
    ];
    PARALLEL_SAFE_TOOLS.contains(&tool_name)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Task spec with only `id`, `name`, and `instructions` populated.
    fn bare_task(id: &str, name: &str, instructions: &str) -> FleetTaskSpec {
        FleetTaskSpec {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            objective: None,
            instructions: instructions.to_string(),
            worker: None,
            workspace: None,
            input_files: vec![],
            context: vec![],
            budget: None,
            tags: vec![],
            expected_artifacts: vec![],
            scorer: None,
            retry_policy: None,
            alert_policy: None,
            timeout_seconds: None,
            metadata: Default::default(),
        }
    }

    /// Worker profile with no role and the given tool list.
    fn profile_with_tools(tools: Vec<String>) -> FleetTaskWorkerProfile {
        FleetTaskWorkerProfile {
            role: None,
            tool_profile: None,
            tools,
            capabilities: vec![],
        }
    }

    /// Sub-agent spec with an inherited tool profile and a zero depth budget.
    fn bare_worker_spec(objective: &str, max_steps: u32) -> AgentWorkerSpec {
        AgentWorkerSpec {
            worker_id: "w1".to_string(),
            run_id: "r1".to_string(),
            parent_run_id: None,
            session_name: None,
            objective: objective.to_string(),
            role: None,
            agent_type: SubAgentType::General,
            model: "auto".to_string(),
            workspace: std::path::PathBuf::from("/tmp"),
            git_branch: None,
            context_mode: "fresh".to_string(),
            fork_context: false,
            tool_profile: AgentWorkerToolProfile::Inherited,
            max_steps,
            spawn_depth: 0,
            max_spawn_depth: 0,
        }
    }

    /// Explicit tool profile built from string literals.
    fn explicit_tools(names: &[&str]) -> AgentWorkerToolProfile {
        AgentWorkerToolProfile::Explicit(names.iter().map(|name| (*name).to_string()).collect())
    }

    /// Owned copy of a list of tool names, for exec config fields.
    fn names(list: &[&str]) -> Vec<String> {
        list.iter().map(|name| (*name).to_string()).collect()
    }

    #[test]
    fn fleet_role_smoke_runner_maps_to_tool_agent() {
        let mapped = fleet_role_to_agent_type(Some("smoke-runner"));
        assert_eq!(mapped, SubAgentType::ToolAgent);
    }

    #[test]
    fn fleet_role_reviewer_maps_to_review() {
        let mapped = fleet_role_to_agent_type(Some("reviewer"));
        assert_eq!(mapped, SubAgentType::Review);
    }

    #[test]
    fn fleet_role_builder_maps_to_implementer() {
        let mapped = fleet_role_to_agent_type(Some("builder"));
        assert_eq!(mapped, SubAgentType::Implementer);
    }

    #[test]
    fn fleet_role_none_maps_to_general() {
        assert_eq!(fleet_role_to_agent_type(None), SubAgentType::General);
    }

    #[test]
    fn unknown_role_maps_to_general() {
        let mapped = fleet_role_to_agent_type(Some("nonexistent-role"));
        assert_eq!(mapped, SubAgentType::General);
    }

    #[test]
    fn fleet_tool_profile_empty_uses_inherited() {
        let profile = profile_with_tools(vec![]);
        assert_eq!(
            fleet_tool_profile(Some(&profile)),
            AgentWorkerToolProfile::Inherited
        );
    }

    #[test]
    fn fleet_tool_profile_explicit_passes_tools() {
        let profile = profile_with_tools(names(&["cargo", "git"]));
        assert_eq!(
            fleet_tool_profile(Some(&profile)),
            explicit_tools(&["cargo", "git"])
        );
    }

    #[test]
    fn fleet_task_prompt_includes_instructions_context_and_input_files() {
        let task = FleetTaskSpec {
            objective: Some("Find protocol regressions".to_string()),
            input_files: vec![std::path::PathBuf::from("crates/protocol/src/fleet.rs")],
            context: vec!["Keep the report concise.".to_string()],
            ..bare_task(
                "review",
                "Review protocol",
                "Read the fleet protocol and report issues.",
            )
        };

        let prompt = fleet_task_prompt(&task);

        for expected in [
            "Review protocol",
            "Find protocol regressions",
            "Read the fleet protocol and report issues.",
            "Keep the report concise.",
            "crates/protocol/src/fleet.rs",
        ] {
            assert!(prompt.contains(expected));
        }
    }

    #[test]
    fn fleet_worker_spec_defaults_to_shared_subagent_spawn_depth() {
        let task = bare_task("task-1", "Task", "Do the task.");
        let worker = FleetWorkerSpec {
            id: "worker-1".to_string(),
            name: "Worker".to_string(),
            host: FleetHostSpec::Local,
            trust_level: None,
            labels: Default::default(),
            capabilities: vec![],
            max_concurrent_tasks: None,
        };

        let spec = fleet_task_to_worker_spec(
            "worker-1",
            "run-1",
            &task,
            &worker,
            "auto",
            std::path::Path::new("/tmp"),
        );

        // The root fleet worker sits at depth 0 and its budget is the shared
        // sub-agent default (3), so fleet and sub-agents share one substrate
        // and at least 3 nested delegation levels are afforded.
        assert_eq!(spec.spawn_depth, 0);
        assert_eq!(spec.max_spawn_depth, codewhale_config::DEFAULT_SPAWN_DEPTH);
        assert_eq!(spec.max_spawn_depth, 3);

        // End-to-end reachability: replay the gate the SubAgentRuntime
        // enforces (`would_exceed_depth` = `spawn_depth + 1 > max_spawn_depth`).
        // A depth-0 root must reach 3 nested levels and then stop. This fails
        // if the shared default is ever lowered below 3 (Hunter: afford >= 3).
        let hardened = apply_exec_hardening(spec, &codewhale_config::FleetExecConfig::default());
        let would_exceed = |spawn_depth: u32| spawn_depth + 1 > hardened.max_spawn_depth;

        assert!(
            !would_exceed(0),
            "root (depth 0) must spawn a child at depth 1"
        );
        assert!(!would_exceed(1), "depth-1 child must spawn to depth 2");
        assert!(!would_exceed(2), "depth-2 child must spawn to depth 3");
        assert!(
            would_exceed(3),
            "depth 3 is the afforded ceiling; depth 4 is blocked"
        );
    }

    #[test]
    fn exec_hardening_caps_max_steps_to_max_turns() {
        let exec = codewhale_config::FleetExecConfig {
            max_turns: 50,
            ..Default::default()
        };

        let hardened = apply_exec_hardening(bare_worker_spec("test", 1000), &exec);

        assert_eq!(hardened.max_steps, 50);
    }

    #[test]
    fn exec_hardening_applies_and_clamps_spawn_depth() {
        // (configured depth, depth expected after hardening)
        let cases = [(2, 2), (99, 3), (0, 0)];

        for (configured, expected) in cases {
            let exec = codewhale_config::FleetExecConfig {
                max_spawn_depth: configured,
                ..Default::default()
            };
            let hardened = apply_exec_hardening(bare_worker_spec("test", 1000), &exec);
            assert_eq!(
                hardened.max_spawn_depth, expected,
                "configured max_spawn_depth {}",
                configured
            );
        }
    }

    #[test]
    fn exec_hardening_filters_disallowed_tools() {
        let profile = explicit_tools(&["read_file", "exec_shell", "git_diff"]);
        let exec = codewhale_config::FleetExecConfig {
            disallowed_tools: names(&["exec_shell"]),
            ..Default::default()
        };

        let filtered = filter_tool_profile(&profile, &exec);

        assert_eq!(filtered, explicit_tools(&["read_file", "git_diff"]));
    }

    #[test]
    fn exec_hardening_allowed_tools_acts_as_allowlist() {
        let profile = explicit_tools(&["read_file", "exec_shell", "git_diff"]);
        let exec = codewhale_config::FleetExecConfig {
            allowed_tools: names(&["read_file", "git_diff"]),
            ..Default::default()
        };

        let filtered = filter_tool_profile(&profile, &exec);

        assert_eq!(filtered, explicit_tools(&["read_file", "git_diff"]));
    }

    #[test]
    fn exec_hardening_allowed_plus_disallowed_disallowed_wins() {
        let profile = explicit_tools(&["read_file", "exec_shell"]);
        let exec = codewhale_config::FleetExecConfig {
            allowed_tools: names(&["read_file", "exec_shell"]),
            disallowed_tools: names(&["exec_shell"]),
            ..Default::default()
        };

        let filtered = filter_tool_profile(&profile, &exec);

        assert_eq!(filtered, explicit_tools(&["read_file"]));
    }

    #[test]
    fn parallel_safe_read_only_tools_includes_grep_and_read() {
        for tool in ["read_file", "grep_files", "git_status", "web_search"] {
            assert!(is_parallel_safe_read_only_tool(tool));
        }
    }

    #[test]
    fn destructive_tools_not_parallel_safe() {
        for tool in [
            "exec_shell",
            "write_file",
            "edit_file",
            "apply_patch",
            "agent_open",
        ] {
            assert!(!is_parallel_safe_read_only_tool(tool));
        }
    }

    #[test]
    fn exec_hardening_appends_system_prompt() {
        let exec = codewhale_config::FleetExecConfig {
            append_system_prompt: "never push to main".to_string(),
            ..Default::default()
        };

        let hardened = apply_exec_hardening(bare_worker_spec("do the thing", 100), &exec);

        for expected in ["do the thing", "[Policy]", "never push to main"] {
            assert!(hardened.objective.contains(expected));
        }
    }
}