diff --git a/crates/tui/src/fleet/executor.rs b/crates/tui/src/fleet/executor.rs index 3fc01e1b..d544a35e 100644 --- a/crates/tui/src/fleet/executor.rs +++ b/crates/tui/src/fleet/executor.rs @@ -23,7 +23,7 @@ use codewhale_config::FleetExecConfig; use codewhale_protocol::fleet::{FleetTaskSpec, FleetWorkerEventPayload}; -use super::host::FleetWorkerCommand; +use super::host::{FleetHostAdapter, FleetWorkerCommand}; use super::worker_runtime::fleet_task_prompt; /// Build the `codewhale exec` argv that runs a fleet task headlessly. @@ -144,6 +144,117 @@ pub fn classify_worker_exit(exit_code: Option, stopped: bool) -> FleetWorke } } +/// Drives fleet workers as real `codewhale exec` subprocesses on the local +/// host, incrementally draining each worker's stream-json output into fleet +/// ledger events. +/// +/// The caller (the `codewhale fleet run` loop / `FleetManager`) owns the +/// ledger; the executor owns the OS process boundary and the incremental log +/// parse. Because the worker is a separate process, its heavy runtime/tool +/// construction never touches the orchestrator — the parent only ingests a +/// compact event stream, which is what keeps it light at high fanout. +pub struct FleetExecutor { + adapter: super::host::LocalProcessFleetHostAdapter, + streams: std::collections::BTreeMap, +} + +struct WorkerStream { + log_path: std::path::PathBuf, + offset: u64, + pending: String, + terminal: bool, +} + +impl FleetExecutor { + pub fn new(workspace: impl AsRef) -> Self { + Self { + adapter: super::host::LocalProcessFleetHostAdapter::new(workspace), + streams: std::collections::BTreeMap::new(), + } + } + + /// Start a worker process and begin tracking its event stream. + pub fn start_worker( + &mut self, + worker_id: &str, + command: FleetWorkerCommand, + cwd: Option, + ) -> super::host::FleetHostResult<()> { + let mut request = super::host::FleetWorkerStartRequest::new(worker_id, command); + request.cwd = cwd; + let handle = self.adapter.start_worker(request)?; + self.streams.insert( + worker_id.to_string(), + WorkerStream { + log_path: handle.log_path, + offset: 0, + pending: String::new(), + terminal: false, + }, + ); + Ok(()) + } + + /// Read any newly-written stream-json lines for a worker and map them to + /// fleet ledger events. Safe to call repeatedly; only new bytes are parsed, + /// and a trailing partial line is buffered until its newline arrives. + pub fn drain_events(&mut self, worker_id: &str) -> Vec { + let Some(stream) = self.streams.get_mut(worker_id) else { + return Vec::new(); + }; + let mut events = Vec::new(); + let Ok(mut file) = std::fs::File::open(&stream.log_path) else { + return events; + }; + use std::io::{Read, Seek, SeekFrom}; + if file.seek(SeekFrom::Start(stream.offset)).is_err() { + return events; + } + let mut buf = Vec::new(); + if let Ok(read) = file.read_to_end(&mut buf) { + stream.offset += read as u64; + stream.pending.push_str(&String::from_utf8_lossy(&buf)); + while let Some(idx) = stream.pending.find('\n') { + let line: String = stream.pending.drain(..=idx).collect(); + if let Some(event) = map_exec_stream_line(line.trim_end()) { + events.push(event); + } + } + } + events + } + + /// Poll the worker process; once it exits, return the terminal event exactly + /// once. Returns `None` while the worker is still running or already + /// finalized. + pub fn poll_terminal(&mut self, worker_id: &str) -> Option { + if self.streams.get(worker_id).is_none_or(|s| s.terminal) { + return None; + } + let status = self.adapter.read_status(worker_id).ok()?; + let terminal = match status.state { + super::host::FleetHostWorkerState::Running + | super::host::FleetHostWorkerState::Unknown => return None, + super::host::FleetHostWorkerState::Stopped => { + classify_worker_exit(status.exit_code, true) + } + super::host::FleetHostWorkerState::Exited + | super::host::FleetHostWorkerState::Failed => { + classify_worker_exit(status.exit_code, false) + } + }; + if let Some(stream) = self.streams.get_mut(worker_id) { + stream.terminal = true; + } + Some(terminal) + } + + /// True once every started worker has reached a terminal state. + pub fn all_terminal(&self) -> bool { + !self.streams.is_empty() && self.streams.values().all(|s| s.terminal) + } +} + #[cfg(test)] mod tests { use super::*; @@ -265,4 +376,49 @@ mod tests { FleetWorkerEventPayload::Cancelled { .. } )); } + + /// End-to-end: run a REAL subprocess that emits stream-json (standing in for + /// `codewhale exec`), and prove the executor drains its events and terminal + /// exit through the real host adapter — no codewhale binary needed. This is + /// the verifiable proof that a fleet worker is an out-of-process exec run. + #[cfg(unix)] + #[test] + fn executor_runs_real_process_and_drains_stream_json_into_ledger_events() { + let tmp = tempfile::TempDir::new().unwrap(); + let mut exec = FleetExecutor::new(tmp.path()); + let script = r#"printf '{"type":"tool_use","name":"read_file","id":"c1","input":{}}\n'; printf '{"type":"done"}\n'"#; + let command = FleetWorkerCommand::new("sh", vec!["-c".to_string(), script.to_string()]); + exec.start_worker("w1", command, None).unwrap(); + + let mut events = Vec::new(); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); + loop { + events.extend(exec.drain_events("w1")); + if let Some(term) = exec.poll_terminal("w1") { + events.extend(exec.drain_events("w1")); // final flush after exit + events.push(term); + break; + } + assert!( + std::time::Instant::now() < deadline, + "worker did not terminate; events so far: {events:?}" + ); + std::thread::sleep(std::time::Duration::from_millis(20)); + } + + assert!( + events.iter().any(|e| matches!( + e, + FleetWorkerEventPayload::RunningTool { tool, .. } if tool == "read_file" + )), + "expected a RunningTool(read_file) event, got {events:?}" + ); + assert!( + events + .iter() + .any(|e| matches!(e, FleetWorkerEventPayload::Completed { .. })), + "expected a terminal Completed event, got {events:?}" + ); + assert!(exec.all_terminal()); + } }