feat(fleet): drive fleet run through exec workers

Wire the foreground fleet manager loop to FleetExecutor so leased tasks launch real codewhale exec workers, stream progress into the ledger, and record terminal receipts from subprocess outcomes. Remove the local_result simulation path and update manager tests to use a fake codewhale binary.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hunter B
2026-06-13 08:02:36 -07:00
parent 2118768148
commit 4f0d261094
3 changed files with 584 additions and 150 deletions
+103 -8
View File
@@ -21,7 +21,7 @@
#![allow(dead_code)]
use codewhale_config::FleetExecConfig;
use codewhale_protocol::fleet::{FleetTaskSpec, FleetWorkerEventPayload};
use codewhale_protocol::fleet::{FleetHostSpec, FleetTaskSpec, FleetWorkerEventPayload};
use super::host::{FleetHostAdapter, FleetWorkerCommand};
use super::worker_runtime::fleet_task_prompt;
@@ -154,21 +154,38 @@ pub fn classify_worker_exit(exit_code: Option<i32>, stopped: bool) -> FleetWorke
/// construction never touches the orchestrator — the parent only ingests a
/// compact event stream, which is what keeps it light at high fanout.
pub struct FleetExecutor {
workspace: std::path::PathBuf,
adapter: super::host::LocalProcessFleetHostAdapter,
ssh_adapters: std::collections::BTreeMap<String, super::host::SshFleetHostAdapter>,
streams: std::collections::BTreeMap<String, WorkerStream>,
}
struct WorkerStream {
log_path: std::path::PathBuf,
host: WorkerStreamHost,
offset: u64,
pending: String,
terminal: bool,
}
enum WorkerStreamHost {
Local,
Ssh(String),
}
#[derive(Debug, Clone)]
pub struct FleetWorkerTerminalEvent {
pub payload: FleetWorkerEventPayload,
pub exit_code: Option<i32>,
}
impl FleetExecutor {
pub fn new(workspace: impl AsRef<std::path::Path>) -> Self {
let workspace = workspace.as_ref().to_path_buf();
Self {
adapter: super::host::LocalProcessFleetHostAdapter::new(workspace),
adapter: super::host::LocalProcessFleetHostAdapter::new(&workspace),
workspace,
ssh_adapters: std::collections::BTreeMap::new(),
streams: std::collections::BTreeMap::new(),
}
}
@@ -179,20 +196,79 @@ impl FleetExecutor {
worker_id: &str,
command: FleetWorkerCommand,
cwd: Option<std::path::PathBuf>,
) -> super::host::FleetHostResult<()> {
) -> super::host::FleetHostResult<super::host::FleetWorkerHandle> {
self.start_worker_on_host(worker_id, &FleetHostSpec::Local, command, cwd)
}
/// Start a worker on the requested fleet host.
pub fn start_worker_on_host(
&mut self,
worker_id: &str,
host: &FleetHostSpec,
command: FleetWorkerCommand,
cwd: Option<std::path::PathBuf>,
) -> super::host::FleetHostResult<super::host::FleetWorkerHandle> {
let mut request = super::host::FleetWorkerStartRequest::new(worker_id, command);
request.cwd = cwd;
let handle = self.adapter.start_worker(request)?;
let (handle, host) = match host {
FleetHostSpec::Local => {
let handle = self.adapter.start_worker(request)?;
(handle, WorkerStreamHost::Local)
}
FleetHostSpec::Ssh { .. } => {
let config = super::host::SshFleetHostConfig::from_host_spec(host)?;
let key = worker_id.to_string();
let adapter = self.ssh_adapters.entry(key.clone()).or_insert(
super::host::SshFleetHostAdapter::new(&self.workspace, config)?,
);
let handle = adapter.start_worker(request)?;
(handle, WorkerStreamHost::Ssh(key))
}
FleetHostSpec::Docker { image, .. } => {
return Err(super::host::FleetHostError {
kind: super::host::FleetHostErrorKind::Configuration,
message: format!("docker fleet workers are not wired yet (image {image})"),
});
}
};
self.streams.insert(
worker_id.to_string(),
WorkerStream {
log_path: handle.log_path,
log_path: handle.log_path.clone(),
host,
offset: 0,
pending: String::new(),
terminal: false,
},
);
Ok(())
Ok(handle)
}
pub fn is_tracking(&self, worker_id: &str) -> bool {
self.streams.contains_key(worker_id)
}
pub fn worker_ids(&self) -> Vec<String> {
self.streams.keys().cloned().collect()
}
/// Stop tracking a terminal worker so the scheduler can reuse the same
/// logical worker id for the next queued task.
pub fn forget_worker(&mut self, worker_id: &str) {
let Some(stream) = self.streams.remove(worker_id) else {
return;
};
match stream.host {
WorkerStreamHost::Local => {
let _ = self.adapter.cleanup_worker(worker_id);
}
WorkerStreamHost::Ssh(key) => {
if let Some(adapter) = self.ssh_adapters.get_mut(&key) {
let _ = adapter.cleanup_worker(worker_id);
}
self.ssh_adapters.remove(&key);
}
}
}
/// Read any newly-written stream-json lines for a worker and map them to
@@ -228,10 +304,26 @@ impl FleetExecutor {
/// once. Returns `None` while the worker is still running or already
/// finalized.
pub fn poll_terminal(&mut self, worker_id: &str) -> Option<FleetWorkerEventPayload> {
self.poll_terminal_with_status(worker_id)
.map(|event| event.payload)
}
/// Poll the worker process and include the raw exit code for receipt
/// verification.
pub fn poll_terminal_with_status(
&mut self,
worker_id: &str,
) -> Option<FleetWorkerTerminalEvent> {
if self.streams.get(worker_id).is_none_or(|s| s.terminal) {
return None;
}
let status = self.adapter.read_status(worker_id).ok()?;
let status = match self.streams.get(worker_id).map(|s| &s.host)? {
WorkerStreamHost::Local => self.adapter.read_status(worker_id).ok()?,
WorkerStreamHost::Ssh(key) => self
.ssh_adapters
.get_mut(key)
.and_then(|adapter| adapter.read_status(worker_id).ok())?,
};
let terminal = match status.state {
super::host::FleetHostWorkerState::Running
| super::host::FleetHostWorkerState::Unknown => return None,
@@ -246,7 +338,10 @@ impl FleetExecutor {
if let Some(stream) = self.streams.get_mut(worker_id) {
stream.terminal = true;
}
Some(terminal)
Some(FleetWorkerTerminalEvent {
payload: terminal,
exit_code: status.exit_code,
})
}
/// True once every started worker has reached a terminal state.
+459 -134
View File
@@ -16,6 +16,8 @@ use codewhale_protocol::fleet::*;
use serde_json::Value;
use uuid::Uuid;
use super::executor::{FleetExecutor, FleetWorkerTerminalEvent, build_worker_exec_command};
use super::host::FleetHostErrorKind;
use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState};
use super::task_spec::{
FleetTaskSpecDocument, FleetTaskVerificationInput, load_task_spec_document,
@@ -70,6 +72,13 @@ pub struct FleetTickReport {
pub heartbeats: usize,
}
#[derive(Debug, Clone, Default)]
pub struct FleetExecutorTickReport {
pub started: usize,
pub events: usize,
pub terminals: usize,
}
#[derive(Debug, Clone, Default)]
pub struct FleetStatusSnapshot {
pub runs: usize,
@@ -126,6 +135,13 @@ pub struct FleetWorkerRuntimeProjection {
pub has_session: bool,
}
#[derive(Debug, Clone)]
struct FleetExecutorTaskContext {
entry: FleetInboxEntry,
task_spec: FleetTaskSpec,
worker_id: String,
}
impl FleetManager {
pub fn open(workspace: impl AsRef<Path>) -> Result<Self> {
let workspace = workspace.as_ref().to_path_buf();
@@ -299,6 +315,76 @@ impl FleetManager {
Ok(status.queued + status.running + status.stale > 0)
}
pub async fn run_to_completion(
&self,
run_id: &FleetRunId,
max_workers: usize,
executor: &mut FleetExecutor,
codewhale_binary: &str,
model: Option<&str>,
tick_interval: Duration,
) -> Result<FleetStatusSnapshot> {
let max_workers = max_workers.clamp(1, 128);
loop {
self.schedule_run(run_id, max_workers)?;
self.drive_executor_tick(run_id, executor, codewhale_binary, model)?;
self.refresh_run_status(run_id)?;
if !self.run_has_open_work(run_id)? {
return self.run_status(run_id);
}
tokio::time::sleep(tick_interval).await;
}
}
pub fn drive_executor_tick(
&self,
run_id: &FleetRunId,
executor: &mut FleetExecutor,
codewhale_binary: &str,
model: Option<&str>,
) -> Result<FleetExecutorTickReport> {
let mut report = FleetExecutorTickReport::default();
report.started += self.start_leased_workers(run_id, executor, codewhale_binary, model)?;
for worker_id in executor.worker_ids() {
for payload in executor.drain_events(&worker_id) {
// The subprocess exit is the task-completion authority. Stream
// `done` / `error` lines are useful progress signals, but
// appending them as terminal ledger events before the process
// exits would free the logical worker too early.
if is_terminal_payload(&payload) {
continue;
}
let Some(task) = self.executor_task_context(&worker_id)? else {
continue;
};
self.append_worker_event(
&task.entry.run_id,
&worker_id,
&task.entry.task_id,
payload,
)?;
self.ledger
.heartbeat(&worker_id, &timestamp(), None, None)?;
report.events += 1;
}
if let Some(terminal) = executor.poll_terminal_with_status(&worker_id) {
let Some(task) = self.executor_task_context(&worker_id)? else {
executor.forget_worker(&worker_id);
continue;
};
if self.record_task_outcome(&task, terminal)? {
report.terminals += 1;
}
executor.forget_worker(&worker_id);
}
}
self.refresh_run_status(run_id)?;
Ok(report)
}
pub fn inspect_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
let state = self.ledger.rebuild_state()?;
let latest_event = latest_event_for_worker(&state, worker_id).cloned();
@@ -602,68 +688,142 @@ impl FleetManager {
}
}
self.maybe_complete_local_simulation(entry, worker_id, task_spec, log_artifact)
Ok(())
}
fn maybe_complete_local_simulation(
fn start_leased_workers(
&self,
entry: &FleetInboxEntry,
worker_id: &str,
task_spec: &FleetTaskSpec,
log_artifact: FleetArtifactRef,
) -> Result<()> {
let Some(result) = local_simulation_result(task_spec) else {
return Ok(());
run_id: &FleetRunId,
executor: &mut FleetExecutor,
codewhale_binary: &str,
model: Option<&str>,
) -> Result<usize> {
let state = self.ledger.rebuild_state()?;
let run = state
.runs
.get(&run_id.0)
.cloned()
.ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?;
let mut started = 0usize;
for task in active_tasks_for_run(&state, run_id) {
let Some(worker_id) = task.leased_to.as_deref() else {
continue;
};
if executor.is_tracking(worker_id) {
continue;
}
let Some(task_spec) = run
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)
.cloned()
else {
continue;
};
let worker_spec = run
.worker_specs
.iter()
.find(|worker| worker.id == worker_id)
.cloned()
.unwrap_or_else(|| default_local_worker(worker_id));
let command =
build_worker_exec_command(codewhale_binary, &task_spec, &self.exec_config, model);
let cwd = resolve_task_cwd(&self.workspace, &task_spec);
match executor.start_worker_on_host(worker_id, &worker_spec.host, command, Some(cwd)) {
Ok(handle) => {
let artifact = self.host_log_artifact(&handle.log_path);
self.append_worker_event(
run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Artifact(artifact),
)?;
started += 1;
}
Err(err) => {
let recoverable = matches!(err.kind, FleetHostErrorKind::Retryable);
let task = FleetExecutorTaskContext {
entry: task.entry.clone(),
task_spec,
worker_id: worker_id.to_string(),
};
let terminal = FleetWorkerTerminalEvent {
payload: FleetWorkerEventPayload::Failed {
reason: err.message,
recoverable,
},
exit_code: None,
};
let _ = self.record_task_outcome(&task, terminal)?;
}
}
}
Ok(started)
}
fn executor_task_context(&self, worker_id: &str) -> Result<Option<FleetExecutorTaskContext>> {
let state = self.ledger.rebuild_state()?;
let Some(task) = active_task_for_worker(&state, worker_id)
.or_else(|| latest_task_for_worker(&state, worker_id))
else {
return Ok(None);
};
let now = timestamp();
let (payload, receipt_result, failure_kind, exit_code) = match result {
FleetLocalSimulationResult::Pass => (
FleetWorkerEventPayload::Completed {
exit_code: Some(0),
summary: Some("local fleet smoke task completed".to_string()),
},
FleetTaskResult::Pass,
None,
Some(0),
),
FleetLocalSimulationResult::Fail => (
FleetWorkerEventPayload::Failed {
reason: "local fleet smoke task failed".to_string(),
recoverable: false,
},
FleetTaskResult::Fail,
Some(FleetTaskFailureKind::Task),
Some(1),
),
FleetLocalSimulationResult::Skip => (
FleetWorkerEventPayload::Completed {
exit_code: Some(0),
summary: Some("local fleet smoke task skipped".to_string()),
},
FleetTaskResult::Skip,
None,
Some(0),
),
FleetLocalSimulationResult::Timeout => (
FleetWorkerEventPayload::Failed {
reason: "local fleet smoke task timed out".to_string(),
recoverable: true,
},
FleetTaskResult::Timeout,
Some(FleetTaskFailureKind::Transport),
None,
),
let Some(run) = state.runs.get(&task.entry.run_id.0) else {
return Ok(None);
};
self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?;
let verification_input = FleetTaskVerificationInput {
run_id: entry.run_id.clone(),
task_id: entry.task_id.clone(),
let Some(task_spec) = run
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)
.cloned()
else {
return Ok(None);
};
Ok(Some(FleetExecutorTaskContext {
entry: task.entry.clone(),
task_spec,
worker_id: worker_id.to_string(),
exit_code,
artifacts: vec![log_artifact.clone()],
}))
}
fn record_task_outcome(
&self,
task: &FleetExecutorTaskContext,
terminal: FleetWorkerTerminalEvent,
) -> Result<bool> {
let state = self.ledger.rebuild_state()?;
let key = task_key(&task.entry.run_id.0, &task.entry.task_id);
let Some(current) = state.tasks.get(&key) else {
return Ok(false);
};
if task_spec.scorer.is_some() {
let verification = verify_task_result(&self.workspace, task_spec, &verification_input);
if !matches!(current.status, FleetTaskLedgerStatus::Leased) {
return Ok(false);
}
let (receipt_result, failure_kind, exit_code) =
task_receipt_outcome(&terminal.payload, terminal.exit_code);
self.append_worker_event(
&task.entry.run_id,
&task.worker_id,
&task.entry.task_id,
terminal.payload,
)?;
let artifacts = self.task_artifacts_for_receipt(
&task.entry.run_id,
&task.entry.task_id,
&task.worker_id,
)?;
let verification_input = FleetTaskVerificationInput {
run_id: task.entry.run_id.clone(),
task_id: task.entry.task_id.clone(),
worker_id: task.worker_id.clone(),
exit_code,
artifacts,
};
if task.task_spec.scorer.is_some() {
let verification =
verify_task_result(&self.workspace, &task.task_spec, &verification_input);
let receipt = record_verification_receipt(
&self.ledger,
&self.workspace,
@@ -675,25 +835,73 @@ impl FleetManager {
FleetTaskResult::Fail | FleetTaskResult::Timeout
) {
self.ledger.mark_task_terminal_status(
&entry.run_id,
&entry.task_id,
Some(worker_id),
&task.entry.run_id,
&task.entry.task_id,
Some(&task.worker_id),
&timestamp(),
FleetTaskLedgerStatus::Failed,
)?;
}
return Ok(());
return Ok(true);
}
self.ledger.record_receipt(FleetReceipt {
run_id: entry.run_id.clone(),
task_id: entry.task_id.clone(),
worker_id: worker_id.to_string(),
completed_at: now,
run_id: task.entry.run_id.clone(),
task_id: task.entry.task_id.clone(),
worker_id: task.worker_id.clone(),
completed_at: timestamp(),
result: receipt_result,
failure_kind,
artifacts: vec![log_artifact],
artifacts: verification_input.artifacts,
score: None,
})
})?;
Ok(true)
}
fn task_artifacts_for_receipt(
&self,
run_id: &FleetRunId,
task_id: &str,
worker_id: &str,
) -> Result<Vec<FleetArtifactRef>> {
let state = self.ledger.rebuild_state()?;
Ok(state
.artifact_events
.values()
.filter(|event| {
event.run_id == *run_id && event.task_id == task_id && event.worker_id == worker_id
})
.filter_map(|event| match &event.payload {
FleetWorkerEventPayload::Artifact(artifact) => {
Some(self.refresh_artifact_size(artifact.clone()))
}
_ => None,
})
.collect())
}
fn refresh_artifact_size(&self, mut artifact: FleetArtifactRef) -> FleetArtifactRef {
let path = if artifact.path.is_absolute() {
artifact.path.clone()
} else {
self.workspace.join(&artifact.path)
};
artifact.size_bytes = std::fs::metadata(path).ok().map(|meta| meta.len());
artifact
}
fn host_log_artifact(&self, path: &Path) -> FleetArtifactRef {
let rel_path = path
.strip_prefix(&self.workspace)
.map(Path::to_path_buf)
.unwrap_or_else(|_| path.to_path_buf());
let size_bytes = std::fs::metadata(path).ok().map(|meta| meta.len());
FleetArtifactRef {
kind: FleetArtifactKind::Log,
path: rel_path,
checksum: None,
mime_type: Some("application/x-ndjson".to_string()),
size_bytes,
}
}
fn append_worker_event(
@@ -859,28 +1067,38 @@ impl FleetManager {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FleetLocalSimulationResult {
Pass,
Fail,
Skip,
Timeout,
}
fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec<FleetWorkerSpec> {
(1..=max_workers)
.map(|index| FleetWorkerSpec {
id: format!("{}-local-{}", run_id.0, index),
name: format!("Local worker {index}"),
host: FleetHostSpec::Local,
trust_level: Some(FleetTrustLevel::Local),
labels: BTreeMap::new(),
capabilities: vec!["local".to_string()],
max_concurrent_tasks: Some(1),
.map(|index| {
default_local_worker_with_name(&format!("{}-local-{}", run_id.0, index), index)
})
.collect()
}
fn default_local_worker_with_name(worker_id: &str, index: usize) -> FleetWorkerSpec {
FleetWorkerSpec {
id: worker_id.to_string(),
name: format!("Local worker {index}"),
host: FleetHostSpec::Local,
trust_level: Some(FleetTrustLevel::Local),
labels: BTreeMap::new(),
capabilities: vec!["local".to_string()],
max_concurrent_tasks: Some(1),
}
}
fn default_local_worker(worker_id: &str) -> FleetWorkerSpec {
FleetWorkerSpec {
id: worker_id.to_string(),
name: worker_id.to_string(),
host: FleetHostSpec::Local,
trust_level: Some(FleetTrustLevel::Local),
labels: BTreeMap::new(),
capabilities: vec!["local".to_string()],
max_concurrent_tasks: Some(1),
}
}
fn worker_ids_for_run(run: &FleetRun, max_workers: usize) -> Vec<String> {
run.worker_specs
.iter()
@@ -1048,30 +1266,6 @@ fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option<
.map(|(_, message)| message)
}
fn local_simulation_result(task: &FleetTaskSpec) -> Option<FleetLocalSimulationResult> {
if task
.metadata
.get("local_complete")
.and_then(Value::as_bool)
.unwrap_or(false)
{
return Some(FleetLocalSimulationResult::Pass);
}
match task
.metadata
.get("local_result")
.and_then(Value::as_str)
.map(str::to_ascii_lowercase)
.as_deref()
{
Some("pass" | "passed" | "ok" | "completed") => Some(FleetLocalSimulationResult::Pass),
Some("fail" | "failed" | "error") => Some(FleetLocalSimulationResult::Fail),
Some("skip" | "skipped") => Some(FleetLocalSimulationResult::Skip),
Some("timeout" | "timed_out") => Some(FleetLocalSimulationResult::Timeout),
_ => None,
}
}
fn task_priority(task: &FleetTaskSpec) -> i32 {
task.metadata
.get("priority")
@@ -1080,6 +1274,61 @@ fn task_priority(task: &FleetTaskSpec) -> i32 {
.unwrap_or(0)
}
fn resolve_task_cwd(workspace: &Path, task: &FleetTaskSpec) -> PathBuf {
let Some(root) = task
.workspace
.as_ref()
.and_then(|workspace| workspace.root.as_ref())
else {
return workspace.to_path_buf();
};
if root.is_absolute() {
root.clone()
} else {
workspace.join(root)
}
}
fn task_receipt_outcome(
payload: &FleetWorkerEventPayload,
exit_code: Option<i32>,
) -> (FleetTaskResult, Option<FleetTaskFailureKind>, Option<i32>) {
match payload {
FleetWorkerEventPayload::Completed {
exit_code: payload_exit_code,
..
} => (
FleetTaskResult::Pass,
None,
exit_code.or(*payload_exit_code),
),
FleetWorkerEventPayload::Cancelled { .. } => (FleetTaskResult::Skip, None, exit_code),
FleetWorkerEventPayload::Failed { .. } => {
let failure_kind = if exit_code.is_none() {
FleetTaskFailureKind::Transport
} else {
FleetTaskFailureKind::Task
};
(FleetTaskResult::Fail, Some(failure_kind), exit_code)
}
_ => (FleetTaskResult::Partial, None, exit_code),
}
}
fn is_terminal_payload(payload: &FleetWorkerEventPayload) -> bool {
matches!(
payload,
FleetWorkerEventPayload::Completed { .. }
| FleetWorkerEventPayload::Failed { .. }
| FleetWorkerEventPayload::Cancelled { .. }
| FleetWorkerEventPayload::Interrupted { .. }
)
}
fn task_key(run_id: &str, task_id: &str) -> String {
format!("{run_id}:{task_id}")
}
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
format!("{worker_id}:{run_id}:{task_id}")
}
@@ -1139,6 +1388,42 @@ mod tests {
path
}
#[cfg(unix)]
fn fake_codewhale(dir: &TempDir, body: &str) -> PathBuf {
use std::os::unix::fs::PermissionsExt;
let path = dir.path().join("fake-codewhale");
std::fs::write(&path, body).unwrap();
let mut permissions = std::fs::metadata(&path).unwrap().permissions();
permissions.set_mode(0o755);
std::fs::set_permissions(&path, permissions).unwrap();
path
}
#[cfg(unix)]
fn complete_with_fake_codewhale(
manager: &FleetManager,
run_id: &FleetRunId,
max_workers: usize,
binary: &Path,
) -> FleetStatusSnapshot {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut executor = FleetExecutor::new(&manager.workspace);
rt.block_on(async {
manager
.run_to_completion(
run_id,
max_workers,
&mut executor,
&binary.display().to_string(),
None,
Duration::from_millis(10),
)
.await
.unwrap()
})
}
#[test]
fn fleet_manager_creates_run_and_starts_workers_up_to_cap() {
let tmp = TempDir::new().unwrap();
@@ -1204,23 +1489,30 @@ mod tests {
assert_eq!(status.running, 0);
}
#[cfg(unix)]
#[test]
fn fleet_manager_can_record_completed_local_smoke_tasks() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let mut completed = task("task-a");
completed
.metadata
.insert("local_result".to_string(), json!("pass"));
let path = task_spec_file(&tmp, vec![completed]);
let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b"), task("task-c")]);
let fake = fake_codewhale(
&tmp,
r#"#!/bin/sh
printf '{"type":"tool_use","name":"read_file","id":"fake","input":{}}\n'
printf '{"type":"done"}\n'
exit 0
"#,
);
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
let status = manager.run_status(&report.run_id).unwrap();
assert_eq!(status.completed, 1);
assert_eq!(report.leased, 1);
assert_eq!(report.queued, 2);
let status = complete_with_fake_codewhale(&manager, &report.run_id, 1, &fake);
assert_eq!(status.completed, 3);
assert_eq!(status.running, 0);
let state = manager.ledger.rebuild_state().unwrap();
assert_eq!(state.receipts.len(), 1);
assert_eq!(state.receipts.len(), 3);
}
#[test]
@@ -1260,20 +1552,25 @@ mod tests {
);
}
#[cfg(unix)]
#[test]
fn fleet_task_spec_local_scorer_records_receipt_artifact() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let mut completed = task("task-a");
completed.scorer = Some(FleetScorerSpec::ExitCode);
completed
.metadata
.insert("local_result".to_string(), json!("pass"));
let path = task_spec_file(&tmp, vec![completed]);
let fake = fake_codewhale(
&tmp,
r#"#!/bin/sh
printf '{"type":"done"}\n'
exit 0
"#,
);
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
let status = complete_with_fake_codewhale(&manager, &report.run_id, 1, &fake);
let status = manager.run_status(&report.run_id).unwrap();
assert_eq!(status.completed, 1);
assert_eq!(status.failed, 0);
assert_eq!(status.partial, 0);
@@ -1290,33 +1587,62 @@ mod tests {
);
}
#[cfg(unix)]
#[test]
fn fleet_task_spec_status_distinguishes_failure_sources() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let mut transport = task("transport-failure");
transport.scorer = Some(FleetScorerSpec::ExitCode);
transport
.metadata
.insert("local_result".to_string(), json!("timeout"));
let mut task_failed = task("task-failure");
let mut task_failed = task("a-task-failure");
task_failed.scorer = Some(FleetScorerSpec::ExitCode);
task_failed
.metadata
.insert("local_result".to_string(), json!("fail"));
let mut verifier_failed = task("verifier-failure");
task_failed.instructions = "task-failure".to_string();
let mut transport = task("b-transport-failure");
transport.scorer = Some(FleetScorerSpec::ExitCode);
let mut verifier_failed = task("c-verifier-failure");
verifier_failed.scorer = Some(FleetScorerSpec::RegexMatch {
path: PathBuf::from("missing.log"),
pattern: "[".to_string(),
});
verifier_failed
.metadata
.insert("local_result".to_string(), json!("pass"));
let path = task_spec_file(&tmp, vec![transport, task_failed, verifier_failed]);
let fake = fake_codewhale(
&tmp,
r#"#!/bin/sh
case "$*" in
*task-failure*)
printf '{"type":"error","error":"task failed"}\n'
exit 7
;;
*)
printf '{"type":"done"}\n'
exit 0
;;
esac
"#,
);
let doc = FleetTaskSpecDocument {
name: Some("failure source smoke".to_string()),
labels: BTreeMap::new(),
security_policy: None,
workers: vec![
default_local_worker("local-task"),
FleetWorkerSpec {
id: "docker-transport".to_string(),
name: "Docker transport".to_string(),
host: FleetHostSpec::Docker {
image: "fake".to_string(),
args: Vec::new(),
},
trust_level: Some(FleetTrustLevel::Sandbox),
labels: BTreeMap::new(),
capabilities: vec![],
max_concurrent_tasks: Some(1),
},
default_local_worker("local-verifier"),
],
tasks: vec![task_failed, transport, verifier_failed],
};
let report = manager.create_run_from_task_spec_path(&path, 3).unwrap();
let report = manager.create_run(doc, 3).unwrap();
let status = complete_with_fake_codewhale(&manager, &report.run_id, 3, &fake);
let status = manager.run_status(&report.run_id).unwrap();
assert_eq!(status.failed, 3);
assert_eq!(status.transport_failed, 1);
assert_eq!(status.task_failed, 1);
@@ -1517,7 +1843,6 @@ mod tests {
fn fleet_security_policy_propagates_from_task_spec_document_to_run() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let path = task_spec_file(&tmp, vec![task("task-a")]);
// Rewrite the spec file with a security_policy block.
let doc = serde_json::json!({
"name": "secure smoke",
+22 -8
View File
@@ -1464,6 +1464,7 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) -
FleetAlertAdapterConfig, FleetAlertConfig, FleetAlertDispatcher, FleetAlertEvent,
FleetEnvSecretResolver,
};
use crate::fleet::executor::FleetExecutor;
use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
use codewhale_protocol::fleet::{
FleetAlertEventClass, FleetArtifactKind, FleetRunId, FleetWorkerEventPayload,
@@ -1713,6 +1714,14 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) -
}
}
fn fleet_codewhale_binary() -> String {
std::env::var("CODEWHALE_FLEET_CODEWHALE_BINARY")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| "codewhale".to_string())
}
let exec_config = config
.fleet
.as_ref()
@@ -1744,14 +1753,19 @@ async fn run_fleet_command(workspace: &Path, config: &Config, args: FleetArgs) -
println!(
"manager loop running; use `codewhale fleet status`, `inspect`, `interrupt`, or `stop --all` from another terminal."
);
loop {
manager.schedule_run(&report.run_id, max_workers)?;
if !manager.run_has_open_work(&report.run_id)? {
print_status(&manager.run_status(&report.run_id)?);
break;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
let mut executor = FleetExecutor::new(workspace);
let codewhale_binary = fleet_codewhale_binary();
let status = manager
.run_to_completion(
&report.run_id,
max_workers,
&mut executor,
&codewhale_binary,
None,
Duration::from_secs(2),
)
.await?;
print_status(&status);
Ok(())
}
FleetCommand::Status => {