diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs index 8f37f1e7..728fbf40 100644 --- a/crates/protocol/src/fleet.rs +++ b/crates/protocol/src/fleet.rs @@ -195,6 +195,13 @@ pub enum FleetHostSpec { user: Option, #[serde(skip_serializing_if = "Option::is_none")] identity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + working_directory: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + env_allowlist: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + codewhale_binary: Option, }, Docker { image: String, @@ -519,6 +526,33 @@ mod tests { assert_eq!(back.size_bytes, Some(1024)); } + #[test] + fn ssh_host_spec_accepts_minimal_legacy_json() { + let json = r#"{"kind":"ssh","host":"builder.example.test"}"#; + let host: FleetHostSpec = serde_json::from_str(json).unwrap(); + + match host { + FleetHostSpec::Ssh { + host, + port, + user, + identity, + working_directory, + env_allowlist, + codewhale_binary, + } => { + assert_eq!(host, "builder.example.test"); + assert_eq!(port, None); + assert_eq!(user, None); + assert_eq!(identity, None); + assert_eq!(working_directory, None); + assert!(env_allowlist.is_empty()); + assert_eq!(codewhale_binary, None); + } + other => panic!("expected ssh host spec, got {other:?}"), + } + } + #[test] fn artifact_kind_uses_flat_string_json() { let known = serde_json::to_string(&FleetArtifactKind::TestResult).unwrap(); diff --git a/crates/tui/src/fleet/host.rs b/crates/tui/src/fleet/host.rs new file mode 100644 index 00000000..8a4502d9 --- /dev/null +++ b/crates/tui/src/fleet/host.rs @@ -0,0 +1,935 @@ +//! Fleet worker host adapters. +//! +//! Adapters own process boundaries for worker hosts. The manager can lease and +//! observe work through this trait without knowing whether the worker is a +//! local child process or an SSH-backed remote command. + +#![allow(dead_code)] + +use std::collections::{BTreeMap, BTreeSet}; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, ExitStatus, Stdio}; +use std::thread; +use std::time::{Duration, Instant}; + +use codewhale_protocol::fleet::FleetHostSpec; +use thiserror::Error; + +const DEFAULT_LOG_LIMIT_BYTES: usize = 64 * 1024; +const DEFAULT_CONNECT_TIMEOUT_SECONDS: u64 = 10; + +pub type FleetHostResult = Result; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FleetHostErrorKind { + Retryable, + Terminal, + Configuration, +} + +#[derive(Debug, Error)] +#[error("{kind:?}: {message}")] +pub struct FleetHostError { + pub kind: FleetHostErrorKind, + pub message: String, +} + +impl FleetHostError { + fn retryable(message: impl Into) -> Self { + Self { + kind: FleetHostErrorKind::Retryable, + message: message.into(), + } + } + + fn terminal(message: impl Into) -> Self { + Self { + kind: FleetHostErrorKind::Terminal, + message: message.into(), + } + } + + fn configuration(message: impl Into) -> Self { + Self { + kind: FleetHostErrorKind::Configuration, + message: message.into(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FleetWorkerCommand { + pub program: String, + pub args: Vec, +} + +impl FleetWorkerCommand { + pub fn new(program: S, args: I) -> Self + where + S: Into, + I: IntoIterator, + A: Into, + { + Self { + program: program.into(), + args: args.into_iter().map(Into::into).collect(), + } + } +} + +#[derive(Debug, Clone)] +pub struct FleetWorkerStartRequest { + pub worker_id: String, + pub command: FleetWorkerCommand, + pub cwd: Option, + pub env: BTreeMap, + pub env_allowlist: BTreeSet, + pub log_limit_bytes: usize, +} + +impl FleetWorkerStartRequest { + pub fn new(worker_id: impl Into, command: FleetWorkerCommand) -> Self { + Self { + worker_id: worker_id.into(), + command, + cwd: None, + env: BTreeMap::new(), + env_allowlist: BTreeSet::new(), + log_limit_bytes: DEFAULT_LOG_LIMIT_BYTES, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FleetWorkerHandle { + pub worker_id: String, + pub host_kind: FleetHostKind, + pub pid: Option, + pub log_path: PathBuf, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FleetHostKind { + LocalProcess, + Ssh, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FleetHostWorkerState { + Running, + Exited, + Failed, + Stopped, + Unknown, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FleetHostWorkerStatus { + pub worker_id: String, + pub state: FleetHostWorkerState, + pub pid: Option, + pub exit_code: Option, + pub retryable: bool, +} + +pub trait FleetHostAdapter { + fn start_worker( + &mut self, + request: FleetWorkerStartRequest, + ) -> FleetHostResult; + fn read_status(&mut self, worker_id: &str) -> FleetHostResult; + fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult; + fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult; + fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult; + fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult; + fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()>; +} + +#[derive(Debug)] +pub struct LocalProcessFleetHostAdapter { + workspace: PathBuf, + processes: BTreeMap, +} + +#[derive(Debug)] +struct LocalWorkerProcess { + request: FleetWorkerStartRequest, + child: Child, + log_path: PathBuf, + stopped: bool, + last_exit: Option, +} + +impl LocalProcessFleetHostAdapter { + pub fn new(workspace: impl AsRef) -> Self { + Self { + workspace: workspace.as_ref().to_path_buf(), + processes: BTreeMap::new(), + } + } + + fn start_with_kind( + &mut self, + request: FleetWorkerStartRequest, + host_kind: FleetHostKind, + ) -> FleetHostResult { + validate_worker_id(&request.worker_id)?; + if self.processes.contains_key(&request.worker_id) { + let status = self.read_status(&request.worker_id)?; + if matches!(status.state, FleetHostWorkerState::Running) { + return Err(FleetHostError::terminal(format!( + "worker {} is already running", + request.worker_id + ))); + } + self.processes.remove(&request.worker_id); + } + + let mut env = process_base_env(); + env.extend(filtered_env(&request.env, &request.env_allowlist)?); + let log_path = self.log_path_for(&request.worker_id, host_kind); + let log = open_worker_log(&log_path)?; + let stderr = log + .try_clone() + .map_err(|err| FleetHostError::retryable(format!("cloning worker log: {err}")))?; + + let mut command = Command::new(&request.command.program); + command + .args(&request.command.args) + .stdin(Stdio::null()) + .stdout(Stdio::from(log)) + .stderr(Stdio::from(stderr)) + .env_clear() + .envs(env); + if let Some(cwd) = &request.cwd { + command.current_dir(cwd); + } + + let child = command.spawn().map_err(|err| { + classify_spawn_error(err, format!("starting worker {}", request.worker_id)) + })?; + let pid = child.id(); + let handle = FleetWorkerHandle { + worker_id: request.worker_id.clone(), + host_kind, + pid: Some(pid), + log_path: log_path.clone(), + }; + self.processes.insert( + request.worker_id.clone(), + LocalWorkerProcess { + request, + child, + log_path, + stopped: false, + last_exit: None, + }, + ); + Ok(handle) + } + + fn log_path_for(&self, worker_id: &str, host_kind: FleetHostKind) -> PathBuf { + let host_dir = match host_kind { + FleetHostKind::LocalProcess => "local", + FleetHostKind::Ssh => "ssh", + }; + self.workspace + .join(".codewhale") + .join("fleet-host") + .join(host_dir) + .join(format!("{}.log", safe_path_segment(worker_id))) + } +} + +impl FleetHostAdapter for LocalProcessFleetHostAdapter { + fn start_worker( + &mut self, + request: FleetWorkerStartRequest, + ) -> FleetHostResult { + self.start_with_kind(request, FleetHostKind::LocalProcess) + } + + fn read_status(&mut self, worker_id: &str) -> FleetHostResult { + let process = self + .processes + .get_mut(worker_id) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + if let Some(status) = process.last_exit { + return Ok(status_from_exit( + worker_id, + Some(process.child.id()), + status, + process.stopped, + )); + } + match process.child.try_wait() { + Ok(None) => Ok(FleetHostWorkerStatus { + worker_id: worker_id.to_string(), + state: FleetHostWorkerState::Running, + pid: Some(process.child.id()), + exit_code: None, + retryable: false, + }), + Ok(Some(status)) => { + process.last_exit = Some(status); + Ok(status_from_exit( + worker_id, + Some(process.child.id()), + status, + process.stopped, + )) + } + Err(err) => Err(FleetHostError::retryable(format!( + "reading worker {worker_id} status: {err}" + ))), + } + } + + fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult { + let process = self + .processes + .get(worker_id) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + let max_bytes = max_bytes.min(process.request.log_limit_bytes.max(1)); + read_bounded_log(&process.log_path, max_bytes) + } + + fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult { + { + let process = self + .processes + .get_mut(worker_id) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + if process.last_exit.is_some() { + return self.read_status(worker_id); + } + interrupt_child(&mut process.child)?; + } + wait_for_exit(self, worker_id, Duration::from_millis(750)) + } + + fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult { + let request = self + .processes + .get(worker_id) + .map(|process| process.request.clone()) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + let _ = self.stop_worker(worker_id); + self.processes.remove(worker_id); + self.start_worker(request) + } + + fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult { + { + let process = self + .processes + .get_mut(worker_id) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + process.stopped = true; + if process.last_exit.is_none() { + match process.child.try_wait() { + Ok(Some(status)) => { + process.last_exit = Some(status); + } + Ok(None) => { + process.child.kill().map_err(|err| { + FleetHostError::retryable(format!("stopping worker {worker_id}: {err}")) + })?; + let status = process.child.wait().map_err(|err| { + FleetHostError::retryable(format!( + "waiting for worker {worker_id}: {err}" + )) + })?; + process.last_exit = Some(status); + } + Err(err) => { + return Err(FleetHostError::retryable(format!( + "reading worker {worker_id} status before stop: {err}" + ))); + } + } + } + } + self.read_status(worker_id) + } + + fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()> { + if matches!( + self.read_status(worker_id).map(|status| status.state), + Ok(FleetHostWorkerState::Running) + ) { + let _ = self.stop_worker(worker_id)?; + } + self.processes.remove(worker_id); + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct SshFleetHostConfig { + pub host: String, + pub user: Option, + pub port: Option, + pub identity: Option, + pub working_directory: PathBuf, + pub env_allowlist: BTreeSet, + pub codewhale_binary: String, + pub ssh_binary: String, + pub connect_timeout_seconds: u64, +} + +impl SshFleetHostConfig { + pub fn new(host: impl Into, working_directory: impl Into) -> Self { + Self { + host: host.into(), + user: None, + port: None, + identity: None, + working_directory: working_directory.into(), + env_allowlist: BTreeSet::new(), + codewhale_binary: "codewhale".to_string(), + ssh_binary: "ssh".to_string(), + connect_timeout_seconds: DEFAULT_CONNECT_TIMEOUT_SECONDS, + } + } + + pub fn from_host_spec(spec: &FleetHostSpec) -> FleetHostResult { + let FleetHostSpec::Ssh { + host, + port, + user, + identity, + working_directory, + env_allowlist, + codewhale_binary, + } = spec + else { + return Err(FleetHostError::configuration( + "expected SSH fleet host spec", + )); + }; + let working_directory = working_directory.clone().ok_or_else(|| { + FleetHostError::configuration("SSH fleet host spec requires working_directory") + })?; + let codewhale_binary = codewhale_binary.clone().ok_or_else(|| { + FleetHostError::configuration("SSH fleet host spec requires codewhale_binary") + })?; + let mut config = Self::new(host.clone(), working_directory); + config.port = *port; + config.user = user.clone(); + config.identity = identity.clone(); + config.env_allowlist = env_allowlist.iter().cloned().collect(); + config.codewhale_binary = codewhale_binary; + config.validate()?; + Ok(config) + } + + fn validate(&self) -> FleetHostResult<()> { + if self.host.trim().is_empty() { + return Err(FleetHostError::configuration( + "SSH fleet host requires an explicit host", + )); + } + if self.codewhale_binary.trim().is_empty() { + return Err(FleetHostError::configuration( + "SSH fleet host requires an explicit codewhale binary path", + )); + } + if self.working_directory.as_os_str().is_empty() { + return Err(FleetHostError::configuration( + "SSH fleet host requires an explicit working directory", + )); + } + validate_env_allowlist(&self.env_allowlist) + } + + fn target(&self) -> String { + self.user + .as_ref() + .filter(|user| !user.trim().is_empty()) + .map(|user| format!("{user}@{}", self.host)) + .unwrap_or_else(|| self.host.clone()) + } +} + +#[derive(Debug)] +pub struct SshFleetHostAdapter { + config: SshFleetHostConfig, + local: LocalProcessFleetHostAdapter, +} + +impl SshFleetHostAdapter { + pub fn new(workspace: impl AsRef, config: SshFleetHostConfig) -> FleetHostResult { + config.validate()?; + Ok(Self { + config, + local: LocalProcessFleetHostAdapter::new(workspace), + }) + } + + pub fn build_ssh_command( + &self, + request: &FleetWorkerStartRequest, + ) -> FleetHostResult { + self.config.validate()?; + let env = filtered_env(&request.env, &self.config.env_allowlist)?; + let mut args = vec![ + "-o".to_string(), + "BatchMode=yes".to_string(), + "-o".to_string(), + format!("ConnectTimeout={}", self.config.connect_timeout_seconds), + ]; + for key in env.keys() { + args.push("-o".to_string()); + args.push(format!("SendEnv={key}")); + } + if let Some(port) = self.config.port { + args.push("-p".to_string()); + args.push(port.to_string()); + } + if let Some(identity) = &self.config.identity { + args.push("-i".to_string()); + args.push(identity.display().to_string()); + } + args.push(self.config.target()); + args.push(self.remote_command(request)); + Ok(FleetWorkerCommand::new( + self.config.ssh_binary.clone(), + args, + )) + } + + fn ssh_start_request( + &self, + request: FleetWorkerStartRequest, + ) -> FleetHostResult { + let command = self.build_ssh_command(&request)?; + let mut env = ssh_client_env(); + env.extend(filtered_env(&request.env, &self.config.env_allowlist)?); + let env_allowlist = env.keys().cloned().collect(); + Ok(FleetWorkerStartRequest { + worker_id: request.worker_id, + command, + cwd: None, + env, + env_allowlist, + log_limit_bytes: request.log_limit_bytes, + }) + } + + fn remote_command(&self, request: &FleetWorkerStartRequest) -> String { + let mut parts = vec![ + "cd".to_string(), + shell_quote(&self.config.working_directory.display().to_string()), + "&&".to_string(), + "exec".to_string(), + shell_quote(&self.config.codewhale_binary), + ]; + parts.extend(request.command.args.iter().map(|arg| shell_quote(arg))); + parts.join(" ") + } +} + +impl FleetHostAdapter for SshFleetHostAdapter { + fn start_worker( + &mut self, + request: FleetWorkerStartRequest, + ) -> FleetHostResult { + let request = self.ssh_start_request(request)?; + self.local.start_with_kind(request, FleetHostKind::Ssh) + } + + fn read_status(&mut self, worker_id: &str) -> FleetHostResult { + self.local.read_status(worker_id) + } + + fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult { + self.local.read_logs(worker_id, max_bytes) + } + + fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult { + self.local.interrupt_worker(worker_id) + } + + fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult { + let request = self + .local + .processes + .get(worker_id) + .map(|process| process.request.clone()) + .ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?; + let _ = self.stop_worker(worker_id); + self.local.processes.remove(worker_id); + self.local.start_with_kind(request, FleetHostKind::Ssh) + } + + fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult { + self.local.stop_worker(worker_id) + } + + fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()> { + self.local.cleanup_worker(worker_id) + } +} + +fn open_worker_log(path: &Path) -> FleetHostResult { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(|err| { + FleetHostError::retryable(format!( + "creating worker log dir {}: {err}", + parent.display() + )) + })?; + } + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path) + .map_err(|err| FleetHostError::retryable(format!("opening worker log: {err}"))) +} + +fn read_bounded_log(path: &Path, max_bytes: usize) -> FleetHostResult { + let mut file = File::open(path).map_err(|err| { + FleetHostError::retryable(format!("opening worker log {}: {err}", path.display())) + })?; + let len = file + .metadata() + .map_err(|err| FleetHostError::retryable(format!("reading worker log metadata: {err}")))? + .len(); + let max_bytes = max_bytes.max(1) as u64; + if len > max_bytes { + file.seek(SeekFrom::Start(len - max_bytes)) + .map_err(|err| FleetHostError::retryable(format!("seeking worker log: {err}")))?; + } + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes) + .map_err(|err| FleetHostError::retryable(format!("reading worker log: {err}")))?; + Ok(String::from_utf8_lossy(&bytes).into_owned()) +} + +fn status_from_exit( + worker_id: &str, + pid: Option, + status: ExitStatus, + stopped: bool, +) -> FleetHostWorkerStatus { + let success = status.success(); + FleetHostWorkerStatus { + worker_id: worker_id.to_string(), + state: if stopped { + FleetHostWorkerState::Stopped + } else if success { + FleetHostWorkerState::Exited + } else { + FleetHostWorkerState::Failed + }, + pid, + exit_code: status.code(), + retryable: !success && !stopped, + } +} + +fn classify_spawn_error(err: std::io::Error, context: String) -> FleetHostError { + match err.kind() { + std::io::ErrorKind::NotFound => FleetHostError::configuration(format!("{context}: {err}")), + std::io::ErrorKind::PermissionDenied => { + FleetHostError::terminal(format!("{context}: {err}")) + } + _ => FleetHostError::retryable(format!("{context}: {err}")), + } +} + +fn wait_for_exit( + adapter: &mut LocalProcessFleetHostAdapter, + worker_id: &str, + timeout: Duration, +) -> FleetHostResult { + let deadline = Instant::now() + timeout; + loop { + let status = adapter.read_status(worker_id)?; + if !matches!(status.state, FleetHostWorkerState::Running) { + return Ok(status); + } + if Instant::now() >= deadline { + return Ok(status); + } + thread::sleep(Duration::from_millis(25)); + } +} + +#[cfg(unix)] +fn interrupt_child(child: &mut Child) -> FleetHostResult<()> { + let pid = child.id() as libc::pid_t; + let rc = unsafe { libc::kill(pid, libc::SIGINT) }; + if rc == 0 { + Ok(()) + } else { + Err(FleetHostError::retryable(format!( + "interrupting worker pid {}: {}", + child.id(), + std::io::Error::last_os_error() + ))) + } +} + +#[cfg(not(unix))] +fn interrupt_child(child: &mut Child) -> FleetHostResult<()> { + child + .kill() + .map_err(|err| FleetHostError::retryable(format!("interrupting worker: {err}"))) +} + +fn filtered_env( + env: &BTreeMap, + allowlist: &BTreeSet, +) -> FleetHostResult> { + validate_env_allowlist(allowlist)?; + Ok(env + .iter() + .filter(|(key, _)| allowlist.contains(*key)) + .map(|(key, value)| (key.clone(), value.clone())) + .collect()) +} + +fn validate_env_allowlist(allowlist: &BTreeSet) -> FleetHostResult<()> { + for key in allowlist { + if !is_safe_env_key(key) { + return Err(FleetHostError::configuration(format!( + "fleet host env allowlist key {key} looks secret-bearing; pass secrets through config providers, not worker argv/env" + ))); + } + } + Ok(()) +} + +fn is_safe_env_key(key: &str) -> bool { + let upper = key.to_ascii_uppercase(); + ![ + "SECRET", + "TOKEN", + "PASSWORD", + "PASSWD", + "API_KEY", + "CREDENTIAL", + "PRIVATE_KEY", + ] + .iter() + .any(|needle| upper.contains(needle)) +} + +fn ssh_client_env() -> BTreeMap { + ["HOME", "PATH", "SSH_AUTH_SOCK"] + .into_iter() + .filter_map(|key| { + std::env::var(key) + .ok() + .map(|value| (key.to_string(), value)) + }) + .collect() +} + +fn process_base_env() -> BTreeMap { + let mut env = BTreeMap::new(); + for key in [ + "HOME", + "PATH", + "SYSTEMROOT", + "SystemRoot", + "COMSPEC", + "ComSpec", + ] { + if let Ok(value) = std::env::var(key) { + env.insert(key.to_string(), value); + } + } + env +} + +fn shell_quote(value: &str) -> String { + if value.is_empty() { + return "''".to_string(); + } + format!("'{}'", value.replace('\'', "'\\''")) +} + +fn validate_worker_id(worker_id: &str) -> FleetHostResult<()> { + if worker_id.trim().is_empty() { + return Err(FleetHostError::configuration("worker id cannot be empty")); + } + Ok(()) +} + +fn safe_path_segment(value: &str) -> String { + value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') { + ch + } else { + '_' + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn shell_command(script: &str) -> FleetWorkerCommand { + if cfg!(windows) { + FleetWorkerCommand::new("cmd", ["/C", script]) + } else { + FleetWorkerCommand::new("sh", ["-c", script]) + } + } + + fn wait_for_log( + adapter: &LocalProcessFleetHostAdapter, + worker_id: &str, + needle: &str, + ) -> String { + let deadline = Instant::now() + Duration::from_secs(3); + loop { + let logs = adapter.read_logs(worker_id, 4096).unwrap(); + if logs.contains(needle) || Instant::now() > deadline { + return logs; + } + thread::sleep(Duration::from_millis(25)); + } + } + + #[test] + fn fleet_host_local_adapter_starts_reads_bounded_logs_and_stops() { + let tmp = TempDir::new().unwrap(); + let mut adapter = LocalProcessFleetHostAdapter::new(tmp.path()); + let script = if cfg!(windows) { + "echo 0123456789abcdef & ping -n 30 127.0.0.1 >NUL" + } else { + "printf 0123456789abcdef; sleep 30" + }; + let mut request = FleetWorkerStartRequest::new("local-1", shell_command(script)); + request.log_limit_bytes = 16; + + let handle = adapter.start_worker(request).unwrap(); + assert_eq!(handle.host_kind, FleetHostKind::LocalProcess); + assert!(handle.pid.is_some()); + let status = adapter.read_status("local-1").unwrap(); + assert_eq!(status.state, FleetHostWorkerState::Running); + + let logs = wait_for_log(&adapter, "local-1", "abcdef"); + assert!(logs.ends_with("0123456789abcdef") || logs.contains("0123456789abcdef")); + let bounded = adapter.read_logs("local-1", 6).unwrap(); + assert!(bounded.ends_with("abcdef"), "{bounded:?}"); + + let status = adapter.stop_worker("local-1").unwrap(); + assert_eq!(status.state, FleetHostWorkerState::Stopped); + adapter.cleanup_worker("local-1").unwrap(); + assert_eq!( + adapter.read_status("local-1").unwrap_err().kind, + FleetHostErrorKind::Terminal + ); + } + + #[test] + fn fleet_host_local_adapter_restarts_worker_with_same_request() { + let tmp = TempDir::new().unwrap(); + let mut adapter = LocalProcessFleetHostAdapter::new(tmp.path()); + let script = if cfg!(windows) { + "echo restart-ready & ping -n 30 127.0.0.1 >NUL" + } else { + "printf restart-ready; sleep 30" + }; + let request = FleetWorkerStartRequest::new("local-restart", shell_command(script)); + let first = adapter.start_worker(request).unwrap(); + let restarted = adapter.restart_worker("local-restart").unwrap(); + + assert_eq!(restarted.worker_id, first.worker_id); + assert_eq!(restarted.host_kind, FleetHostKind::LocalProcess); + assert_ne!(restarted.pid, first.pid); + let logs = wait_for_log(&adapter, "local-restart", "restart-ready"); + assert!(logs.contains("restart-ready")); + adapter.stop_worker("local-restart").unwrap(); + } + + #[test] + fn fleet_host_rejects_secret_like_env_allowlist_keys() { + let mut env = BTreeMap::new(); + env.insert("DEEPSEEK_API_KEY".to_string(), "secret".to_string()); + let allowlist = BTreeSet::from(["DEEPSEEK_API_KEY".to_string()]); + + let err = filtered_env(&env, &allowlist).unwrap_err(); + + assert_eq!(err.kind, FleetHostErrorKind::Configuration); + assert!(err.message.contains("looks secret-bearing")); + } + + #[test] + fn fleet_host_ssh_command_uses_sendenv_without_argv_secret_values() { + let tmp = TempDir::new().unwrap(); + let mut config = SshFleetHostConfig::new("builder.example.test", "/srv/codewhale"); + config.user = Some("fleet".to_string()); + config.port = Some(2222); + config.identity = Some(PathBuf::from("/tmp/fleet_id")); + config.codewhale_binary = "/usr/local/bin/codewhale".to_string(); + config.env_allowlist = BTreeSet::from(["FLEET_PROFILE".to_string()]); + let adapter = SshFleetHostAdapter::new(tmp.path(), config).unwrap(); + let mut request = FleetWorkerStartRequest::new( + "ssh-1", + FleetWorkerCommand::new("codewhale", ["fleet-worker", "noop"]), + ); + request.env.insert( + "FLEET_PROFILE".to_string(), + "super-secret-profile-value".to_string(), + ); + + let command = adapter.build_ssh_command(&request).unwrap(); + let argv = command.args.join(" "); + + assert_eq!(command.program, "ssh"); + assert!(argv.contains("BatchMode=yes")); + assert!(argv.contains("SendEnv=FLEET_PROFILE")); + assert!(argv.contains("fleet@builder.example.test")); + assert!(argv.contains("/usr/local/bin/codewhale")); + assert!(argv.contains("fleet-worker")); + assert!(!argv.contains("super-secret-profile-value")); + } + + #[test] + fn fleet_host_ssh_config_requires_explicit_safe_fields() { + let tmp = TempDir::new().unwrap(); + let mut config = SshFleetHostConfig::new("", "/srv/codewhale"); + config.env_allowlist = BTreeSet::from(["SAFE_FLAG".to_string()]); + + let err = SshFleetHostAdapter::new(tmp.path(), config).unwrap_err(); + + assert_eq!(err.kind, FleetHostErrorKind::Configuration); + assert!(err.message.contains("explicit host")); + } + + #[test] + fn fleet_host_ssh_config_maps_from_protocol_host_spec() { + let spec = FleetHostSpec::Ssh { + host: "builder.example.test".to_string(), + port: Some(2222), + user: Some("fleet".to_string()), + identity: Some(PathBuf::from("/tmp/fleet_id")), + working_directory: Some(PathBuf::from("/srv/codewhale")), + env_allowlist: vec!["FLEET_PROFILE".to_string()], + codewhale_binary: Some("/usr/local/bin/codewhale".to_string()), + }; + + let config = SshFleetHostConfig::from_host_spec(&spec).unwrap(); + + assert_eq!(config.host, "builder.example.test"); + assert_eq!(config.port, Some(2222)); + assert_eq!(config.user.as_deref(), Some("fleet")); + assert_eq!(config.working_directory, PathBuf::from("/srv/codewhale")); + assert!(config.env_allowlist.contains("FLEET_PROFILE")); + assert_eq!(config.codewhale_binary, "/usr/local/bin/codewhale"); + } +} diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs index d7c02032..59a28884 100644 --- a/crates/tui/src/fleet/mod.rs +++ b/crates/tui/src/fleet/mod.rs @@ -1,4 +1,5 @@ //! Agent Fleet control plane — local-first manager, ledger, and workers. +pub mod host; pub mod ledger; pub mod manager; diff --git a/docs/FLEET.md b/docs/FLEET.md new file mode 100644 index 00000000..e7ea5b37 --- /dev/null +++ b/docs/FLEET.md @@ -0,0 +1,83 @@ +# Agent Fleet + +Agent Fleet is the local-first control plane for durable multi-worker runs. The +initial CLI surface is: + +```sh +codewhale fleet init +codewhale fleet run tasks.json --max-workers 4 +codewhale fleet status +codewhale fleet inspect +codewhale fleet interrupt +codewhale fleet restart +codewhale fleet stop --all +``` + +Fleet state is stored under the workspace in `.codewhale/fleet.jsonl`. Worker +logs and adapter logs are stored under `.codewhale/fleet/` and +`.codewhale/fleet-host/`. + +## Task Spec + +`codewhale fleet run` accepts JSON or TOML. A minimal JSON spec: + +```json +{ + "name": "local smoke", + "tasks": [ + { + "id": "lint", + "name": "Lint", + "instructions": "Run the lint check and report failures.", + "expected_artifacts": ["log"] + } + ] +} +``` + +Workers are optional. If omitted, CodeWhale creates local worker slots up to +`--max-workers`. + +## Host Adapters + +The host adapter boundary supports local child processes and explicit SSH +workers. Adapters expose the same operations: start, read status, read bounded +logs, interrupt, restart, stop, and cleanup. + +Local workers run as child processes with stdin closed and stdout/stderr written +to bounded fleet host logs. They inherit only a small safe base environment +such as `PATH` and explicitly allowlisted variables. + +SSH workers run through the system `ssh` client with `BatchMode=yes` and a +bounded connect timeout. Remote environment variables are sent with OpenSSH +`SendEnv`; values are not embedded in the local ssh argv or fleet logs. + +Example SSH worker spec: + +```json +{ + "id": "builder-1", + "name": "Builder 1", + "host": { + "kind": "ssh", + "host": "builder.example.com", + "user": "codewhale", + "port": 22, + "identity": "~/.ssh/codewhale_fleet", + "working_directory": "/srv/codewhale/work", + "env_allowlist": ["CODEWHALE_PROFILE"], + "codewhale_binary": "/usr/local/bin/codewhale" + }, + "capabilities": ["local", "linux", "tests"], + "max_concurrent_tasks": 1 +} +``` + +Defaults are intentionally conservative: + +- no hosted control plane or cloud provisioning is enabled; +- SSH requires an explicit host, working directory, and CodeWhale binary path; +- secret-like environment names such as `TOKEN`, `SECRET`, `PASSWORD`, + `API_KEY`, and `PRIVATE_KEY` are rejected from adapter allowlists; +- secrets should remain in CodeWhale config providers or remote host config, + not in task instructions, argv, or fleet logs.