diff --git a/crates/tui/src/fleet/executor.rs b/crates/tui/src/fleet/executor.rs index 10ef1831..fe4e9597 100644 --- a/crates/tui/src/fleet/executor.rs +++ b/crates/tui/src/fleet/executor.rs @@ -21,7 +21,7 @@ #![allow(dead_code)] use codewhale_config::FleetExecConfig; -use codewhale_protocol::fleet::{FleetTaskSpec, FleetWorkerEventPayload}; +use codewhale_protocol::fleet::{FleetHostSpec, FleetTaskSpec, FleetWorkerEventPayload}; use super::host::{FleetHostAdapter, FleetWorkerCommand}; use super::worker_runtime::fleet_task_prompt; @@ -154,21 +154,38 @@ pub fn classify_worker_exit(exit_code: Option, stopped: bool) -> FleetWorke /// construction never touches the orchestrator — the parent only ingests a /// compact event stream, which is what keeps it light at high fanout. pub struct FleetExecutor { + workspace: std::path::PathBuf, adapter: super::host::LocalProcessFleetHostAdapter, + ssh_adapters: std::collections::BTreeMap, streams: std::collections::BTreeMap, } struct WorkerStream { log_path: std::path::PathBuf, + host: WorkerStreamHost, offset: u64, pending: String, terminal: bool, } +enum WorkerStreamHost { + Local, + Ssh(String), +} + +#[derive(Debug, Clone)] +pub struct FleetWorkerTerminalEvent { + pub payload: FleetWorkerEventPayload, + pub exit_code: Option, +} + impl FleetExecutor { pub fn new(workspace: impl AsRef) -> Self { + let workspace = workspace.as_ref().to_path_buf(); Self { - adapter: super::host::LocalProcessFleetHostAdapter::new(workspace), + adapter: super::host::LocalProcessFleetHostAdapter::new(&workspace), + workspace, + ssh_adapters: std::collections::BTreeMap::new(), streams: std::collections::BTreeMap::new(), } } @@ -179,20 +196,79 @@ impl FleetExecutor { worker_id: &str, command: FleetWorkerCommand, cwd: Option, - ) -> super::host::FleetHostResult<()> { + ) -> super::host::FleetHostResult { + self.start_worker_on_host(worker_id, &FleetHostSpec::Local, command, cwd) + } + + /// Start a worker on the requested fleet host. + pub fn start_worker_on_host( + &mut self, + worker_id: &str, + host: &FleetHostSpec, + command: FleetWorkerCommand, + cwd: Option, + ) -> super::host::FleetHostResult { let mut request = super::host::FleetWorkerStartRequest::new(worker_id, command); request.cwd = cwd; - let handle = self.adapter.start_worker(request)?; + let (handle, host) = match host { + FleetHostSpec::Local => { + let handle = self.adapter.start_worker(request)?; + (handle, WorkerStreamHost::Local) + } + FleetHostSpec::Ssh { .. } => { + let config = super::host::SshFleetHostConfig::from_host_spec(host)?; + let key = worker_id.to_string(); + let adapter = self.ssh_adapters.entry(key.clone()).or_insert( + super::host::SshFleetHostAdapter::new(&self.workspace, config)?, + ); + let handle = adapter.start_worker(request)?; + (handle, WorkerStreamHost::Ssh(key)) + } + FleetHostSpec::Docker { image, .. } => { + return Err(super::host::FleetHostError { + kind: super::host::FleetHostErrorKind::Configuration, + message: format!("docker fleet workers are not wired yet (image {image})"), + }); + } + }; self.streams.insert( worker_id.to_string(), WorkerStream { - log_path: handle.log_path, + log_path: handle.log_path.clone(), + host, offset: 0, pending: String::new(), terminal: false, }, ); - Ok(()) + Ok(handle) + } + + pub fn is_tracking(&self, worker_id: &str) -> bool { + self.streams.contains_key(worker_id) + } + + pub fn worker_ids(&self) -> Vec { + self.streams.keys().cloned().collect() + } + + /// Stop tracking a terminal worker so the scheduler can reuse the same + /// logical worker id for the next queued task. + pub fn forget_worker(&mut self, worker_id: &str) { + let Some(stream) = self.streams.remove(worker_id) else { + return; + }; + match stream.host { + WorkerStreamHost::Local => { + let _ = self.adapter.cleanup_worker(worker_id); + } + WorkerStreamHost::Ssh(key) => { + if let Some(adapter) = self.ssh_adapters.get_mut(&key) { + let _ = adapter.cleanup_worker(worker_id); + } + self.ssh_adapters.remove(&key); + } + } } /// Read any newly-written stream-json lines for a worker and map them to @@ -228,10 +304,26 @@ impl FleetExecutor { /// once. Returns `None` while the worker is still running or already /// finalized. pub fn poll_terminal(&mut self, worker_id: &str) -> Option { + self.poll_terminal_with_status(worker_id) + .map(|event| event.payload) + } + + /// Poll the worker process and include the raw exit code for receipt + /// verification. + pub fn poll_terminal_with_status( + &mut self, + worker_id: &str, + ) -> Option { if self.streams.get(worker_id).is_none_or(|s| s.terminal) { return None; } - let status = self.adapter.read_status(worker_id).ok()?; + let status = match self.streams.get(worker_id).map(|s| &s.host)? { + WorkerStreamHost::Local => self.adapter.read_status(worker_id).ok()?, + WorkerStreamHost::Ssh(key) => self + .ssh_adapters + .get_mut(key) + .and_then(|adapter| adapter.read_status(worker_id).ok())?, + }; let terminal = match status.state { super::host::FleetHostWorkerState::Running | super::host::FleetHostWorkerState::Unknown => return None, @@ -246,7 +338,10 @@ impl FleetExecutor { if let Some(stream) = self.streams.get_mut(worker_id) { stream.terminal = true; } - Some(terminal) + Some(FleetWorkerTerminalEvent { + payload: terminal, + exit_code: status.exit_code, + }) } /// True once every started worker has reached a terminal state. diff --git a/crates/tui/src/fleet/manager.rs b/crates/tui/src/fleet/manager.rs index 9f8ff6fb..26b8c9a8 100644 --- a/crates/tui/src/fleet/manager.rs +++ b/crates/tui/src/fleet/manager.rs @@ -16,6 +16,8 @@ use codewhale_protocol::fleet::*; use serde_json::Value; use uuid::Uuid; +use super::executor::{FleetExecutor, FleetWorkerTerminalEvent, build_worker_exec_command}; +use super::host::FleetHostErrorKind; use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState}; use super::task_spec::{ FleetTaskSpecDocument, FleetTaskVerificationInput, load_task_spec_document, @@ -70,6 +72,13 @@ pub struct FleetTickReport { pub heartbeats: usize, } +#[derive(Debug, Clone, Default)] +pub struct FleetExecutorTickReport { + pub started: usize, + pub events: usize, + pub terminals: usize, +} + #[derive(Debug, Clone, Default)] pub struct FleetStatusSnapshot { pub runs: usize, @@ -126,6 +135,13 @@ pub struct FleetWorkerRuntimeProjection { pub has_session: bool, } +#[derive(Debug, Clone)] +struct FleetExecutorTaskContext { + entry: FleetInboxEntry, + task_spec: FleetTaskSpec, + worker_id: String, +} + impl FleetManager { pub fn open(workspace: impl AsRef) -> Result { let workspace = workspace.as_ref().to_path_buf(); @@ -299,6 +315,76 @@ impl FleetManager { Ok(status.queued + status.running + status.stale > 0) } + pub async fn run_to_completion( + &self, + run_id: &FleetRunId, + max_workers: usize, + executor: &mut FleetExecutor, + codewhale_binary: &str, + model: Option<&str>, + tick_interval: Duration, + ) -> Result { + let max_workers = max_workers.clamp(1, 128); + loop { + self.schedule_run(run_id, max_workers)?; + self.drive_executor_tick(run_id, executor, codewhale_binary, model)?; + self.refresh_run_status(run_id)?; + if !self.run_has_open_work(run_id)? { + return self.run_status(run_id); + } + tokio::time::sleep(tick_interval).await; + } + } + + pub fn drive_executor_tick( + &self, + run_id: &FleetRunId, + executor: &mut FleetExecutor, + codewhale_binary: &str, + model: Option<&str>, + ) -> Result { + let mut report = FleetExecutorTickReport::default(); + report.started += self.start_leased_workers(run_id, executor, codewhale_binary, model)?; + + for worker_id in executor.worker_ids() { + for payload in executor.drain_events(&worker_id) { + // The subprocess exit is the task-completion authority. Stream + // `done` / `error` lines are useful progress signals, but + // appending them as terminal ledger events before the process + // exits would free the logical worker too early. + if is_terminal_payload(&payload) { + continue; + } + let Some(task) = self.executor_task_context(&worker_id)? else { + continue; + }; + self.append_worker_event( + &task.entry.run_id, + &worker_id, + &task.entry.task_id, + payload, + )?; + self.ledger + .heartbeat(&worker_id, ×tamp(), None, None)?; + report.events += 1; + } + + if let Some(terminal) = executor.poll_terminal_with_status(&worker_id) { + let Some(task) = self.executor_task_context(&worker_id)? else { + executor.forget_worker(&worker_id); + continue; + }; + if self.record_task_outcome(&task, terminal)? { + report.terminals += 1; + } + executor.forget_worker(&worker_id); + } + } + + self.refresh_run_status(run_id)?; + Ok(report) + } + pub fn inspect_worker(&self, worker_id: &str) -> Result { let state = self.ledger.rebuild_state()?; let latest_event = latest_event_for_worker(&state, worker_id).cloned(); @@ -602,68 +688,142 @@ impl FleetManager { } } - self.maybe_complete_local_simulation(entry, worker_id, task_spec, log_artifact) + Ok(()) } - fn maybe_complete_local_simulation( + fn start_leased_workers( &self, - entry: &FleetInboxEntry, - worker_id: &str, - task_spec: &FleetTaskSpec, - log_artifact: FleetArtifactRef, - ) -> Result<()> { - let Some(result) = local_simulation_result(task_spec) else { - return Ok(()); + run_id: &FleetRunId, + executor: &mut FleetExecutor, + codewhale_binary: &str, + model: Option<&str>, + ) -> Result { + let state = self.ledger.rebuild_state()?; + let run = state + .runs + .get(&run_id.0) + .cloned() + .ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?; + let mut started = 0usize; + for task in active_tasks_for_run(&state, run_id) { + let Some(worker_id) = task.leased_to.as_deref() else { + continue; + }; + if executor.is_tracking(worker_id) { + continue; + } + let Some(task_spec) = run + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + .cloned() + else { + continue; + }; + let worker_spec = run + .worker_specs + .iter() + .find(|worker| worker.id == worker_id) + .cloned() + .unwrap_or_else(|| default_local_worker(worker_id)); + let command = + build_worker_exec_command(codewhale_binary, &task_spec, &self.exec_config, model); + let cwd = resolve_task_cwd(&self.workspace, &task_spec); + match executor.start_worker_on_host(worker_id, &worker_spec.host, command, Some(cwd)) { + Ok(handle) => { + let artifact = self.host_log_artifact(&handle.log_path); + self.append_worker_event( + run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Artifact(artifact), + )?; + started += 1; + } + Err(err) => { + let recoverable = matches!(err.kind, FleetHostErrorKind::Retryable); + let task = FleetExecutorTaskContext { + entry: task.entry.clone(), + task_spec, + worker_id: worker_id.to_string(), + }; + let terminal = FleetWorkerTerminalEvent { + payload: FleetWorkerEventPayload::Failed { + reason: err.message, + recoverable, + }, + exit_code: None, + }; + let _ = self.record_task_outcome(&task, terminal)?; + } + } + } + Ok(started) + } + + fn executor_task_context(&self, worker_id: &str) -> Result> { + let state = self.ledger.rebuild_state()?; + let Some(task) = active_task_for_worker(&state, worker_id) + .or_else(|| latest_task_for_worker(&state, worker_id)) + else { + return Ok(None); }; - let now = timestamp(); - let (payload, receipt_result, failure_kind, exit_code) = match result { - FleetLocalSimulationResult::Pass => ( - FleetWorkerEventPayload::Completed { - exit_code: Some(0), - summary: Some("local fleet smoke task completed".to_string()), - }, - FleetTaskResult::Pass, - None, - Some(0), - ), - FleetLocalSimulationResult::Fail => ( - FleetWorkerEventPayload::Failed { - reason: "local fleet smoke task failed".to_string(), - recoverable: false, - }, - FleetTaskResult::Fail, - Some(FleetTaskFailureKind::Task), - Some(1), - ), - FleetLocalSimulationResult::Skip => ( - FleetWorkerEventPayload::Completed { - exit_code: Some(0), - summary: Some("local fleet smoke task skipped".to_string()), - }, - FleetTaskResult::Skip, - None, - Some(0), - ), - FleetLocalSimulationResult::Timeout => ( - FleetWorkerEventPayload::Failed { - reason: "local fleet smoke task timed out".to_string(), - recoverable: true, - }, - FleetTaskResult::Timeout, - Some(FleetTaskFailureKind::Transport), - None, - ), + let Some(run) = state.runs.get(&task.entry.run_id.0) else { + return Ok(None); }; - self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?; - let verification_input = FleetTaskVerificationInput { - run_id: entry.run_id.clone(), - task_id: entry.task_id.clone(), + let Some(task_spec) = run + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + .cloned() + else { + return Ok(None); + }; + Ok(Some(FleetExecutorTaskContext { + entry: task.entry.clone(), + task_spec, worker_id: worker_id.to_string(), - exit_code, - artifacts: vec![log_artifact.clone()], + })) + } + + fn record_task_outcome( + &self, + task: &FleetExecutorTaskContext, + terminal: FleetWorkerTerminalEvent, + ) -> Result { + let state = self.ledger.rebuild_state()?; + let key = task_key(&task.entry.run_id.0, &task.entry.task_id); + let Some(current) = state.tasks.get(&key) else { + return Ok(false); }; - if task_spec.scorer.is_some() { - let verification = verify_task_result(&self.workspace, task_spec, &verification_input); + if !matches!(current.status, FleetTaskLedgerStatus::Leased) { + return Ok(false); + } + + let (receipt_result, failure_kind, exit_code) = + task_receipt_outcome(&terminal.payload, terminal.exit_code); + self.append_worker_event( + &task.entry.run_id, + &task.worker_id, + &task.entry.task_id, + terminal.payload, + )?; + + let artifacts = self.task_artifacts_for_receipt( + &task.entry.run_id, + &task.entry.task_id, + &task.worker_id, + )?; + let verification_input = FleetTaskVerificationInput { + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: task.worker_id.clone(), + exit_code, + artifacts, + }; + if task.task_spec.scorer.is_some() { + let verification = + verify_task_result(&self.workspace, &task.task_spec, &verification_input); let receipt = record_verification_receipt( &self.ledger, &self.workspace, @@ -675,25 +835,73 @@ impl FleetManager { FleetTaskResult::Fail | FleetTaskResult::Timeout ) { self.ledger.mark_task_terminal_status( - &entry.run_id, - &entry.task_id, - Some(worker_id), + &task.entry.run_id, + &task.entry.task_id, + Some(&task.worker_id), ×tamp(), FleetTaskLedgerStatus::Failed, )?; } - return Ok(()); + return Ok(true); } self.ledger.record_receipt(FleetReceipt { - run_id: entry.run_id.clone(), - task_id: entry.task_id.clone(), - worker_id: worker_id.to_string(), - completed_at: now, + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: task.worker_id.clone(), + completed_at: timestamp(), result: receipt_result, failure_kind, - artifacts: vec![log_artifact], + artifacts: verification_input.artifacts, score: None, - }) + })?; + Ok(true) + } + + fn task_artifacts_for_receipt( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + ) -> Result> { + let state = self.ledger.rebuild_state()?; + Ok(state + .artifact_events + .values() + .filter(|event| { + event.run_id == *run_id && event.task_id == task_id && event.worker_id == worker_id + }) + .filter_map(|event| match &event.payload { + FleetWorkerEventPayload::Artifact(artifact) => { + Some(self.refresh_artifact_size(artifact.clone())) + } + _ => None, + }) + .collect()) + } + + fn refresh_artifact_size(&self, mut artifact: FleetArtifactRef) -> FleetArtifactRef { + let path = if artifact.path.is_absolute() { + artifact.path.clone() + } else { + self.workspace.join(&artifact.path) + }; + artifact.size_bytes = std::fs::metadata(path).ok().map(|meta| meta.len()); + artifact + } + + fn host_log_artifact(&self, path: &Path) -> FleetArtifactRef { + let rel_path = path + .strip_prefix(&self.workspace) + .map(Path::to_path_buf) + .unwrap_or_else(|_| path.to_path_buf()); + let size_bytes = std::fs::metadata(path).ok().map(|meta| meta.len()); + FleetArtifactRef { + kind: FleetArtifactKind::Log, + path: rel_path, + checksum: None, + mime_type: Some("application/x-ndjson".to_string()), + size_bytes, + } } fn append_worker_event( @@ -859,28 +1067,38 @@ impl FleetManager { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum FleetLocalSimulationResult { - Pass, - Fail, - Skip, - Timeout, -} - fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec { (1..=max_workers) - .map(|index| FleetWorkerSpec { - id: format!("{}-local-{}", run_id.0, index), - name: format!("Local worker {index}"), - host: FleetHostSpec::Local, - trust_level: Some(FleetTrustLevel::Local), - labels: BTreeMap::new(), - capabilities: vec!["local".to_string()], - max_concurrent_tasks: Some(1), + .map(|index| { + default_local_worker_with_name(&format!("{}-local-{}", run_id.0, index), index) }) .collect() } +fn default_local_worker_with_name(worker_id: &str, index: usize) -> FleetWorkerSpec { + FleetWorkerSpec { + id: worker_id.to_string(), + name: format!("Local worker {index}"), + host: FleetHostSpec::Local, + trust_level: Some(FleetTrustLevel::Local), + labels: BTreeMap::new(), + capabilities: vec!["local".to_string()], + max_concurrent_tasks: Some(1), + } +} + +fn default_local_worker(worker_id: &str) -> FleetWorkerSpec { + FleetWorkerSpec { + id: worker_id.to_string(), + name: worker_id.to_string(), + host: FleetHostSpec::Local, + trust_level: Some(FleetTrustLevel::Local), + labels: BTreeMap::new(), + capabilities: vec!["local".to_string()], + max_concurrent_tasks: Some(1), + } +} + fn worker_ids_for_run(run: &FleetRun, max_workers: usize) -> Vec { run.worker_specs .iter() @@ -1048,30 +1266,6 @@ fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option< .map(|(_, message)| message) } -fn local_simulation_result(task: &FleetTaskSpec) -> Option { - if task - .metadata - .get("local_complete") - .and_then(Value::as_bool) - .unwrap_or(false) - { - return Some(FleetLocalSimulationResult::Pass); - } - match task - .metadata - .get("local_result") - .and_then(Value::as_str) - .map(str::to_ascii_lowercase) - .as_deref() - { - Some("pass" | "passed" | "ok" | "completed") => Some(FleetLocalSimulationResult::Pass), - Some("fail" | "failed" | "error") => Some(FleetLocalSimulationResult::Fail), - Some("skip" | "skipped") => Some(FleetLocalSimulationResult::Skip), - Some("timeout" | "timed_out") => Some(FleetLocalSimulationResult::Timeout), - _ => None, - } -} - fn task_priority(task: &FleetTaskSpec) -> i32 { task.metadata .get("priority") @@ -1080,6 +1274,61 @@ fn task_priority(task: &FleetTaskSpec) -> i32 { .unwrap_or(0) } +fn resolve_task_cwd(workspace: &Path, task: &FleetTaskSpec) -> PathBuf { + let Some(root) = task + .workspace + .as_ref() + .and_then(|workspace| workspace.root.as_ref()) + else { + return workspace.to_path_buf(); + }; + if root.is_absolute() { + root.clone() + } else { + workspace.join(root) + } +} + +fn task_receipt_outcome( + payload: &FleetWorkerEventPayload, + exit_code: Option, +) -> (FleetTaskResult, Option, Option) { + match payload { + FleetWorkerEventPayload::Completed { + exit_code: payload_exit_code, + .. + } => ( + FleetTaskResult::Pass, + None, + exit_code.or(*payload_exit_code), + ), + FleetWorkerEventPayload::Cancelled { .. } => (FleetTaskResult::Skip, None, exit_code), + FleetWorkerEventPayload::Failed { .. } => { + let failure_kind = if exit_code.is_none() { + FleetTaskFailureKind::Transport + } else { + FleetTaskFailureKind::Task + }; + (FleetTaskResult::Fail, Some(failure_kind), exit_code) + } + _ => (FleetTaskResult::Partial, None, exit_code), + } +} + +fn is_terminal_payload(payload: &FleetWorkerEventPayload) -> bool { + matches!( + payload, + FleetWorkerEventPayload::Completed { .. } + | FleetWorkerEventPayload::Failed { .. } + | FleetWorkerEventPayload::Cancelled { .. } + | FleetWorkerEventPayload::Interrupted { .. } + ) +} + +fn task_key(run_id: &str, task_id: &str) -> String { + format!("{run_id}:{task_id}") +} + fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { format!("{worker_id}:{run_id}:{task_id}") } @@ -1139,6 +1388,42 @@ mod tests { path } + #[cfg(unix)] + fn fake_codewhale(dir: &TempDir, body: &str) -> PathBuf { + use std::os::unix::fs::PermissionsExt; + + let path = dir.path().join("fake-codewhale"); + std::fs::write(&path, body).unwrap(); + let mut permissions = std::fs::metadata(&path).unwrap().permissions(); + permissions.set_mode(0o755); + std::fs::set_permissions(&path, permissions).unwrap(); + path + } + + #[cfg(unix)] + fn complete_with_fake_codewhale( + manager: &FleetManager, + run_id: &FleetRunId, + max_workers: usize, + binary: &Path, + ) -> FleetStatusSnapshot { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut executor = FleetExecutor::new(&manager.workspace); + rt.block_on(async { + manager + .run_to_completion( + run_id, + max_workers, + &mut executor, + &binary.display().to_string(), + None, + Duration::from_millis(10), + ) + .await + .unwrap() + }) + } + #[test] fn fleet_manager_creates_run_and_starts_workers_up_to_cap() { let tmp = TempDir::new().unwrap(); @@ -1204,23 +1489,30 @@ mod tests { assert_eq!(status.running, 0); } + #[cfg(unix)] #[test] fn fleet_manager_can_record_completed_local_smoke_tasks() { let tmp = TempDir::new().unwrap(); let manager = FleetManager::open(tmp.path()).unwrap(); - let mut completed = task("task-a"); - completed - .metadata - .insert("local_result".to_string(), json!("pass")); - let path = task_spec_file(&tmp, vec![completed]); + let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b"), task("task-c")]); + let fake = fake_codewhale( + &tmp, + r#"#!/bin/sh +printf '{"type":"tool_use","name":"read_file","id":"fake","input":{}}\n' +printf '{"type":"done"}\n' +exit 0 +"#, + ); let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); - let status = manager.run_status(&report.run_id).unwrap(); - assert_eq!(status.completed, 1); + assert_eq!(report.leased, 1); + assert_eq!(report.queued, 2); + let status = complete_with_fake_codewhale(&manager, &report.run_id, 1, &fake); + assert_eq!(status.completed, 3); assert_eq!(status.running, 0); let state = manager.ledger.rebuild_state().unwrap(); - assert_eq!(state.receipts.len(), 1); + assert_eq!(state.receipts.len(), 3); } #[test] @@ -1260,20 +1552,25 @@ mod tests { ); } + #[cfg(unix)] #[test] fn fleet_task_spec_local_scorer_records_receipt_artifact() { let tmp = TempDir::new().unwrap(); let manager = FleetManager::open(tmp.path()).unwrap(); let mut completed = task("task-a"); completed.scorer = Some(FleetScorerSpec::ExitCode); - completed - .metadata - .insert("local_result".to_string(), json!("pass")); let path = task_spec_file(&tmp, vec![completed]); + let fake = fake_codewhale( + &tmp, + r#"#!/bin/sh +printf '{"type":"done"}\n' +exit 0 +"#, + ); let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + let status = complete_with_fake_codewhale(&manager, &report.run_id, 1, &fake); - let status = manager.run_status(&report.run_id).unwrap(); assert_eq!(status.completed, 1); assert_eq!(status.failed, 0); assert_eq!(status.partial, 0); @@ -1290,33 +1587,62 @@ mod tests { ); } + #[cfg(unix)] #[test] fn fleet_task_spec_status_distinguishes_failure_sources() { let tmp = TempDir::new().unwrap(); let manager = FleetManager::open(tmp.path()).unwrap(); - let mut transport = task("transport-failure"); - transport.scorer = Some(FleetScorerSpec::ExitCode); - transport - .metadata - .insert("local_result".to_string(), json!("timeout")); - let mut task_failed = task("task-failure"); + let mut task_failed = task("a-task-failure"); task_failed.scorer = Some(FleetScorerSpec::ExitCode); - task_failed - .metadata - .insert("local_result".to_string(), json!("fail")); - let mut verifier_failed = task("verifier-failure"); + task_failed.instructions = "task-failure".to_string(); + let mut transport = task("b-transport-failure"); + transport.scorer = Some(FleetScorerSpec::ExitCode); + let mut verifier_failed = task("c-verifier-failure"); verifier_failed.scorer = Some(FleetScorerSpec::RegexMatch { path: PathBuf::from("missing.log"), pattern: "[".to_string(), }); - verifier_failed - .metadata - .insert("local_result".to_string(), json!("pass")); - let path = task_spec_file(&tmp, vec![transport, task_failed, verifier_failed]); + let fake = fake_codewhale( + &tmp, + r#"#!/bin/sh +case "$*" in + *task-failure*) + printf '{"type":"error","error":"task failed"}\n' + exit 7 + ;; + *) + printf '{"type":"done"}\n' + exit 0 + ;; +esac +"#, + ); + let doc = FleetTaskSpecDocument { + name: Some("failure source smoke".to_string()), + labels: BTreeMap::new(), + security_policy: None, + workers: vec![ + default_local_worker("local-task"), + FleetWorkerSpec { + id: "docker-transport".to_string(), + name: "Docker transport".to_string(), + host: FleetHostSpec::Docker { + image: "fake".to_string(), + args: Vec::new(), + }, + trust_level: Some(FleetTrustLevel::Sandbox), + labels: BTreeMap::new(), + capabilities: vec![], + max_concurrent_tasks: Some(1), + }, + default_local_worker("local-verifier"), + ], + tasks: vec![task_failed, transport, verifier_failed], + }; - let report = manager.create_run_from_task_spec_path(&path, 3).unwrap(); + let report = manager.create_run(doc, 3).unwrap(); + let status = complete_with_fake_codewhale(&manager, &report.run_id, 3, &fake); - let status = manager.run_status(&report.run_id).unwrap(); assert_eq!(status.failed, 3); assert_eq!(status.transport_failed, 1); assert_eq!(status.task_failed, 1); @@ -1517,7 +1843,6 @@ mod tests { fn fleet_security_policy_propagates_from_task_spec_document_to_run() { let tmp = TempDir::new().unwrap(); let manager = FleetManager::open(tmp.path()).unwrap(); - let path = task_spec_file(&tmp, vec![task("task-a")]); // Rewrite the spec file with a security_policy block. let doc = serde_json::json!({ "name": "secure smoke", diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index c7985e5a..1d5353cd 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -1464,6 +1464,7 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) - FleetAlertAdapterConfig, FleetAlertConfig, FleetAlertDispatcher, FleetAlertEvent, FleetEnvSecretResolver, }; + use crate::fleet::executor::FleetExecutor; use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection}; use codewhale_protocol::fleet::{ FleetAlertEventClass, FleetArtifactKind, FleetRunId, FleetWorkerEventPayload, @@ -1713,6 +1714,14 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) - } } + fn fleet_codewhale_binary() -> String { + std::env::var("CODEWHALE_FLEET_CODEWHALE_BINARY") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .unwrap_or_else(|| "codewhale".to_string()) + } + let exec_config = config .fleet .as_ref() @@ -1744,14 +1753,19 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) - println!( "manager loop running; use `codewhale fleet status`, `inspect`, `interrupt`, or `stop --all` from another terminal." ); - loop { - manager.schedule_run(&report.run_id, max_workers)?; - if !manager.run_has_open_work(&report.run_id)? { - print_status(&manager.run_status(&report.run_id)?); - break; - } - tokio::time::sleep(Duration::from_secs(2)).await; - } + let mut executor = FleetExecutor::new(workspace); + let codewhale_binary = fleet_codewhale_binary(); + let status = manager + .run_to_completion( + &report.run_id, + max_workers, + &mut executor, + &codewhale_binary, + None, + Duration::from_secs(2), + ) + .await?; + print_status(&status); Ok(()) } FleetCommand::Status => {