merge #3158 fleet host adapters

This commit is contained in:
Hunter B
2026-06-12 18:31:39 -07:00
4 changed files with 1053 additions and 0 deletions
+34
View File
@@ -195,6 +195,13 @@ pub enum FleetHostSpec {
user: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
identity: Option<PathBuf>,
#[serde(skip_serializing_if = "Option::is_none")]
working_directory: Option<PathBuf>,
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
env_allowlist: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
codewhale_binary: Option<String>,
},
Docker {
image: String,
@@ -519,6 +526,33 @@ mod tests {
assert_eq!(back.size_bytes, Some(1024));
}
#[test]
fn ssh_host_spec_accepts_minimal_legacy_json() {
let json = r#"{"kind":"ssh","host":"builder.example.test"}"#;
let host: FleetHostSpec = serde_json::from_str(json).unwrap();
match host {
FleetHostSpec::Ssh {
host,
port,
user,
identity,
working_directory,
env_allowlist,
codewhale_binary,
} => {
assert_eq!(host, "builder.example.test");
assert_eq!(port, None);
assert_eq!(user, None);
assert_eq!(identity, None);
assert_eq!(working_directory, None);
assert!(env_allowlist.is_empty());
assert_eq!(codewhale_binary, None);
}
other => panic!("expected ssh host spec, got {other:?}"),
}
}
#[test]
fn artifact_kind_uses_flat_string_json() {
let known = serde_json::to_string(&FleetArtifactKind::TestResult).unwrap();
+935
View File
@@ -0,0 +1,935 @@
//! Fleet worker host adapters.
//!
//! Adapters own process boundaries for worker hosts. The manager can lease and
//! observe work through this trait without knowing whether the worker is a
//! local child process or an SSH-backed remote command.
#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::thread;
use std::time::{Duration, Instant};
use codewhale_protocol::fleet::FleetHostSpec;
use thiserror::Error;
const DEFAULT_LOG_LIMIT_BYTES: usize = 64 * 1024;
const DEFAULT_CONNECT_TIMEOUT_SECONDS: u64 = 10;
pub type FleetHostResult<T> = Result<T, FleetHostError>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FleetHostErrorKind {
Retryable,
Terminal,
Configuration,
}
#[derive(Debug, Error)]
#[error("{kind:?}: {message}")]
pub struct FleetHostError {
pub kind: FleetHostErrorKind,
pub message: String,
}
impl FleetHostError {
fn retryable(message: impl Into<String>) -> Self {
Self {
kind: FleetHostErrorKind::Retryable,
message: message.into(),
}
}
fn terminal(message: impl Into<String>) -> Self {
Self {
kind: FleetHostErrorKind::Terminal,
message: message.into(),
}
}
fn configuration(message: impl Into<String>) -> Self {
Self {
kind: FleetHostErrorKind::Configuration,
message: message.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FleetWorkerCommand {
pub program: String,
pub args: Vec<String>,
}
impl FleetWorkerCommand {
pub fn new<S, I, A>(program: S, args: I) -> Self
where
S: Into<String>,
I: IntoIterator<Item = A>,
A: Into<String>,
{
Self {
program: program.into(),
args: args.into_iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Clone)]
pub struct FleetWorkerStartRequest {
pub worker_id: String,
pub command: FleetWorkerCommand,
pub cwd: Option<PathBuf>,
pub env: BTreeMap<String, String>,
pub env_allowlist: BTreeSet<String>,
pub log_limit_bytes: usize,
}
impl FleetWorkerStartRequest {
pub fn new(worker_id: impl Into<String>, command: FleetWorkerCommand) -> Self {
Self {
worker_id: worker_id.into(),
command,
cwd: None,
env: BTreeMap::new(),
env_allowlist: BTreeSet::new(),
log_limit_bytes: DEFAULT_LOG_LIMIT_BYTES,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FleetWorkerHandle {
pub worker_id: String,
pub host_kind: FleetHostKind,
pub pid: Option<u32>,
pub log_path: PathBuf,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FleetHostKind {
LocalProcess,
Ssh,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FleetHostWorkerState {
Running,
Exited,
Failed,
Stopped,
Unknown,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FleetHostWorkerStatus {
pub worker_id: String,
pub state: FleetHostWorkerState,
pub pid: Option<u32>,
pub exit_code: Option<i32>,
pub retryable: bool,
}
pub trait FleetHostAdapter {
fn start_worker(
&mut self,
request: FleetWorkerStartRequest,
) -> FleetHostResult<FleetWorkerHandle>;
fn read_status(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus>;
fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult<String>;
fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus>;
fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetWorkerHandle>;
fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus>;
fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()>;
}
#[derive(Debug)]
pub struct LocalProcessFleetHostAdapter {
workspace: PathBuf,
processes: BTreeMap<String, LocalWorkerProcess>,
}
#[derive(Debug)]
struct LocalWorkerProcess {
request: FleetWorkerStartRequest,
child: Child,
log_path: PathBuf,
stopped: bool,
last_exit: Option<ExitStatus>,
}
impl LocalProcessFleetHostAdapter {
pub fn new(workspace: impl AsRef<Path>) -> Self {
Self {
workspace: workspace.as_ref().to_path_buf(),
processes: BTreeMap::new(),
}
}
fn start_with_kind(
&mut self,
request: FleetWorkerStartRequest,
host_kind: FleetHostKind,
) -> FleetHostResult<FleetWorkerHandle> {
validate_worker_id(&request.worker_id)?;
if self.processes.contains_key(&request.worker_id) {
let status = self.read_status(&request.worker_id)?;
if matches!(status.state, FleetHostWorkerState::Running) {
return Err(FleetHostError::terminal(format!(
"worker {} is already running",
request.worker_id
)));
}
self.processes.remove(&request.worker_id);
}
let mut env = process_base_env();
env.extend(filtered_env(&request.env, &request.env_allowlist)?);
let log_path = self.log_path_for(&request.worker_id, host_kind);
let log = open_worker_log(&log_path)?;
let stderr = log
.try_clone()
.map_err(|err| FleetHostError::retryable(format!("cloning worker log: {err}")))?;
let mut command = Command::new(&request.command.program);
command
.args(&request.command.args)
.stdin(Stdio::null())
.stdout(Stdio::from(log))
.stderr(Stdio::from(stderr))
.env_clear()
.envs(env);
if let Some(cwd) = &request.cwd {
command.current_dir(cwd);
}
let child = command.spawn().map_err(|err| {
classify_spawn_error(err, format!("starting worker {}", request.worker_id))
})?;
let pid = child.id();
let handle = FleetWorkerHandle {
worker_id: request.worker_id.clone(),
host_kind,
pid: Some(pid),
log_path: log_path.clone(),
};
self.processes.insert(
request.worker_id.clone(),
LocalWorkerProcess {
request,
child,
log_path,
stopped: false,
last_exit: None,
},
);
Ok(handle)
}
fn log_path_for(&self, worker_id: &str, host_kind: FleetHostKind) -> PathBuf {
let host_dir = match host_kind {
FleetHostKind::LocalProcess => "local",
FleetHostKind::Ssh => "ssh",
};
self.workspace
.join(".codewhale")
.join("fleet-host")
.join(host_dir)
.join(format!("{}.log", safe_path_segment(worker_id)))
}
}
impl FleetHostAdapter for LocalProcessFleetHostAdapter {
fn start_worker(
&mut self,
request: FleetWorkerStartRequest,
) -> FleetHostResult<FleetWorkerHandle> {
self.start_with_kind(request, FleetHostKind::LocalProcess)
}
fn read_status(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
let process = self
.processes
.get_mut(worker_id)
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
if let Some(status) = process.last_exit {
return Ok(status_from_exit(
worker_id,
Some(process.child.id()),
status,
process.stopped,
));
}
match process.child.try_wait() {
Ok(None) => Ok(FleetHostWorkerStatus {
worker_id: worker_id.to_string(),
state: FleetHostWorkerState::Running,
pid: Some(process.child.id()),
exit_code: None,
retryable: false,
}),
Ok(Some(status)) => {
process.last_exit = Some(status);
Ok(status_from_exit(
worker_id,
Some(process.child.id()),
status,
process.stopped,
))
}
Err(err) => Err(FleetHostError::retryable(format!(
"reading worker {worker_id} status: {err}"
))),
}
}
fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult<String> {
let process = self
.processes
.get(worker_id)
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
let max_bytes = max_bytes.min(process.request.log_limit_bytes.max(1));
read_bounded_log(&process.log_path, max_bytes)
}
fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
{
let process = self
.processes
.get_mut(worker_id)
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
if process.last_exit.is_some() {
return self.read_status(worker_id);
}
interrupt_child(&mut process.child)?;
}
wait_for_exit(self, worker_id, Duration::from_millis(750))
}
fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetWorkerHandle> {
let request = self
.processes
.get(worker_id)
.map(|process| process.request.clone())
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
let _ = self.stop_worker(worker_id);
self.processes.remove(worker_id);
self.start_worker(request)
}
fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
{
let process = self
.processes
.get_mut(worker_id)
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
process.stopped = true;
if process.last_exit.is_none() {
match process.child.try_wait() {
Ok(Some(status)) => {
process.last_exit = Some(status);
}
Ok(None) => {
process.child.kill().map_err(|err| {
FleetHostError::retryable(format!("stopping worker {worker_id}: {err}"))
})?;
let status = process.child.wait().map_err(|err| {
FleetHostError::retryable(format!(
"waiting for worker {worker_id}: {err}"
))
})?;
process.last_exit = Some(status);
}
Err(err) => {
return Err(FleetHostError::retryable(format!(
"reading worker {worker_id} status before stop: {err}"
)));
}
}
}
}
self.read_status(worker_id)
}
fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()> {
if matches!(
self.read_status(worker_id).map(|status| status.state),
Ok(FleetHostWorkerState::Running)
) {
let _ = self.stop_worker(worker_id)?;
}
self.processes.remove(worker_id);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct SshFleetHostConfig {
pub host: String,
pub user: Option<String>,
pub port: Option<u16>,
pub identity: Option<PathBuf>,
pub working_directory: PathBuf,
pub env_allowlist: BTreeSet<String>,
pub codewhale_binary: String,
pub ssh_binary: String,
pub connect_timeout_seconds: u64,
}
impl SshFleetHostConfig {
pub fn new(host: impl Into<String>, working_directory: impl Into<PathBuf>) -> Self {
Self {
host: host.into(),
user: None,
port: None,
identity: None,
working_directory: working_directory.into(),
env_allowlist: BTreeSet::new(),
codewhale_binary: "codewhale".to_string(),
ssh_binary: "ssh".to_string(),
connect_timeout_seconds: DEFAULT_CONNECT_TIMEOUT_SECONDS,
}
}
pub fn from_host_spec(spec: &FleetHostSpec) -> FleetHostResult<Self> {
let FleetHostSpec::Ssh {
host,
port,
user,
identity,
working_directory,
env_allowlist,
codewhale_binary,
} = spec
else {
return Err(FleetHostError::configuration(
"expected SSH fleet host spec",
));
};
let working_directory = working_directory.clone().ok_or_else(|| {
FleetHostError::configuration("SSH fleet host spec requires working_directory")
})?;
let codewhale_binary = codewhale_binary.clone().ok_or_else(|| {
FleetHostError::configuration("SSH fleet host spec requires codewhale_binary")
})?;
let mut config = Self::new(host.clone(), working_directory);
config.port = *port;
config.user = user.clone();
config.identity = identity.clone();
config.env_allowlist = env_allowlist.iter().cloned().collect();
config.codewhale_binary = codewhale_binary;
config.validate()?;
Ok(config)
}
fn validate(&self) -> FleetHostResult<()> {
if self.host.trim().is_empty() {
return Err(FleetHostError::configuration(
"SSH fleet host requires an explicit host",
));
}
if self.codewhale_binary.trim().is_empty() {
return Err(FleetHostError::configuration(
"SSH fleet host requires an explicit codewhale binary path",
));
}
if self.working_directory.as_os_str().is_empty() {
return Err(FleetHostError::configuration(
"SSH fleet host requires an explicit working directory",
));
}
validate_env_allowlist(&self.env_allowlist)
}
fn target(&self) -> String {
self.user
.as_ref()
.filter(|user| !user.trim().is_empty())
.map(|user| format!("{user}@{}", self.host))
.unwrap_or_else(|| self.host.clone())
}
}
#[derive(Debug)]
pub struct SshFleetHostAdapter {
config: SshFleetHostConfig,
local: LocalProcessFleetHostAdapter,
}
impl SshFleetHostAdapter {
pub fn new(workspace: impl AsRef<Path>, config: SshFleetHostConfig) -> FleetHostResult<Self> {
config.validate()?;
Ok(Self {
config,
local: LocalProcessFleetHostAdapter::new(workspace),
})
}
pub fn build_ssh_command(
&self,
request: &FleetWorkerStartRequest,
) -> FleetHostResult<FleetWorkerCommand> {
self.config.validate()?;
let env = filtered_env(&request.env, &self.config.env_allowlist)?;
let mut args = vec![
"-o".to_string(),
"BatchMode=yes".to_string(),
"-o".to_string(),
format!("ConnectTimeout={}", self.config.connect_timeout_seconds),
];
for key in env.keys() {
args.push("-o".to_string());
args.push(format!("SendEnv={key}"));
}
if let Some(port) = self.config.port {
args.push("-p".to_string());
args.push(port.to_string());
}
if let Some(identity) = &self.config.identity {
args.push("-i".to_string());
args.push(identity.display().to_string());
}
args.push(self.config.target());
args.push(self.remote_command(request));
Ok(FleetWorkerCommand::new(
self.config.ssh_binary.clone(),
args,
))
}
fn ssh_start_request(
&self,
request: FleetWorkerStartRequest,
) -> FleetHostResult<FleetWorkerStartRequest> {
let command = self.build_ssh_command(&request)?;
let mut env = ssh_client_env();
env.extend(filtered_env(&request.env, &self.config.env_allowlist)?);
let env_allowlist = env.keys().cloned().collect();
Ok(FleetWorkerStartRequest {
worker_id: request.worker_id,
command,
cwd: None,
env,
env_allowlist,
log_limit_bytes: request.log_limit_bytes,
})
}
fn remote_command(&self, request: &FleetWorkerStartRequest) -> String {
let mut parts = vec![
"cd".to_string(),
shell_quote(&self.config.working_directory.display().to_string()),
"&&".to_string(),
"exec".to_string(),
shell_quote(&self.config.codewhale_binary),
];
parts.extend(request.command.args.iter().map(|arg| shell_quote(arg)));
parts.join(" ")
}
}
impl FleetHostAdapter for SshFleetHostAdapter {
fn start_worker(
&mut self,
request: FleetWorkerStartRequest,
) -> FleetHostResult<FleetWorkerHandle> {
let request = self.ssh_start_request(request)?;
self.local.start_with_kind(request, FleetHostKind::Ssh)
}
fn read_status(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
self.local.read_status(worker_id)
}
fn read_logs(&self, worker_id: &str, max_bytes: usize) -> FleetHostResult<String> {
self.local.read_logs(worker_id, max_bytes)
}
fn interrupt_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
self.local.interrupt_worker(worker_id)
}
fn restart_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetWorkerHandle> {
let request = self
.local
.processes
.get(worker_id)
.map(|process| process.request.clone())
.ok_or_else(|| FleetHostError::terminal(format!("unknown worker {worker_id}")))?;
let _ = self.stop_worker(worker_id);
self.local.processes.remove(worker_id);
self.local.start_with_kind(request, FleetHostKind::Ssh)
}
fn stop_worker(&mut self, worker_id: &str) -> FleetHostResult<FleetHostWorkerStatus> {
self.local.stop_worker(worker_id)
}
fn cleanup_worker(&mut self, worker_id: &str) -> FleetHostResult<()> {
self.local.cleanup_worker(worker_id)
}
}
fn open_worker_log(path: &Path) -> FleetHostResult<File> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|err| {
FleetHostError::retryable(format!(
"creating worker log dir {}: {err}",
parent.display()
))
})?;
}
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.map_err(|err| FleetHostError::retryable(format!("opening worker log: {err}")))
}
fn read_bounded_log(path: &Path, max_bytes: usize) -> FleetHostResult<String> {
let mut file = File::open(path).map_err(|err| {
FleetHostError::retryable(format!("opening worker log {}: {err}", path.display()))
})?;
let len = file
.metadata()
.map_err(|err| FleetHostError::retryable(format!("reading worker log metadata: {err}")))?
.len();
let max_bytes = max_bytes.max(1) as u64;
if len > max_bytes {
file.seek(SeekFrom::Start(len - max_bytes))
.map_err(|err| FleetHostError::retryable(format!("seeking worker log: {err}")))?;
}
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)
.map_err(|err| FleetHostError::retryable(format!("reading worker log: {err}")))?;
Ok(String::from_utf8_lossy(&bytes).into_owned())
}
fn status_from_exit(
worker_id: &str,
pid: Option<u32>,
status: ExitStatus,
stopped: bool,
) -> FleetHostWorkerStatus {
let success = status.success();
FleetHostWorkerStatus {
worker_id: worker_id.to_string(),
state: if stopped {
FleetHostWorkerState::Stopped
} else if success {
FleetHostWorkerState::Exited
} else {
FleetHostWorkerState::Failed
},
pid,
exit_code: status.code(),
retryable: !success && !stopped,
}
}
fn classify_spawn_error(err: std::io::Error, context: String) -> FleetHostError {
match err.kind() {
std::io::ErrorKind::NotFound => FleetHostError::configuration(format!("{context}: {err}")),
std::io::ErrorKind::PermissionDenied => {
FleetHostError::terminal(format!("{context}: {err}"))
}
_ => FleetHostError::retryable(format!("{context}: {err}")),
}
}
fn wait_for_exit(
adapter: &mut LocalProcessFleetHostAdapter,
worker_id: &str,
timeout: Duration,
) -> FleetHostResult<FleetHostWorkerStatus> {
let deadline = Instant::now() + timeout;
loop {
let status = adapter.read_status(worker_id)?;
if !matches!(status.state, FleetHostWorkerState::Running) {
return Ok(status);
}
if Instant::now() >= deadline {
return Ok(status);
}
thread::sleep(Duration::from_millis(25));
}
}
#[cfg(unix)]
fn interrupt_child(child: &mut Child) -> FleetHostResult<()> {
let pid = child.id() as libc::pid_t;
let rc = unsafe { libc::kill(pid, libc::SIGINT) };
if rc == 0 {
Ok(())
} else {
Err(FleetHostError::retryable(format!(
"interrupting worker pid {}: {}",
child.id(),
std::io::Error::last_os_error()
)))
}
}
#[cfg(not(unix))]
fn interrupt_child(child: &mut Child) -> FleetHostResult<()> {
child
.kill()
.map_err(|err| FleetHostError::retryable(format!("interrupting worker: {err}")))
}
fn filtered_env(
env: &BTreeMap<String, String>,
allowlist: &BTreeSet<String>,
) -> FleetHostResult<BTreeMap<String, String>> {
validate_env_allowlist(allowlist)?;
Ok(env
.iter()
.filter(|(key, _)| allowlist.contains(*key))
.map(|(key, value)| (key.clone(), value.clone()))
.collect())
}
fn validate_env_allowlist(allowlist: &BTreeSet<String>) -> FleetHostResult<()> {
for key in allowlist {
if !is_safe_env_key(key) {
return Err(FleetHostError::configuration(format!(
"fleet host env allowlist key {key} looks secret-bearing; pass secrets through config providers, not worker argv/env"
)));
}
}
Ok(())
}
fn is_safe_env_key(key: &str) -> bool {
let upper = key.to_ascii_uppercase();
![
"SECRET",
"TOKEN",
"PASSWORD",
"PASSWD",
"API_KEY",
"CREDENTIAL",
"PRIVATE_KEY",
]
.iter()
.any(|needle| upper.contains(needle))
}
fn ssh_client_env() -> BTreeMap<String, String> {
["HOME", "PATH", "SSH_AUTH_SOCK"]
.into_iter()
.filter_map(|key| {
std::env::var(key)
.ok()
.map(|value| (key.to_string(), value))
})
.collect()
}
fn process_base_env() -> BTreeMap<String, String> {
let mut env = BTreeMap::new();
for key in [
"HOME",
"PATH",
"SYSTEMROOT",
"SystemRoot",
"COMSPEC",
"ComSpec",
] {
if let Ok(value) = std::env::var(key) {
env.insert(key.to_string(), value);
}
}
env
}
fn shell_quote(value: &str) -> String {
if value.is_empty() {
return "''".to_string();
}
format!("'{}'", value.replace('\'', "'\\''"))
}
fn validate_worker_id(worker_id: &str) -> FleetHostResult<()> {
if worker_id.trim().is_empty() {
return Err(FleetHostError::configuration("worker id cannot be empty"));
}
Ok(())
}
fn safe_path_segment(value: &str) -> String {
value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
ch
} else {
'_'
}
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn shell_command(script: &str) -> FleetWorkerCommand {
if cfg!(windows) {
FleetWorkerCommand::new("cmd", ["/C", script])
} else {
FleetWorkerCommand::new("sh", ["-c", script])
}
}
fn wait_for_log(
adapter: &LocalProcessFleetHostAdapter,
worker_id: &str,
needle: &str,
) -> String {
let deadline = Instant::now() + Duration::from_secs(3);
loop {
let logs = adapter.read_logs(worker_id, 4096).unwrap();
if logs.contains(needle) || Instant::now() > deadline {
return logs;
}
thread::sleep(Duration::from_millis(25));
}
}
#[test]
fn fleet_host_local_adapter_starts_reads_bounded_logs_and_stops() {
let tmp = TempDir::new().unwrap();
let mut adapter = LocalProcessFleetHostAdapter::new(tmp.path());
let script = if cfg!(windows) {
"echo 0123456789abcdef & ping -n 30 127.0.0.1 >NUL"
} else {
"printf 0123456789abcdef; sleep 30"
};
let mut request = FleetWorkerStartRequest::new("local-1", shell_command(script));
request.log_limit_bytes = 16;
let handle = adapter.start_worker(request).unwrap();
assert_eq!(handle.host_kind, FleetHostKind::LocalProcess);
assert!(handle.pid.is_some());
let status = adapter.read_status("local-1").unwrap();
assert_eq!(status.state, FleetHostWorkerState::Running);
let logs = wait_for_log(&adapter, "local-1", "abcdef");
assert!(logs.ends_with("0123456789abcdef") || logs.contains("0123456789abcdef"));
let bounded = adapter.read_logs("local-1", 6).unwrap();
assert!(bounded.ends_with("abcdef"), "{bounded:?}");
let status = adapter.stop_worker("local-1").unwrap();
assert_eq!(status.state, FleetHostWorkerState::Stopped);
adapter.cleanup_worker("local-1").unwrap();
assert_eq!(
adapter.read_status("local-1").unwrap_err().kind,
FleetHostErrorKind::Terminal
);
}
#[test]
fn fleet_host_local_adapter_restarts_worker_with_same_request() {
let tmp = TempDir::new().unwrap();
let mut adapter = LocalProcessFleetHostAdapter::new(tmp.path());
let script = if cfg!(windows) {
"echo restart-ready & ping -n 30 127.0.0.1 >NUL"
} else {
"printf restart-ready; sleep 30"
};
let request = FleetWorkerStartRequest::new("local-restart", shell_command(script));
let first = adapter.start_worker(request).unwrap();
let restarted = adapter.restart_worker("local-restart").unwrap();
assert_eq!(restarted.worker_id, first.worker_id);
assert_eq!(restarted.host_kind, FleetHostKind::LocalProcess);
assert_ne!(restarted.pid, first.pid);
let logs = wait_for_log(&adapter, "local-restart", "restart-ready");
assert!(logs.contains("restart-ready"));
adapter.stop_worker("local-restart").unwrap();
}
#[test]
fn fleet_host_rejects_secret_like_env_allowlist_keys() {
let mut env = BTreeMap::new();
env.insert("DEEPSEEK_API_KEY".to_string(), "secret".to_string());
let allowlist = BTreeSet::from(["DEEPSEEK_API_KEY".to_string()]);
let err = filtered_env(&env, &allowlist).unwrap_err();
assert_eq!(err.kind, FleetHostErrorKind::Configuration);
assert!(err.message.contains("looks secret-bearing"));
}
#[test]
fn fleet_host_ssh_command_uses_sendenv_without_argv_secret_values() {
let tmp = TempDir::new().unwrap();
let mut config = SshFleetHostConfig::new("builder.example.test", "/srv/codewhale");
config.user = Some("fleet".to_string());
config.port = Some(2222);
config.identity = Some(PathBuf::from("/tmp/fleet_id"));
config.codewhale_binary = "/usr/local/bin/codewhale".to_string();
config.env_allowlist = BTreeSet::from(["FLEET_PROFILE".to_string()]);
let adapter = SshFleetHostAdapter::new(tmp.path(), config).unwrap();
let mut request = FleetWorkerStartRequest::new(
"ssh-1",
FleetWorkerCommand::new("codewhale", ["fleet-worker", "noop"]),
);
request.env.insert(
"FLEET_PROFILE".to_string(),
"super-secret-profile-value".to_string(),
);
let command = adapter.build_ssh_command(&request).unwrap();
let argv = command.args.join(" ");
assert_eq!(command.program, "ssh");
assert!(argv.contains("BatchMode=yes"));
assert!(argv.contains("SendEnv=FLEET_PROFILE"));
assert!(argv.contains("fleet@builder.example.test"));
assert!(argv.contains("/usr/local/bin/codewhale"));
assert!(argv.contains("fleet-worker"));
assert!(!argv.contains("super-secret-profile-value"));
}
#[test]
fn fleet_host_ssh_config_requires_explicit_safe_fields() {
let tmp = TempDir::new().unwrap();
let mut config = SshFleetHostConfig::new("", "/srv/codewhale");
config.env_allowlist = BTreeSet::from(["SAFE_FLAG".to_string()]);
let err = SshFleetHostAdapter::new(tmp.path(), config).unwrap_err();
assert_eq!(err.kind, FleetHostErrorKind::Configuration);
assert!(err.message.contains("explicit host"));
}
#[test]
fn fleet_host_ssh_config_maps_from_protocol_host_spec() {
let spec = FleetHostSpec::Ssh {
host: "builder.example.test".to_string(),
port: Some(2222),
user: Some("fleet".to_string()),
identity: Some(PathBuf::from("/tmp/fleet_id")),
working_directory: Some(PathBuf::from("/srv/codewhale")),
env_allowlist: vec!["FLEET_PROFILE".to_string()],
codewhale_binary: Some("/usr/local/bin/codewhale".to_string()),
};
let config = SshFleetHostConfig::from_host_spec(&spec).unwrap();
assert_eq!(config.host, "builder.example.test");
assert_eq!(config.port, Some(2222));
assert_eq!(config.user.as_deref(), Some("fleet"));
assert_eq!(config.working_directory, PathBuf::from("/srv/codewhale"));
assert!(config.env_allowlist.contains("FLEET_PROFILE"));
assert_eq!(config.codewhale_binary, "/usr/local/bin/codewhale");
}
}
+1
View File
@@ -1,4 +1,5 @@
//! Agent Fleet control plane — local-first manager, ledger, and workers.
pub mod host;
pub mod ledger;
pub mod manager;
+83
View File
@@ -0,0 +1,83 @@
# Agent Fleet
Agent Fleet is the local-first control plane for durable multi-worker runs. The
initial CLI surface is:
```sh
codewhale fleet init
codewhale fleet run tasks.json --max-workers 4
codewhale fleet status
codewhale fleet inspect <worker-id>
codewhale fleet interrupt <worker-id>
codewhale fleet restart <worker-id>
codewhale fleet stop --all
```
Fleet state is stored under the workspace in `.codewhale/fleet.jsonl`. Worker
logs and adapter logs are stored under `.codewhale/fleet/` and
`.codewhale/fleet-host/`.
## Task Spec
`codewhale fleet run` accepts JSON or TOML. A minimal JSON spec:
```json
{
"name": "local smoke",
"tasks": [
{
"id": "lint",
"name": "Lint",
"instructions": "Run the lint check and report failures.",
"expected_artifacts": ["log"]
}
]
}
```
Workers are optional. If omitted, CodeWhale creates local worker slots up to
`--max-workers`.
## Host Adapters
The host adapter boundary supports local child processes and explicit SSH
workers. Adapters expose the same operations: start, read status, read bounded
logs, interrupt, restart, stop, and cleanup.
Local workers run as child processes with stdin closed and stdout/stderr written
to bounded fleet host logs. They inherit only a small safe base environment
such as `PATH` and explicitly allowlisted variables.
SSH workers run through the system `ssh` client with `BatchMode=yes` and a
bounded connect timeout. Remote environment variables are sent with OpenSSH
`SendEnv`; values are not embedded in the local ssh argv or fleet logs.
Example SSH worker spec:
```json
{
"id": "builder-1",
"name": "Builder 1",
"host": {
"kind": "ssh",
"host": "builder.example.com",
"user": "codewhale",
"port": 22,
"identity": "~/.ssh/codewhale_fleet",
"working_directory": "/srv/codewhale/work",
"env_allowlist": ["CODEWHALE_PROFILE"],
"codewhale_binary": "/usr/local/bin/codewhale"
},
"capabilities": ["local", "linux", "tests"],
"max_concurrent_tasks": 1
}
```
Defaults are intentionally conservative:
- no hosted control plane or cloud provisioning is enabled;
- SSH requires an explicit host, working directory, and CodeWhale binary path;
- secret-like environment names such as `TOKEN`, `SECRET`, `PASSWORD`,
`API_KEY`, and `PRIVATE_KEY` are rejected from adapter allowlists;
- secrets should remain in CodeWhale config providers or remote host config,
not in task instructions, argv, or fleet logs.