diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs index 728fbf40..e268ad65 100644 --- a/crates/protocol/src/fleet.rs +++ b/crates/protocol/src/fleet.rs @@ -72,7 +72,24 @@ pub struct FleetTaskSpec { pub name: String, #[serde(skip_serializing_if = "Option::is_none")] pub description: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub objective: Option, pub instructions: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub worker: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub workspace: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub input_files: Vec, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub context: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub budget: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub tags: Vec, #[serde(default)] pub expected_artifacts: Vec, #[serde(skip_serializing_if = "Option::is_none")] @@ -87,6 +104,58 @@ pub struct FleetTaskSpec { pub metadata: BTreeMap, } +/// Worker role and tool expectations for a task. +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct FleetTaskWorkerProfile { + #[serde(skip_serializing_if = "Option::is_none")] + pub role: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_profile: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub tools: Vec, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub capabilities: Vec, +} + +/// Workspace and environment constraints needed before a task starts. 
+#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct FleetWorkspaceRequirements { + #[serde(skip_serializing_if = "Option::is_none")] + pub root: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub required_files: Vec, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub writable_paths: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub environment: Option, +} + +/// Environment variables a task requires or may pass through to workers. +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct FleetEnvironmentRequirements { + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub required: Vec, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub allowlist: Vec, +} + +/// Budget limits for a task. +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct FleetTaskBudget { + #[serde(skip_serializing_if = "Option::is_none")] + pub max_tokens: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_tool_calls: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_seconds: Option, +} + /// Reference to an artifact produced or consumed by a task. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FleetArtifactRef { @@ -162,9 +231,25 @@ impl<'de> Deserialize<'de> for FleetArtifactKind { #[serde(tag = "kind", rename_all = "snake_case")] pub enum FleetScorerSpec { ExitCode, - FileExists { path: PathBuf }, - RegexMatch { path: PathBuf, pattern: String }, - JsonPath { path: PathBuf, expression: String }, + FileExists { + path: PathBuf, + }, + RegexMatch { + path: PathBuf, + pattern: String, + }, + JsonPath { + path: PathBuf, + expression: String, + }, + Command { + command: String, + #[serde(default)] + args: Vec, + }, + CodeWhaleVerifierPrompt { + prompt: String, + }, Manual, } @@ -390,6 +475,8 @@ pub struct FleetReceipt { pub worker_id: String, pub completed_at: String, pub result: FleetTaskResult, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure_kind: Option, #[serde(default)] pub artifacts: Vec, #[serde(default)] @@ -400,11 +487,21 @@ pub struct FleetReceipt { #[serde(rename_all = "snake_case")] pub enum FleetTaskResult { Pass, + Partial, Fail, Skip, Timeout, } +/// Source category for a failed task receipt. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FleetTaskFailureKind { + Transport, + Task, + Verifier, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FleetScore { pub value: f64, @@ -428,7 +525,31 @@ mod tests { id: "task-1".to_string(), name: "lint".to_string(), description: None, + objective: Some("Keep the workspace lint-clean".to_string()), instructions: "run cargo clippy".to_string(), + worker: Some(FleetTaskWorkerProfile { + role: Some("release-checker".to_string()), + tool_profile: Some("read-only".to_string()), + tools: vec!["cargo".to_string()], + capabilities: vec!["rust".to_string()], + }), + workspace: Some(FleetWorkspaceRequirements { + root: Some(PathBuf::from(".")), + required_files: vec![PathBuf::from("Cargo.toml")], + writable_paths: vec![], + environment: Some(FleetEnvironmentRequirements { + required: vec!["PATH".to_string()], + allowlist: vec!["RUST_LOG".to_string()], + }), + }), + input_files: vec![PathBuf::from("crates/tui/src/main.rs")], + context: vec!["release gate".to_string()], + budget: Some(FleetTaskBudget { + max_tokens: Some(8000), + max_tool_calls: Some(20), + max_seconds: Some(300), + }), + tags: vec!["release".to_string()], expected_artifacts: vec![FleetArtifactKind::Log], scorer: Some(FleetScorerSpec::ExitCode), retry_policy: Some(FleetRetryPolicy::default()), @@ -447,6 +568,18 @@ mod tests { assert_eq!(back.id, run.id); assert_eq!(back.status, FleetRunStatus::Running); assert_eq!(back.task_specs.len(), 1); + assert_eq!( + back.task_specs[0].worker.as_ref().unwrap().role.as_deref(), + Some("release-checker") + ); + assert_eq!( + back.task_specs[0] + .workspace + .as_ref() + .unwrap() + .required_files, + vec![PathBuf::from("Cargo.toml")] + ); } #[test] @@ -613,6 +746,7 @@ mod tests { worker_id: "worker-b".to_string(), completed_at: "2026-06-12T17:03:00Z".to_string(), result: FleetTaskResult::Pass, + failure_kind: None, artifacts: vec![], 
score: Some(FleetScore { value: 0.95, @@ -625,4 +759,29 @@ mod tests { assert_eq!(back.result, FleetTaskResult::Pass); assert_eq!(back.score.as_ref().unwrap().value, 0.95); } + + #[test] + fn partial_receipt_records_failure_source_when_needed() { + let receipt = FleetReceipt { + run_id: FleetRunId::from("run-004"), + task_id: "task-2".to_string(), + worker_id: "worker-c".to_string(), + completed_at: "2026-06-12T17:04:00Z".to_string(), + result: FleetTaskResult::Partial, + failure_kind: Some(FleetTaskFailureKind::Verifier), + artifacts: vec![], + score: Some(FleetScore { + value: 0.5, + max: Some(1.0), + notes: Some("manual verification required".to_string()), + }), + }; + + let json = serde_json::to_string(&receipt).unwrap(); + assert!(json.contains("\"result\":\"partial\"")); + assert!(json.contains("\"failure_kind\":\"verifier\"")); + let back: FleetReceipt = serde_json::from_str(&json).unwrap(); + assert_eq!(back.result, FleetTaskResult::Partial); + assert_eq!(back.failure_kind, Some(FleetTaskFailureKind::Verifier)); + } } diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index 408b64ba..051d1e32 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -892,6 +892,7 @@ mod tests { worker_id: "worker-1".to_string(), completed_at: "2026-06-12T17:03:00Z".to_string(), result: FleetTaskResult::Pass, + failure_kind: None, artifacts: vec![], score: None, }) @@ -923,6 +924,7 @@ mod tests { worker_id: "worker-1".to_string(), completed_at: "2026-06-12T17:03:00Z".to_string(), result: FleetTaskResult::Pass, + failure_kind: None, artifacts: vec![], score: None, }; diff --git a/crates/tui/src/fleet/manager.rs b/crates/tui/src/fleet/manager.rs index f1d61bc7..31cab863 100644 --- a/crates/tui/src/fleet/manager.rs +++ b/crates/tui/src/fleet/manager.rs @@ -13,11 +13,14 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use chrono::{DateTime, SecondsFormat, Utc}; use codewhale_protocol::fleet::*; -use 
serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState}; +use super::task_spec::{ + FleetTaskSpecDocument, FleetTaskVerificationInput, load_task_spec_document, + record_verification_receipt, validate_task_spec_document, verify_task_result, +}; const DEFAULT_STALE_AFTER_SECONDS: u64 = 300; @@ -49,7 +52,11 @@ pub struct FleetStatusSnapshot { pub queued: usize, pub running: usize, pub completed: usize, + pub partial: usize, pub failed: usize, + pub transport_failed: usize, + pub task_failed: usize, + pub verifier_failed: usize, pub cancelled: usize, pub stale: usize, pub workers: BTreeMap, @@ -67,51 +74,6 @@ pub struct FleetWorkerInspection { pub last_error: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct FleetTaskSpecDocument { - #[serde(default)] - pub name: Option, - #[serde(default)] - pub labels: BTreeMap, - #[serde(default, alias = "worker_specs")] - pub workers: Vec, - #[serde(default)] - pub tasks: Vec, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(untagged)] -enum FleetTaskSpecFile { - Document(FleetTaskSpecDocument), - Tasks(Vec), - Single(FleetTaskSpec), -} - -impl FleetTaskSpecFile { - fn into_document(self, fallback_name: String) -> FleetTaskSpecDocument { - match self { - Self::Document(mut doc) => { - if doc.name.as_deref().is_none_or(str::is_empty) { - doc.name = Some(fallback_name); - } - doc - } - Self::Tasks(tasks) => FleetTaskSpecDocument { - name: Some(fallback_name), - labels: BTreeMap::new(), - workers: Vec::new(), - tasks, - }, - Self::Single(task) => FleetTaskSpecDocument { - name: Some(fallback_name), - labels: BTreeMap::new(), - workers: Vec::new(), - tasks: vec![task], - }, - } - } -} - impl FleetManager { pub fn open(workspace: impl AsRef) -> Result { let workspace = workspace.as_ref().to_path_buf(); @@ -133,23 +95,7 @@ impl FleetManager { } pub fn load_task_spec(path: &Path) -> Result { - let raw = 
std::fs::read_to_string(path) - .with_context(|| format!("reading fleet task spec {}", path.display()))?; - let fallback_name = path - .file_stem() - .and_then(|s| s.to_str()) - .filter(|s| !s.is_empty()) - .unwrap_or("fleet-run") - .to_string(); - let parsed = match path.extension().and_then(|s| s.to_str()) { - Some("toml") => toml::from_str::(&raw) - .with_context(|| format!("parsing TOML fleet task spec {}", path.display()))?, - _ => serde_json::from_str::(&raw) - .with_context(|| format!("parsing JSON fleet task spec {}", path.display()))?, - }; - let doc = parsed.into_document(fallback_name); - validate_task_spec_document(&doc)?; - Ok(doc) + load_task_spec_document(path) } pub fn create_run_from_task_spec_path( @@ -471,13 +417,15 @@ impl FleetManager { return Ok(()); }; let now = timestamp(); - let (payload, receipt_result) = match result { + let (payload, receipt_result, failure_kind, exit_code) = match result { FleetLocalSimulationResult::Pass => ( FleetWorkerEventPayload::Completed { exit_code: Some(0), summary: Some("local fleet smoke task completed".to_string()), }, FleetTaskResult::Pass, + None, + Some(0), ), FleetLocalSimulationResult::Fail => ( FleetWorkerEventPayload::Failed { @@ -485,6 +433,8 @@ impl FleetManager { recoverable: false, }, FleetTaskResult::Fail, + Some(FleetTaskFailureKind::Task), + Some(1), ), FleetLocalSimulationResult::Skip => ( FleetWorkerEventPayload::Completed { @@ -492,6 +442,8 @@ impl FleetManager { summary: Some("local fleet smoke task skipped".to_string()), }, FleetTaskResult::Skip, + None, + Some(0), ), FleetLocalSimulationResult::Timeout => ( FleetWorkerEventPayload::Failed { @@ -499,15 +451,47 @@ impl FleetManager { recoverable: true, }, FleetTaskResult::Timeout, + Some(FleetTaskFailureKind::Transport), + None, ), }; self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?; + let verification_input = FleetTaskVerificationInput { + run_id: entry.run_id.clone(), + task_id: entry.task_id.clone(), + 
worker_id: worker_id.to_string(), + exit_code, + artifacts: vec![log_artifact.clone()], + }; + if task_spec.scorer.is_some() { + let verification = verify_task_result(&self.workspace, task_spec, &verification_input); + let receipt = record_verification_receipt( + &self.ledger, + &self.workspace, + &verification_input, + verification, + )?; + if matches!( + receipt.result, + FleetTaskResult::Fail | FleetTaskResult::Timeout + ) { + self.ledger.mark_task_terminal_status( + &entry.run_id, + &entry.task_id, + Some(worker_id), + ×tamp(), + FleetTaskLedgerStatus::Failed, + )?; + } + return Ok(()); + } self.ledger.record_receipt(FleetReceipt { run_id: entry.run_id.clone(), task_id: entry.task_id.clone(), worker_id: worker_id.to_string(), completed_at: now, result: receipt_result, + failure_kind, artifacts: vec![log_artifact], score: None, }) @@ -633,6 +617,20 @@ impl FleetManager { FleetTaskLedgerStatus::Cancelled => snapshot.cancelled += 1, } } + for receipt in state.receipts.values() { + if run_filter.is_some_and(|run_id| receipt.run_id != *run_id) { + continue; + } + if receipt.result == FleetTaskResult::Partial { + snapshot.partial += 1; + } + match &receipt.failure_kind { + Some(FleetTaskFailureKind::Transport) => snapshot.transport_failed += 1, + Some(FleetTaskFailureKind::Task) => snapshot.task_failed += 1, + Some(FleetTaskFailureKind::Verifier) => snapshot.verifier_failed += 1, + None => {} + } + } snapshot } @@ -660,28 +658,6 @@ enum FleetLocalSimulationResult { Timeout, } -fn validate_task_spec_document(doc: &FleetTaskSpecDocument) -> Result<()> { - if doc.tasks.is_empty() { - bail!("fleet task spec must include at least one task"); - } - let mut ids = BTreeSet::new(); - for task in &doc.tasks { - if task.id.trim().is_empty() { - bail!("fleet task id cannot be empty"); - } - if !ids.insert(task.id.clone()) { - bail!("duplicate fleet task id {}", task.id); - } - if task.name.trim().is_empty() { - bail!("fleet task {} name cannot be empty", task.id); - } - if 
task.instructions.trim().is_empty() { - bail!("fleet task {} instructions cannot be empty", task.id); - } - } - Ok(()) -} - fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec { (1..=max_workers) .map(|index| FleetWorkerSpec { @@ -876,7 +852,14 @@ mod tests { id: id.to_string(), name: id.to_string(), description: None, + objective: Some(format!("Complete {id}")), instructions: format!("do {id}"), + worker: None, + workspace: None, + input_files: Vec::new(), + context: Vec::new(), + budget: None, + tags: Vec::new(), expected_artifacts: vec![FleetArtifactKind::Log], scorer: None, retry_policy: None, @@ -979,4 +962,105 @@ mod tests { let state = manager.ledger.rebuild_state().unwrap(); assert_eq!(state.receipts.len(), 1); } + + #[test] + fn fleet_task_spec_sample_launches_independent_worker_tasks() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let path = task_spec_file( + &tmp, + vec![ + task("release-triage"), + task("risk-review"), + task("docs-check"), + ], + ); + + let report = manager.create_run_from_task_spec_path(&path, 2).unwrap(); + + assert_eq!(report.task_count, 3); + assert_eq!(report.leased, 2); + assert_eq!(report.queued, 1); + assert_ne!(report.worker_ids[0], report.worker_ids[1]); + let state = manager.ledger.rebuild_state().unwrap(); + assert!( + state + .tasks + .contains_key(&format!("{}:release-triage", report.run_id.0)) + ); + assert!( + state + .tasks + .contains_key(&format!("{}:risk-review", report.run_id.0)) + ); + assert!( + state + .tasks + .contains_key(&format!("{}:docs-check", report.run_id.0)) + ); + } + + #[test] + fn fleet_task_spec_local_scorer_records_receipt_artifact() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let mut completed = task("task-a"); + completed.scorer = Some(FleetScorerSpec::ExitCode); + completed + .metadata + .insert("local_result".to_string(), json!("pass")); + let path = 
task_spec_file(&tmp, vec![completed]); + + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.completed, 1); + assert_eq!(status.failed, 0); + assert_eq!(status.partial, 0); + let state = manager.ledger.rebuild_state().unwrap(); + let receipt = &state.receipts[&format!("{}:task-a", report.run_id.0)]; + assert_eq!(receipt.result, FleetTaskResult::Pass); + assert_eq!(receipt.failure_kind, None); + assert!(receipt.score.as_ref().unwrap().value > 0.99); + assert!( + receipt + .artifacts + .iter() + .any(|artifact| matches!(artifact.kind, FleetArtifactKind::Receipt)) + ); + } + + #[test] + fn fleet_task_spec_status_distinguishes_failure_sources() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let mut transport = task("transport-failure"); + transport.scorer = Some(FleetScorerSpec::ExitCode); + transport + .metadata + .insert("local_result".to_string(), json!("timeout")); + let mut task_failed = task("task-failure"); + task_failed.scorer = Some(FleetScorerSpec::ExitCode); + task_failed + .metadata + .insert("local_result".to_string(), json!("fail")); + let mut verifier_failed = task("verifier-failure"); + verifier_failed.scorer = Some(FleetScorerSpec::RegexMatch { + path: PathBuf::from("missing.log"), + pattern: "[".to_string(), + }); + verifier_failed + .metadata + .insert("local_result".to_string(), json!("pass")); + let path = task_spec_file(&tmp, vec![transport, task_failed, verifier_failed]); + + let report = manager.create_run_from_task_spec_path(&path, 3).unwrap(); + + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.failed, 3); + assert_eq!(status.transport_failed, 1); + assert_eq!(status.task_failed, 1); + assert_eq!(status.verifier_failed, 1); + assert_eq!(status.running, 0); + } } diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs index 1c7d2e47..5fbf0c2b 
100644 --- a/crates/tui/src/fleet/mod.rs +++ b/crates/tui/src/fleet/mod.rs @@ -4,3 +4,4 @@ pub mod host; pub mod ledger; pub mod manager; pub mod scheduler; +pub mod task_spec; diff --git a/crates/tui/src/fleet/scheduler.rs b/crates/tui/src/fleet/scheduler.rs index 20219165..874594b6 100644 --- a/crates/tui/src/fleet/scheduler.rs +++ b/crates/tui/src/fleet/scheduler.rs @@ -563,7 +563,14 @@ mod tests { id: id.to_string(), name: id.to_string(), description: None, + objective: Some(format!("Schedule {id}")), instructions: format!("do {id}"), + worker: None, + workspace: None, + input_files: Vec::new(), + context: Vec::new(), + budget: None, + tags: Vec::new(), expected_artifacts: vec![FleetArtifactKind::Log], scorer: None, retry_policy: Some(FleetRetryPolicy { diff --git a/crates/tui/src/fleet/task_spec.rs b/crates/tui/src/fleet/task_spec.rs new file mode 100644 index 00000000..1dfcdadc --- /dev/null +++ b/crates/tui/src/fleet/task_spec.rs @@ -0,0 +1,749 @@ +//! Typed task-spec loading, artifact refs, deterministic scorers, and receipts. 
+ +#![allow(dead_code)] + +use std::collections::{BTreeMap, BTreeSet}; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result, bail}; +use chrono::{SecondsFormat, Utc}; +use codewhale_protocol::fleet::*; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use sha2::{Digest, Sha256}; + +use super::ledger::FleetLedger; + +const MAX_SCORER_READ_BYTES: u64 = 1_000_000; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetTaskSpecDocument { + #[serde(default)] + pub name: Option, + #[serde(default)] + pub labels: BTreeMap, + #[serde(default, alias = "worker_specs")] + pub workers: Vec, + #[serde(default)] + pub tasks: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] +enum FleetTaskSpecFile { + Document(FleetTaskSpecDocument), + Tasks(Vec), + Single(FleetTaskSpec), +} + +impl FleetTaskSpecFile { + fn into_document(self, fallback_name: String) -> FleetTaskSpecDocument { + match self { + Self::Document(mut doc) => { + if doc.name.as_deref().is_none_or(str::is_empty) { + doc.name = Some(fallback_name); + } + doc + } + Self::Tasks(tasks) => FleetTaskSpecDocument { + name: Some(fallback_name), + labels: BTreeMap::new(), + workers: Vec::new(), + tasks, + }, + Self::Single(task) => FleetTaskSpecDocument { + name: Some(fallback_name), + labels: BTreeMap::new(), + workers: Vec::new(), + tasks: vec![task], + }, + } + } +} + +#[derive(Debug, Clone)] +pub struct FleetTaskVerificationInput { + pub run_id: FleetRunId, + pub task_id: String, + pub worker_id: String, + pub exit_code: Option, + pub artifacts: Vec, +} + +#[derive(Debug, Clone)] +pub struct FleetTaskVerification { + pub result: FleetTaskResult, + pub failure_kind: Option, + pub score: FleetScore, + pub evidence: Vec, +} + +pub fn load_task_spec_document(path: &Path) -> Result { + let raw = std::fs::read_to_string(path) + .with_context(|| format!("reading fleet task spec {}", path.display()))?; + let fallback_name = path + .file_stem() + 
.and_then(|s| s.to_str()) + .filter(|s| !s.is_empty()) + .unwrap_or("fleet-run") + .to_string(); + let parsed = match path.extension().and_then(|s| s.to_str()) { + Some("toml") => toml::from_str::(&raw) + .with_context(|| format!("parsing TOML fleet task spec {}", path.display()))?, + _ => serde_json::from_str::(&raw) + .with_context(|| format!("parsing JSON fleet task spec {}", path.display()))?, + }; + let doc = parsed.into_document(fallback_name); + validate_task_spec_document(&doc)?; + Ok(doc) +} + +pub fn validate_task_spec_document(doc: &FleetTaskSpecDocument) -> Result<()> { + if doc.tasks.is_empty() { + bail!("fleet task spec must include at least one task"); + } + let mut ids = BTreeSet::new(); + for task in &doc.tasks { + if task.id.trim().is_empty() { + bail!("fleet task id cannot be empty"); + } + if !ids.insert(task.id.clone()) { + bail!("duplicate fleet task id {}", task.id); + } + if task.name.trim().is_empty() { + bail!("fleet task {} name cannot be empty", task.id); + } + if task.instructions.trim().is_empty() { + bail!("fleet task {} instructions cannot be empty", task.id); + } + if let Some(objective) = &task.objective + && objective.trim().is_empty() + { + bail!("fleet task {} objective cannot be empty", task.id); + } + validate_tags(&task.id, &task.tags)?; + validate_workspace_requirements(task)?; + } + Ok(()) +} + +pub fn write_fleet_artifact_ref( + workspace: &Path, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + kind: FleetArtifactKind, + filename: &str, + contents: &[u8], + mime_type: Option<&str>, +) -> Result { + let rel_path = PathBuf::from(".codewhale") + .join("fleet") + .join(safe_path_segment(&run_id.0)) + .join(safe_path_segment(task_id)) + .join(safe_path_segment(worker_id)) + .join(safe_path_segment(filename)); + let abs_path = workspace.join(&rel_path); + if let Some(parent) = abs_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating fleet artifact dir {}", parent.display()))?; + } + 
std::fs::write(&abs_path, contents) + .with_context(|| format!("writing fleet artifact {}", abs_path.display()))?; + let digest = Sha256::digest(contents); + Ok(FleetArtifactRef { + kind, + path: rel_path, + checksum: Some(format!("sha256:{digest:x}")), + mime_type: mime_type.map(str::to_string), + size_bytes: Some(contents.len() as u64), + }) +} + +pub fn verify_task_result( + workspace: &Path, + task: &FleetTaskSpec, + input: &FleetTaskVerificationInput, +) -> FleetTaskVerification { + match &task.scorer { + Some(FleetScorerSpec::ExitCode) => verify_exit_code(input.exit_code), + Some(FleetScorerSpec::FileExists { path }) => verify_file_exists(workspace, path), + Some(FleetScorerSpec::RegexMatch { path, pattern }) => { + verify_regex_match(workspace, path, pattern) + } + Some(FleetScorerSpec::JsonPath { path, expression }) => { + verify_json_path(workspace, path, expression) + } + Some(FleetScorerSpec::Command { command, .. }) => partial( + format!("external scorer command configured: {command}"), + "run the configured scorer command to finalize this receipt", + ), + Some(FleetScorerSpec::CodeWhaleVerifierPrompt { .. 
}) => partial( + "CodeWhale verifier prompt configured", + "run a verifier prompt pass to finalize this receipt", + ), + Some(FleetScorerSpec::Manual) => partial( + "manual scorer configured", + "manual verification is required to finalize this receipt", + ), + None => partial( + "no scorer configured", + "task has artifacts but no deterministic scorer", + ), + } +} + +pub fn record_verification_receipt( + ledger: &FleetLedger, + workspace: &Path, + input: &FleetTaskVerificationInput, + verification: FleetTaskVerification, +) -> Result { + let evidence = json!({ + "run_id": input.run_id.0.clone(), + "task_id": input.task_id.clone(), + "worker_id": input.worker_id.clone(), + "result": verification.result.clone(), + "failure_kind": verification.failure_kind.clone(), + "score": verification.score.clone(), + "evidence": verification.evidence.clone(), + "artifacts": input.artifacts.clone(), + }); + let bytes = + serde_json::to_vec_pretty(&evidence).context("serializing fleet receipt evidence")?; + let receipt_artifact = write_fleet_artifact_ref( + workspace, + &input.run_id, + &input.task_id, + &input.worker_id, + FleetArtifactKind::Receipt, + "verification-receipt.json", + &bytes, + Some("application/json"), + )?; + let mut artifacts = input.artifacts.clone(); + artifacts.push(receipt_artifact); + let receipt = FleetReceipt { + run_id: input.run_id.clone(), + task_id: input.task_id.clone(), + worker_id: input.worker_id.clone(), + completed_at: timestamp(), + result: verification.result, + failure_kind: verification.failure_kind, + artifacts, + score: Some(verification.score), + }; + ledger.record_receipt(receipt.clone())?; + Ok(receipt) +} + +fn validate_tags(task_id: &str, tags: &[String]) -> Result<()> { + let mut seen = BTreeSet::new(); + for tag in tags { + if tag.trim().is_empty() { + bail!("fleet task {task_id} tag cannot be empty"); + } + if !seen.insert(tag) { + bail!("fleet task {task_id} has duplicate tag {tag}"); + } + } + Ok(()) +} + +fn 
validate_workspace_requirements(task: &FleetTaskSpec) -> Result<()> { + let Some(workspace) = &task.workspace else { + return Ok(()); + }; + let env = workspace.environment.as_ref(); + for name in env + .into_iter() + .flat_map(|env| env.required.iter().chain(env.allowlist.iter())) + { + if name.trim().is_empty() { + bail!( + "fleet task {} environment variable name cannot be empty", + task.id + ); + } + } + Ok(()) +} + +fn verify_exit_code(exit_code: Option) -> FleetTaskVerification { + match exit_code { + Some(0) => pass("exit_code=0"), + Some(code) => fail( + FleetTaskFailureKind::Task, + 0.0, + format!("exit_code={code}"), + "worker task exited unsuccessfully", + ), + None => fail( + FleetTaskFailureKind::Transport, + 0.0, + "missing exit code", + "worker transport did not report a process result", + ), + } +} + +fn verify_file_exists(workspace: &Path, path: &Path) -> FleetTaskVerification { + let abs_path = resolve_workspace_path(workspace, path); + if abs_path.is_file() { + pass(format!("file exists: {}", path.display())) + } else { + fail( + FleetTaskFailureKind::Task, + 0.0, + format!("missing file: {}", path.display()), + "expected artifact file was not produced", + ) + } +} + +fn verify_regex_match(workspace: &Path, path: &Path, pattern: &str) -> FleetTaskVerification { + let regex = match Regex::new(pattern) { + Ok(regex) => regex, + Err(err) => { + return fail( + FleetTaskFailureKind::Verifier, + 0.0, + format!("invalid regex: {err}"), + "regex scorer could not be compiled", + ); + } + }; + let contents = match read_bounded_to_string(workspace, path) { + Ok(contents) => contents, + Err(err) => { + return fail( + err.failure_kind, + 0.0, + err.evidence, + "regex scorer could not read bounded evidence", + ); + } + }; + if regex.is_match(&contents) { + pass(format!("regex matched {}: {pattern}", path.display())) + } else { + fail( + FleetTaskFailureKind::Task, + 0.0, + format!("regex did not match {}: {pattern}", path.display()), + "worker output did not 
satisfy the regex scorer", + ) + } +} + +fn verify_json_path(workspace: &Path, path: &Path, expression: &str) -> FleetTaskVerification { + let Some(segments) = json_path_segments(expression) else { + return fail( + FleetTaskFailureKind::Verifier, + 0.0, + format!("unsupported JSON path expression: {expression}"), + "json_path scorer supports $.field or .field paths", + ); + }; + let contents = match read_bounded_to_string(workspace, path) { + Ok(contents) => contents, + Err(err) => { + return fail( + err.failure_kind, + 0.0, + err.evidence, + "json_path scorer could not read bounded evidence", + ); + } + }; + let value: Value = match serde_json::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return fail( + FleetTaskFailureKind::Task, + 0.0, + format!("invalid JSON in {}: {err}", path.display()), + "worker artifact was not valid JSON", + ); + } + }; + match json_path_lookup(&value, &segments) { + Some(found) if json_truthy(found) => pass(format!( + "json_path matched {}: {expression}", + path.display() + )), + _ => fail( + FleetTaskFailureKind::Task, + 0.0, + format!( + "json_path missing or false in {}: {expression}", + path.display() + ), + "worker JSON artifact did not satisfy the scorer", + ), + } +} + +fn pass(evidence: impl Into) -> FleetTaskVerification { + let evidence = evidence.into(); + FleetTaskVerification { + result: FleetTaskResult::Pass, + failure_kind: None, + score: FleetScore { + value: 1.0, + max: Some(1.0), + notes: Some(evidence.clone()), + }, + evidence: vec![evidence], + } +} + +fn partial(evidence: impl Into, notes: impl Into) -> FleetTaskVerification { + let evidence = evidence.into(); + let notes = notes.into(); + FleetTaskVerification { + result: FleetTaskResult::Partial, + failure_kind: None, + score: FleetScore { + value: 0.5, + max: Some(1.0), + notes: Some(notes), + }, + evidence: vec![evidence], + } +} + +fn fail( + failure_kind: FleetTaskFailureKind, + value: f64, + evidence: impl Into, + notes: impl Into, +) -> 
FleetTaskVerification { + let evidence = evidence.into(); + FleetTaskVerification { + result: FleetTaskResult::Fail, + failure_kind: Some(failure_kind), + score: FleetScore { + value, + max: Some(1.0), + notes: Some(notes.into()), + }, + evidence: vec![evidence], + } +} + +#[derive(Debug)] +struct EvidenceReadError { + failure_kind: FleetTaskFailureKind, + evidence: String, +} + +fn read_bounded_to_string( + workspace: &Path, + path: &Path, +) -> std::result::Result { + let abs_path = resolve_workspace_path(workspace, path); + let metadata = std::fs::metadata(&abs_path).map_err(|err| EvidenceReadError { + failure_kind: if err.kind() == std::io::ErrorKind::NotFound { + FleetTaskFailureKind::Task + } else { + FleetTaskFailureKind::Verifier + }, + evidence: format!("cannot read {}: {err}", path.display()), + })?; + if metadata.len() > MAX_SCORER_READ_BYTES { + return Err(EvidenceReadError { + failure_kind: FleetTaskFailureKind::Verifier, + evidence: format!( + "refusing to read oversized evidence {}: {} bytes", + path.display(), + metadata.len() + ), + }); + } + std::fs::read_to_string(&abs_path).map_err(|err| EvidenceReadError { + failure_kind: FleetTaskFailureKind::Verifier, + evidence: format!("cannot decode {} as UTF-8: {err}", path.display()), + }) +} + +fn resolve_workspace_path(workspace: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + workspace.join(path) + } +} + +fn json_path_segments(expression: &str) -> Option> { + let trimmed = expression.trim(); + let path = trimmed + .strip_prefix("$.") + .or_else(|| trimmed.strip_prefix('.'))?; + if path.is_empty() { + return None; + } + let segments: Vec<_> = path.split('.').collect(); + if segments.iter().any(|segment| segment.is_empty()) { + return None; + } + Some(segments) +} + +fn json_path_lookup<'a>(value: &'a Value, segments: &[&str]) -> Option<&'a Value> { + let mut current = value; + for segment in segments { + current = current.as_object()?.get(*segment)?; + } + 
Some(current) +} + +fn json_truthy(value: &Value) -> bool { + !matches!(value, Value::Null | Value::Bool(false)) +} + +fn timestamp() -> String { + Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true) +} + +fn safe_path_segment(value: &str) -> String { + value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') { + ch + } else { + '_' + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn task(id: &str, scorer: Option) -> FleetTaskSpec { + FleetTaskSpec { + id: id.to_string(), + name: id.to_string(), + description: None, + objective: Some(format!("Verify {id}")), + instructions: format!("do {id}"), + worker: Some(FleetTaskWorkerProfile { + role: Some("reviewer".to_string()), + tool_profile: Some("read-only".to_string()), + tools: vec!["git".to_string()], + capabilities: vec!["rust".to_string()], + }), + workspace: Some(FleetWorkspaceRequirements { + root: Some(PathBuf::from(".")), + required_files: vec![PathBuf::from("Cargo.toml")], + writable_paths: vec![PathBuf::from(".codewhale/fleet")], + environment: Some(FleetEnvironmentRequirements { + required: vec!["PATH".to_string()], + allowlist: vec!["RUST_LOG".to_string()], + }), + }), + input_files: vec![PathBuf::from("Cargo.toml")], + context: vec!["fleet verifier test".to_string()], + budget: Some(FleetTaskBudget { + max_tokens: Some(4000), + max_tool_calls: Some(12), + max_seconds: Some(120), + }), + expected_artifacts: vec![FleetArtifactKind::Log, FleetArtifactKind::Receipt], + scorer, + retry_policy: Some(FleetRetryPolicy::default()), + alert_policy: None, + timeout_seconds: Some(120), + tags: vec!["review".to_string()], + metadata: BTreeMap::new(), + } + } + + #[test] + fn fleet_task_spec_document_parses_multi_task_verified_shape() { + let tmp = TempDir::new().unwrap(); + let path = tmp.path().join("fleet-tasks.json"); + let doc = json!({ + "name": "release triage", + "labels": {"milestone": "v0.8.60"}, 
+ "tasks": [ + task("release-notes", Some(FleetScorerSpec::ExitCode)), + task("risk-review", Some(FleetScorerSpec::Manual)) + ] + }); + std::fs::write(&path, serde_json::to_string_pretty(&doc).unwrap()).unwrap(); + + let parsed = load_task_spec_document(&path).unwrap(); + + assert_eq!(parsed.name.as_deref(), Some("release triage")); + assert_eq!(parsed.tasks.len(), 2); + assert_eq!( + parsed.tasks[0].objective.as_deref(), + Some("Verify release-notes") + ); + assert_eq!( + parsed.tasks[0].worker.as_ref().unwrap().role.as_deref(), + Some("reviewer") + ); + assert_eq!(parsed.tasks[1].tags, vec!["review"]); + } + + #[test] + fn fleet_task_spec_artifact_refs_are_bounded_paths() { + let tmp = TempDir::new().unwrap(); + let artifact = write_fleet_artifact_ref( + tmp.path(), + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + FleetArtifactKind::Log, + "worker.log", + b"this is artifact content", + Some("text/plain"), + ) + .unwrap(); + + let json = serde_json::to_string(&artifact).unwrap(); + assert!(!json.contains("this is artifact content")); + assert!(json.contains("worker.log")); + assert_eq!(artifact.size_bytes, Some(24)); + assert!(artifact.checksum.as_deref().unwrap().starts_with("sha256:")); + assert!(tmp.path().join(&artifact.path).exists()); + } + + #[test] + fn fleet_task_spec_scorers_record_pass_fail_partial_evidence() { + let tmp = TempDir::new().unwrap(); + std::fs::write(tmp.path().join("result.txt"), "status=ok\n").unwrap(); + std::fs::write(tmp.path().join("result.json"), r#"{"status":"ok"}"#).unwrap(); + let input = FleetTaskVerificationInput { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + exit_code: Some(0), + artifacts: vec![], + }; + + let pass = verify_task_result( + tmp.path(), + &task("exit", Some(FleetScorerSpec::ExitCode)), + &input, + ); + assert_eq!(pass.result, FleetTaskResult::Pass); + assert_eq!(pass.failure_kind, None); + + let regex = verify_task_result( + tmp.path(), + 
&task( + "regex", + Some(FleetScorerSpec::RegexMatch { + path: PathBuf::from("result.txt"), + pattern: "status=ok".to_string(), + }), + ), + &input, + ); + assert_eq!(regex.result, FleetTaskResult::Pass); + + let json_path = verify_task_result( + tmp.path(), + &task( + "json", + Some(FleetScorerSpec::JsonPath { + path: PathBuf::from("result.json"), + expression: "$.status".to_string(), + }), + ), + &input, + ); + assert_eq!(json_path.result, FleetTaskResult::Pass); + + let manual = verify_task_result( + tmp.path(), + &task("manual", Some(FleetScorerSpec::Manual)), + &input, + ); + assert_eq!(manual.result, FleetTaskResult::Partial); + + let failed = verify_task_result( + tmp.path(), + &task( + "missing", + Some(FleetScorerSpec::FileExists { + path: PathBuf::from("missing.txt"), + }), + ), + &input, + ); + assert_eq!(failed.result, FleetTaskResult::Fail); + assert_eq!(failed.failure_kind, Some(FleetTaskFailureKind::Task)); + + let verifier_failed = verify_task_result( + tmp.path(), + &task( + "bad-regex", + Some(FleetScorerSpec::RegexMatch { + path: PathBuf::from("result.txt"), + pattern: "[".to_string(), + }), + ), + &input, + ); + assert_eq!(verifier_failed.result, FleetTaskResult::Fail); + assert_eq!( + verifier_failed.failure_kind, + Some(FleetTaskFailureKind::Verifier) + ); + } + + #[test] + fn fleet_task_spec_receipt_records_artifacts_scores_and_failure_kind() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let log = write_fleet_artifact_ref( + tmp.path(), + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + FleetArtifactKind::Log, + "worker.log", + b"exit_code=1", + Some("text/plain"), + ) + .unwrap(); + let input = FleetTaskVerificationInput { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + exit_code: Some(1), + artifacts: vec![log], + }; + let verification = verify_task_result( + tmp.path(), + &task("task-a", Some(FleetScorerSpec::ExitCode)), + 
&input, + ); + + let receipt = + record_verification_receipt(&ledger, tmp.path(), &input, verification).unwrap(); + + assert_eq!(receipt.result, FleetTaskResult::Fail); + assert_eq!(receipt.failure_kind, Some(FleetTaskFailureKind::Task)); + assert_eq!(receipt.artifacts.len(), 2); + assert!(matches!( + receipt.artifacts.last().unwrap().kind, + FleetArtifactKind::Receipt + )); + let state = ledger.rebuild_state().unwrap(); + assert_eq!( + state.receipts["run-1:task-a"].failure_kind, + Some(FleetTaskFailureKind::Task) + ); + } +} diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index f82e6125..7c24368f 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -1474,12 +1474,16 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { fn print_status(status: &FleetStatusSnapshot) { println!( - "fleet: runs={} queued={} running={} completed={} failed={} cancelled={} stale={}", + "fleet: runs={} queued={} running={} completed={} partial={} failed={} transport_failed={} task_failed={} verifier_failed={} cancelled={} stale={}", status.runs, status.queued, status.running, status.completed, + status.partial, status.failed, + status.transport_failed, + status.task_failed, + status.verifier_failed, status.cancelled, status.stale ); diff --git a/docs/FLEET.md b/docs/FLEET.md index e7ea5b37..1c227155 100644 --- a/docs/FLEET.md +++ b/docs/FLEET.md @@ -38,6 +38,143 @@ logs and adapter logs are stored under `.codewhale/fleet/` and Workers are optional. If omitted, CodeWhale creates local worker slots up to `--max-workers`. +Task specs are typed in Rust and keep verification data separate from worker +transcripts. 
A task can declare: + +- `id`, `name`, `description`, `objective`, and `instructions` +- `worker` role, tool profile, tools, and required capabilities +- `workspace` root, required files, writable paths, and required or +  allowlisted environment variables +- `input_files`, extra `context`, `budget`, `timeout_seconds`, and `retry_policy` +- `expected_artifacts`, `scorer`, `tags`, and free-form `metadata` + +Workers write bounded artifact files under `.codewhale/fleet/`; the ledger +records only the artifact refs: kind, path, checksum, MIME type, and size. Receipts record +`pass`, `fail`, `partial`, `skip`, or `timeout`; failed receipts may also mark +the source as `transport`, `task`, or `verifier`. `codewhale fleet status` +surfaces those failure-source counts separately. + +Deterministic built-in scorers are `exit_code`, `file_exists`, `regex_match`, +and `json_path`. Specs may also declare `command`, +`code_whale_verifier_prompt`, or `manual`; those record a partial receipt until +an explicit verifier pass completes. + +### Release Triage Example + +```json +{ + "name": "v0.8.60 release triage", + "labels": { + "milestone": "v0.8.60" + }, + "tasks": [ + { + "id": "release-issue-sweep", + "name": "Release issue sweep", + "objective": "Find open v0.8.60 blockers and credit-sensitive PRs.", + "instructions": "Review the v0.8.60 milestone, linked PRs, changelog entries, and contributor-credit requirements. 
Write a concise blocker report.", + "worker": { + "role": "release-triage", + "tool_profile": "read-only", + "tools": ["gh", "git"], + "capabilities": ["github", "release"] + }, + "workspace": { + "required_files": ["Cargo.toml", "CHANGELOG.md", ".github/AUTHOR_MAP"], + "writable_paths": [".codewhale/fleet"], + "environment": { + "required": ["PATH"] + } + }, + "input_files": ["CHANGELOG.md", ".github/AUTHOR_MAP"], + "context": ["Treat community PRs as maintainer evidence."], + "budget": { + "max_tokens": 12000, + "max_tool_calls": 24, + "max_seconds": 900 + }, + "timeout_seconds": 900, + "expected_artifacts": ["log", "report", "receipt"], + "scorer": { + "kind": "exit_code" + }, + "retry_policy": { + "max_attempts": 2, + "initial_backoff_seconds": 10, + "max_backoff_seconds": 60, + "backoff_multiplier": 2 + }, + "tags": ["release", "triage"], + "metadata": { + "class": "release" + } + } + ] +} +``` + +### Code Review Swarm Example + +```json +{ + "name": "code review swarm", + "tasks": [ + { + "id": "protocol-review", + "name": "Protocol review", + "objective": "Review fleet protocol changes for compatibility and sparse JSON behavior.", + "instructions": "Inspect crates/protocol/src/fleet.rs and report behavior regressions, missing serde defaults, or unsafe wire changes.", + "worker": { + "role": "reviewer", + "tool_profile": "read-only", + "tools": ["git", "rg", "cargo"], + "capabilities": ["rust"] + }, + "input_files": ["crates/protocol/src/fleet.rs"], + "budget": { + "max_tokens": 8000, + "max_tool_calls": 16, + "max_seconds": 600 + }, + "expected_artifacts": ["log", "report", "receipt"], + "scorer": { + "kind": "code_whale_verifier_prompt", + "prompt": "Verify the review includes at least one concrete file:line finding or explicitly says no issues were found." 
+ }, + "tags": ["review", "protocol"], + "metadata": { + "class": "code-review" + } + }, + { + "id": "tui-review", + "name": "TUI review", + "objective": "Review fleet CLI and manager behavior for operator-visible regressions.", + "instructions": "Inspect crates/tui/src/fleet and crates/tui/src/main.rs. Focus on status output, receipt recording, and failure classification.", + "worker": { + "role": "reviewer", + "tool_profile": "read-only", + "tools": ["git", "rg", "cargo"], + "capabilities": ["rust", "cli"] + }, + "input_files": ["crates/tui/src/fleet", "crates/tui/src/main.rs"], + "budget": { + "max_tokens": 10000, + "max_tool_calls": 20, + "max_seconds": 600 + }, + "expected_artifacts": ["log", "report", "receipt"], + "scorer": { + "kind": "manual" + }, + "tags": ["review", "tui"], + "metadata": { + "class": "code-review" + } + } + ] +} +``` + ## Host Adapters The host adapter boundary supports local child processes and explicit SSH