feat(tui): add local fleet manager commands

This commit is contained in:
Hunter B
2026-06-12 18:15:18 -07:00
parent 8ba8f7f2f5
commit 66a45c0070
4 changed files with 1286 additions and 1 deletions
+42
View File
@@ -87,6 +87,8 @@ pub struct FleetLedgerState {
pub latest_seq: BTreeMap<String, u64>,
/// Latest event envelope per worker_id:run_id:task_id.
pub latest_events: BTreeMap<String, FleetWorkerEvent>,
/// Artifact events keyed by worker_id:run_id:task_id:path.
pub artifact_events: BTreeMap<String, FleetWorkerEvent>,
/// Completed receipts by run_id:task_id.
pub receipts: BTreeMap<String, FleetReceipt>,
}
@@ -122,6 +124,7 @@ pub struct FleetHeartbeatState {
}
/// Append-only JSONL ledger for fleet runs.
#[derive(Debug)]
pub struct FleetLedger {
    /// Filesystem path of the ledger file.
    // NOTE(review): presumably the JSONL file named in the type docs — confirm in `open`.
    ledger_path: PathBuf,
}
@@ -218,6 +221,23 @@ impl FleetLedger {
})
}
/// Appends a `TaskCompletedOrFailed` record that moves a task to `status`.
///
/// `worker_id` is optional; when it is `None` the record carries an empty
/// worker id string.
///
/// # Errors
/// Returns whatever error `append_record` reports.
pub fn mark_task_terminal_status(
    &self,
    run_id: &FleetRunId,
    task_id: &str,
    worker_id: Option<&str>,
    timestamp: &str,
    status: FleetTaskLedgerStatus,
) -> Result<()> {
    let record = FleetLedgerRecord::TaskCompletedOrFailed {
        run_id: run_id.clone(),
        task_id: task_id.to_owned(),
        // `None` becomes the empty string, exactly like `unwrap_or_default` on `Option<&str>`.
        worker_id: worker_id.map(str::to_owned).unwrap_or_default(),
        timestamp: timestamp.to_owned(),
        status,
    };
    self.append_record(&record)
}
/// Wraps `event` in an `EventAppended` record and appends it to the ledger.
///
/// # Errors
/// Returns whatever error `append_record` reports.
pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> {
    let record = FleetLedgerRecord::EventAppended { event };
    self.append_record(&record)
}
@@ -374,6 +394,11 @@ impl FleetLedger {
event: event.clone(),
})?);
}
for event in state.artifact_events.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended {
event: event.clone(),
})?);
}
for (worker_id, heartbeat) in &state.heartbeats {
lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat {
worker_id: worker_id.clone(),
@@ -418,11 +443,23 @@ fn mark_task_terminal(
let key = task_key(&run_id.0, task_id);
if let Some(task) = state.tasks.get_mut(&key) {
task.status = status;
if !worker_id.is_empty() {
task.leased_to = Some(worker_id.to_string());
}
task.completed_at = Some(timestamp.to_string());
}
}
/// Builds the `worker_id:run_id:task_id:path` key under which an artifact
/// event is stored.
fn artifact_event_key(event: &FleetWorkerEvent, artifact: &FleetArtifactRef) -> String {
    let worker = &event.worker_id;
    let run = &event.run_id.0;
    let task = &event.task_id;
    let path = artifact.path.display();
    format!("{worker}:{run}:{task}:{path}")
}
fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
match record {
FleetLedgerRecord::RunCreated { run } => {
@@ -479,6 +516,11 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
state.latest_seq.insert(event_key.clone(), event.seq);
state.latest_events.insert(event_key, event.clone());
}
if let FleetWorkerEventPayload::Artifact(artifact) = &event.payload {
state
.artifact_events
.insert(artifact_event_key(&event, artifact), event.clone());
}
// Derive worker status from lifecycle events.
match &event.payload {
FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => {
+982
View File
@@ -0,0 +1,982 @@
//! Local-first fleet manager loop and operator controls.
//!
//! This module is intentionally ledger-first: the first manager can run in the
//! foreground and coordinate logical local workers while later host adapters
//! add real process and SSH execution behind the same records.
#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use chrono::{DateTime, SecondsFormat, Utc};
use codewhale_protocol::fleet::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState};
const DEFAULT_STALE_AFTER_SECONDS: u64 = 300;
/// Foreground fleet manager bound to one workspace and its ledger.
#[derive(Debug)]
pub struct FleetManager {
    /// Workspace root; the ledger is opened from it and worker log artifacts
    /// are written beneath it.
    workspace: PathBuf,
    /// Ledger that every manager operation reads from and appends to.
    ledger: FleetLedger,
    /// Heartbeat age beyond which a leased task is counted as stale.
    stale_after: Duration,
}
/// Summary returned after a run has been created and scheduled once.
#[derive(Debug, Clone)]
pub struct FleetRunReport {
    /// Identifier generated for the new run.
    pub run_id: FleetRunId,
    /// Number of task specs in the run.
    pub task_count: usize,
    /// Tasks leased to workers by the initial scheduling pass.
    pub leased: usize,
    /// Tasks still enqueued for this run after the initial scheduling pass.
    pub queued: usize,
    /// Ids of all worker specs attached to the run.
    pub worker_ids: Vec<String>,
}
/// Counters produced by one scheduling pass (`schedule_run`).
#[derive(Debug, Clone, Default)]
pub struct FleetTickReport {
    /// Number of tasks newly leased and started during the pass.
    pub leased: usize,
    /// Number of heartbeats recorded for already-leased tasks during the pass.
    pub heartbeats: usize,
}
/// Task counts by state, plus the known worker statuses.
///
/// Task counters honour the optional run filter; `runs` and `workers` are
/// always taken from the whole ledger state.
#[derive(Debug, Clone, Default)]
pub struct FleetStatusSnapshot {
    /// Total number of runs in the ledger state (not restricted by a run filter).
    pub runs: usize,
    /// Tasks in the `Enqueued` state.
    pub queued: usize,
    /// Leased tasks that are not considered stale.
    pub running: usize,
    /// Tasks in the `Completed` state.
    pub completed: usize,
    /// Tasks in the `Failed` state.
    pub failed: usize,
    /// Tasks in the `Cancelled` state.
    pub cancelled: usize,
    /// Leased tasks whose worker heartbeat is missing, unparseable, or too old.
    pub stale: usize,
    /// Copy of the ledger's worker status map, keyed by worker id.
    pub workers: BTreeMap<String, FleetWorkerStatus>,
}
/// Operator-facing view of a single worker, assembled from the ledger state.
#[derive(Debug, Clone)]
pub struct FleetWorkerInspection {
    /// Worker the inspection describes.
    pub worker_id: String,
    /// Status recorded for the worker, or `Unknown` when the ledger has none.
    pub status: FleetWorkerStatus,
    /// Run of the worker's leased task, or of its most recent task when
    /// nothing is currently leased.
    pub current_run_id: Option<FleetRunId>,
    /// Task id matching `current_run_id`.
    pub current_task_id: Option<String>,
    /// Timestamp of the latest heartbeat recorded for the worker, if any.
    pub latest_heartbeat_at: Option<String>,
    /// Most recent event found for the worker, if any.
    pub latest_event: Option<FleetWorkerEvent>,
    /// Artifacts gathered from the worker's artifact events and receipts.
    pub artifacts: Vec<FleetArtifactRef>,
    /// Human-readable description of the latest failure, cancellation,
    /// interruption, or stale event, if any.
    pub last_error: Option<String>,
}
/// Normalised task spec: an optional name, labels, workers, and tasks.
///
/// Every field is optional when deserializing (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FleetTaskSpecDocument {
    /// Run name; `create_run` falls back to the generated run id when absent.
    #[serde(default)]
    pub name: Option<String>,
    /// Free-form labels copied onto the run.
    #[serde(default)]
    pub labels: BTreeMap<String, String>,
    /// Worker specs; also accepted under the key `worker_specs`. When empty,
    /// `create_run` generates default local workers.
    #[serde(default, alias = "worker_specs")]
    pub workers: Vec<FleetWorkerSpec>,
    /// Tasks to enqueue; validation rejects an empty list.
    #[serde(default)]
    pub tasks: Vec<FleetTaskSpec>,
}
/// Shapes accepted in a task spec file.
///
/// The enum is `untagged`, so serde tries the variants in declaration order
/// and keeps the first one that deserializes.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
enum FleetTaskSpecFile {
    /// A full document with optional name, labels, workers, and tasks.
    Document(FleetTaskSpecDocument),
    /// A bare list of tasks.
    Tasks(Vec<FleetTaskSpec>),
    /// A single task.
    Single(FleetTaskSpec),
}
impl FleetTaskSpecFile {
    /// Converts any accepted file shape into a [`FleetTaskSpecDocument`].
    ///
    /// A document keeps its own name unless it is missing or empty, in which
    /// case `fallback_name` is used. Bare task lists and single tasks always
    /// get `fallback_name`, no labels, and no workers.
    fn into_document(self, fallback_name: String) -> FleetTaskSpecDocument {
        let tasks = match self {
            Self::Document(doc) => {
                let name = match doc.name {
                    Some(name) if !name.is_empty() => name,
                    _ => fallback_name,
                };
                return FleetTaskSpecDocument {
                    name: Some(name),
                    ..doc
                };
            }
            Self::Tasks(tasks) => tasks,
            Self::Single(task) => vec![task],
        };
        FleetTaskSpecDocument {
            name: Some(fallback_name),
            labels: BTreeMap::new(),
            workers: Vec::new(),
            tasks,
        }
    }
}
impl FleetManager {
pub fn open(workspace: impl AsRef<Path>) -> Result<Self> {
let workspace = workspace.as_ref().to_path_buf();
let ledger = FleetLedger::open(&workspace)?;
Ok(Self {
workspace,
ledger,
stale_after: Duration::from_secs(DEFAULT_STALE_AFTER_SECONDS),
})
}
pub fn with_stale_after(mut self, stale_after: Duration) -> Self {
self.stale_after = stale_after;
self
}
/// Returns the path reported by the underlying ledger.
pub fn ledger_path(&self) -> &Path {
    let ledger = &self.ledger;
    ledger.path()
}
/// Reads, parses, and validates a fleet task spec file.
///
/// Files whose extension is `toml` (compared case-insensitively, so
/// `spec.TOML` works too) are parsed as TOML; everything else is parsed as
/// JSON. The file stem is used as the run name when the spec has none,
/// falling back to `"fleet-run"` when the stem is missing, empty, or not
/// valid UTF-8.
///
/// # Errors
/// Fails when the file cannot be read, cannot be parsed in the selected
/// format, or does not pass `validate_task_spec_document`.
pub fn load_task_spec(path: &Path) -> Result<FleetTaskSpecDocument> {
    let raw = std::fs::read_to_string(path)
        .with_context(|| format!("reading fleet task spec {}", path.display()))?;
    let fallback_name = path
        .file_stem()
        .and_then(|s| s.to_str())
        .filter(|s| !s.is_empty())
        .unwrap_or("fleet-run")
        .to_string();
    // Case-insensitive so an upper-case extension is not silently routed to
    // the JSON parser, which would fail with a misleading error.
    let is_toml = path
        .extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext.eq_ignore_ascii_case("toml"));
    let parsed = if is_toml {
        toml::from_str::<FleetTaskSpecFile>(&raw)
            .with_context(|| format!("parsing TOML fleet task spec {}", path.display()))?
    } else {
        serde_json::from_str::<FleetTaskSpecFile>(&raw)
            .with_context(|| format!("parsing JSON fleet task spec {}", path.display()))?
    };
    let doc = parsed.into_document(fallback_name);
    validate_task_spec_document(&doc)?;
    Ok(doc)
}
/// Loads the task spec at `path` and creates a run from it.
///
/// # Errors
/// Propagates failures from `load_task_spec` and `create_run`.
pub fn create_run_from_task_spec_path(
    &self,
    path: &Path,
    max_workers: usize,
) -> Result<FleetRunReport> {
    Self::load_task_spec(path).and_then(|doc| self.create_run(doc, max_workers))
}
/// Creates a run from `doc`, enqueues its tasks, and runs one scheduling pass.
///
/// `max_workers` is clamped to `1..=128`. When the document lists no
/// workers, that many default local workers are generated.
///
/// # Errors
/// Fails when the document does not validate or when any ledger operation
/// fails.
pub fn create_run(
    &self,
    mut doc: FleetTaskSpecDocument,
    max_workers: usize,
) -> Result<FleetRunReport> {
    validate_task_spec_document(&doc)?;
    let max_workers = max_workers.clamp(1, 128);
    // Run ids are "fleet-" plus the first 8 hex characters of a fresh v4 UUID.
    let run_id = FleetRunId::from(format!(
        "fleet-{}",
        &Uuid::new_v4().simple().to_string()[..8]
    ));
    let now = timestamp();
    if doc.workers.is_empty() {
        doc.workers = default_local_workers(&run_id, max_workers);
    }
    let run = FleetRun {
        id: run_id.clone(),
        name: doc.name.unwrap_or_else(|| run_id.0.clone()),
        status: FleetRunStatus::Queued,
        task_specs: doc.tasks.clone(),
        worker_specs: doc.workers.clone(),
        labels: doc.labels,
        created_at: now.clone(),
        updated_at: Some(now.clone()),
        completed_at: None,
    };
    // Ledger order: run record first, then one inbox entry per task.
    self.ledger.create_run(&run)?;
    for task in &run.task_specs {
        self.ledger.enqueue(FleetInboxEntry {
            run_id: run.id.clone(),
            task_id: task.id.clone(),
            priority: task_priority(task),
            enqueued_at: now.clone(),
            lease_deadline: None,
            attempts: 0,
        })?;
    }
    // NOTE(review): validation above already rejects empty task lists, so the
    // `Completed` branch looks unreachable from this entry point.
    let initial_status = if run.task_specs.is_empty() {
        FleetRunStatus::Completed
    } else {
        FleetRunStatus::Running
    };
    self.ledger
        .update_run_status(&run.id, initial_status, &timestamp())?;
    let tick = self.schedule_run(&run.id, max_workers)?;
    self.refresh_run_status(&run.id)?;
    // Re-read the ledger so the reported queue depth reflects the scheduling pass.
    let state = self.ledger.rebuild_state()?;
    let snapshot = self.status_from_state(Some(&run.id), &state);
    Ok(FleetRunReport {
        run_id: run.id,
        task_count: run.task_specs.len(),
        leased: tick.leased,
        queued: snapshot.queued,
        worker_ids: run.worker_specs.iter().map(|w| w.id.clone()).collect(),
    })
}
/// Runs one scheduling pass for `run_id`.
///
/// First records a heartbeat for every leased task whose worker is among
/// the run's first `max_workers` workers, then leases enqueued tasks to idle
/// workers until the worker cap is reached, no idle worker is left, or the
/// queue is empty. Finishes by refreshing the run status.
///
/// `max_workers` is clamped to `1..=128`.
///
/// # Errors
/// Fails when the run does not exist or when a ledger operation fails.
pub fn schedule_run(&self, run_id: &FleetRunId, max_workers: usize) -> Result<FleetTickReport> {
    let max_workers = max_workers.clamp(1, 128);
    let mut report = FleetTickReport::default();
    let state = self.ledger.rebuild_state()?;
    let run = state
        .runs
        .get(&run_id.0)
        .cloned()
        .ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?;
    let worker_ids = worker_ids_for_run(&run, max_workers);
    for task in active_tasks_for_run(&state, run_id) {
        if let Some(worker_id) = task.leased_to.as_deref()
            && worker_ids.iter().any(|id| id == worker_id)
        {
            self.ledger.heartbeat(worker_id, &timestamp(), None, None)?;
            report.heartbeats += 1;
        }
    }
    loop {
        // The state is rebuilt on every iteration so that leases (and any
        // immediate simulated completions) from the previous iteration are seen.
        let state = self.ledger.rebuild_state()?;
        let active_workers = active_workers_for_run(&state, run_id);
        if active_workers.len() >= max_workers {
            break;
        }
        // First configured worker that holds no leased task in this run.
        let Some(worker_id) = worker_ids
            .iter()
            .find(|id| !active_workers.contains(*id))
            .cloned()
        else {
            break;
        };
        let Some((entry, task_spec)) = next_enqueued_task_for_run(&state, run_id) else {
            break;
        };
        self.start_worker_task(&worker_id, &entry, &task_spec)?;
        report.leased += 1;
    }
    self.refresh_run_status(run_id)?;
    Ok(report)
}
/// Returns a snapshot covering the tasks of every run in the ledger.
///
/// # Errors
/// Fails when the ledger state cannot be rebuilt.
pub fn status(&self) -> Result<FleetStatusSnapshot> {
    self.ledger
        .rebuild_state()
        .map(|state| self.status_from_state(None, &state))
}
/// Returns a snapshot whose task counters are restricted to `run_id`.
///
/// # Errors
/// Fails when the ledger state cannot be rebuilt.
pub fn run_status(&self, run_id: &FleetRunId) -> Result<FleetStatusSnapshot> {
    self.ledger
        .rebuild_state()
        .map(|state| self.status_from_state(Some(run_id), &state))
}
pub fn run_has_open_work(&self, run_id: &FleetRunId) -> Result<bool> {
let status = self.run_status(run_id)?;
Ok(status.queued + status.running + status.stale > 0)
}
pub fn inspect_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
let state = self.ledger.rebuild_state()?;
let latest_event = latest_event_for_worker(&state, worker_id).cloned();
let current = active_task_for_worker(&state, worker_id)
.or_else(|| latest_task_for_worker(&state, worker_id));
let artifacts = state
.artifact_events
.values()
.filter(|event| event.worker_id == worker_id)
.filter_map(|event| match &event.payload {
FleetWorkerEventPayload::Artifact(artifact) => Some(artifact.clone()),
_ => None,
})
.chain(
state
.receipts
.values()
.filter(|receipt| receipt.worker_id == worker_id)
.flat_map(|receipt| receipt.artifacts.clone()),
)
.collect();
let last_error = latest_error_for_worker(&state, worker_id);
let status = state
.workers
.get(worker_id)
.cloned()
.unwrap_or(FleetWorkerStatus::Unknown);
let latest_heartbeat_at = state
.heartbeats
.get(worker_id)
.map(|heartbeat| heartbeat.timestamp.clone());
Ok(FleetWorkerInspection {
worker_id: worker_id.to_string(),
status,
current_run_id: current.as_ref().map(|task| task.entry.run_id.clone()),
current_task_id: current.map(|task| task.entry.task_id.clone()),
latest_heartbeat_at,
latest_event,
artifacts,
last_error,
})
}
/// Interrupts the task currently leased to `worker_id`.
///
/// Appends an `Interrupted` event followed by a `Cancelled` event (both
/// attributed to `"operator"`), refreshes the run status, and returns a
/// fresh inspection of the worker.
///
/// # Errors
/// Fails when the worker has no leased task or a ledger operation fails.
pub fn interrupt_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
    let state = self.ledger.rebuild_state()?;
    let task = match active_task_for_worker(&state, worker_id) {
        Some(task) => task,
        None => bail!("worker {worker_id} has no running fleet task"),
    };
    let run_id = &task.entry.run_id;
    let task_id = &task.entry.task_id;
    let operator = || Some("operator".to_string());
    let payloads = [
        FleetWorkerEventPayload::Interrupted { signal: operator() },
        FleetWorkerEventPayload::Cancelled {
            cancelled_by: operator(),
        },
    ];
    // Order matters: the interruption is ledgered before the cancellation.
    for payload in payloads {
        self.append_worker_event(run_id, worker_id, task_id, payload)?;
    }
    self.refresh_run_status(run_id)?;
    self.inspect_worker(worker_id)
}
pub fn restart_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
let state = self.ledger.rebuild_state()?;
let Some(task) = active_task_for_worker(&state, worker_id)
.or_else(|| latest_task_for_worker(&state, worker_id))
else {
bail!("worker {worker_id} has no fleet task to restart");
};
let now = timestamp();
self.ledger.lease_task(
&task.entry.run_id,
&task.entry.task_id,
worker_id,
&now,
None,
)?;
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Restarted { restart_count: 1 },
)?;
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Running,
)?;
self.ledger.heartbeat(worker_id, &timestamp(), None, None)?;
self.ledger
.update_run_status(&task.entry.run_id, FleetRunStatus::Running, &timestamp())?;
self.inspect_worker(worker_id)
}
/// Cancels every enqueued or leased task in the ledger.
///
/// Leased tasks additionally get an `Interrupted` event with signal
/// `"stop_all"`. Every run that owned at least one stopped task is then
/// marked `Cancelled`. Returns the number of tasks stopped.
///
/// # Errors
/// Fails when a ledger operation fails; tasks handled before the failure
/// remain cancelled.
pub fn stop_all(&self) -> Result<usize> {
    let state = self.ledger.rebuild_state()?;
    let now = timestamp();
    let mut affected_runs = BTreeSet::new();
    let mut stopped = 0usize;

    let open_tasks = state.tasks.values().filter(|task| {
        matches!(
            task.status,
            FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased
        )
    });
    for task in open_tasks {
        let run_id = &task.entry.run_id;
        let task_id = &task.entry.task_id;
        let owner = task.leased_to.as_deref();
        if let Some(worker_id) = owner {
            let interrupted = FleetWorkerEventPayload::Interrupted {
                signal: Some("stop_all".to_string()),
            };
            self.append_worker_event(run_id, worker_id, task_id, interrupted)?;
        }
        self.ledger.mark_task_terminal_status(
            run_id,
            task_id,
            owner,
            &now,
            FleetTaskLedgerStatus::Cancelled,
        )?;
        affected_runs.insert(run_id.0.clone());
        stopped += 1;
    }

    for run_id in affected_runs {
        let run_id = FleetRunId::from(run_id);
        self.ledger
            .update_run_status(&run_id, FleetRunStatus::Cancelled, &timestamp())?;
    }
    Ok(stopped)
}
fn start_worker_task(
&self,
worker_id: &str,
entry: &FleetInboxEntry,
task_spec: &FleetTaskSpec,
) -> Result<()> {
let now = timestamp();
self.ledger
.lease_task(&entry.run_id, &entry.task_id, worker_id, &now, None)?;
self.append_worker_event(
&entry.run_id,
worker_id,
&entry.task_id,
FleetWorkerEventPayload::Leased {
lease_expires_at: None,
},
)?;
self.append_worker_event(
&entry.run_id,
worker_id,
&entry.task_id,
FleetWorkerEventPayload::Starting,
)?;
let log_artifact = self.write_log_artifact(&entry.run_id, worker_id, task_spec)?;
self.append_worker_event(
&entry.run_id,
worker_id,
&entry.task_id,
FleetWorkerEventPayload::Artifact(log_artifact.clone()),
)?;
self.append_worker_event(
&entry.run_id,
worker_id,
&entry.task_id,
FleetWorkerEventPayload::Running,
)?;
self.ledger.heartbeat(worker_id, &timestamp(), None, None)?;
self.maybe_complete_local_simulation(entry, worker_id, task_spec, log_artifact)
}
/// Finishes a task immediately when its metadata requests a simulated result.
///
/// Does nothing when `local_simulation_result` yields `None`. Otherwise
/// appends the matching `Completed`/`Failed` event and records a receipt
/// carrying `log_artifact`.
///
/// # Errors
/// Fails when appending the event or recording the receipt fails.
fn maybe_complete_local_simulation(
    &self,
    entry: &FleetInboxEntry,
    worker_id: &str,
    task_spec: &FleetTaskSpec,
    log_artifact: FleetArtifactRef,
) -> Result<()> {
    let result = match local_simulation_result(task_spec) {
        Some(result) => result,
        None => return Ok(()),
    };
    let now = timestamp();

    let completed = |summary: &str| FleetWorkerEventPayload::Completed {
        exit_code: Some(0),
        summary: Some(summary.to_string()),
    };
    let failed = |reason: &str, recoverable: bool| FleetWorkerEventPayload::Failed {
        reason: reason.to_string(),
        recoverable,
    };
    let (payload, receipt_result) = match result {
        FleetLocalSimulationResult::Pass => (
            completed("local fleet smoke task completed"),
            FleetTaskResult::Pass,
        ),
        FleetLocalSimulationResult::Fail => (
            failed("local fleet smoke task failed", false),
            FleetTaskResult::Fail,
        ),
        // A skip is ledgered as a successful completion with a `Skip` receipt.
        FleetLocalSimulationResult::Skip => (
            completed("local fleet smoke task skipped"),
            FleetTaskResult::Skip,
        ),
        // A timeout is the only simulated failure flagged as recoverable.
        FleetLocalSimulationResult::Timeout => (
            failed("local fleet smoke task timed out", true),
            FleetTaskResult::Timeout,
        ),
    };

    self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?;
    let receipt = FleetReceipt {
        run_id: entry.run_id.clone(),
        task_id: entry.task_id.clone(),
        worker_id: worker_id.to_string(),
        completed_at: now,
        result: receipt_result,
        artifacts: vec![log_artifact],
        score: None,
    };
    self.ledger.record_receipt(receipt)
}
/// Appends a worker event carrying the next sequence number for its
/// worker/run/task key and returns the stored event.
///
/// The sequence number is one more than the latest one in the ledger for
/// that key, starting at 1. The ledger state is rebuilt on every call to
/// find it.
///
/// # Errors
/// Fails when the state cannot be rebuilt or the event cannot be appended.
fn append_worker_event(
    &self,
    run_id: &FleetRunId,
    worker_id: &str,
    task_id: &str,
    payload: FleetWorkerEventPayload,
) -> Result<FleetWorkerEvent> {
    let state = self.ledger.rebuild_state()?;
    let key = event_key(worker_id, &run_id.0, task_id);
    let seq = state.latest_seq.get(&key).map_or(1, |latest| latest + 1);
    let event = FleetWorkerEvent {
        seq,
        run_id: run_id.clone(),
        worker_id: worker_id.to_owned(),
        task_id: task_id.to_owned(),
        timestamp: timestamp(),
        payload,
        extra: BTreeMap::new(),
    };
    self.ledger.append_event(event.clone())?;
    Ok(event)
}
/// Writes the initial worker log file and returns an artifact reference to it.
///
/// The file lives at
/// `<workspace>/.codewhale/fleet/<run>/<task>/<worker>.log`, with each
/// dynamic segment passed through `safe_path_segment`. The returned
/// reference holds the workspace-relative path.
///
/// # Errors
/// Fails when the parent directory cannot be created or the file cannot be
/// written. A failure to read the file size afterwards is not an error; the
/// size is simply left unset.
fn write_log_artifact(
    &self,
    run_id: &FleetRunId,
    worker_id: &str,
    task_spec: &FleetTaskSpec,
) -> Result<FleetArtifactRef> {
    let mut rel_path = PathBuf::from(".codewhale");
    rel_path.push("fleet");
    rel_path.push(safe_path_segment(&run_id.0));
    rel_path.push(safe_path_segment(&task_spec.id));
    rel_path.push(format!("{}.log", safe_path_segment(worker_id)));

    let abs_path = self.workspace.join(&rel_path);
    if let Some(dir) = abs_path.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("creating fleet artifact dir {}", dir.display()))?;
    }

    let contents = format!(
        "run_id={}\ntask_id={}\ntask_name={}\nworker_id={}\nstatus=started\n",
        run_id.0, task_spec.id, task_spec.name, worker_id
    );
    std::fs::write(&abs_path, contents)
        .with_context(|| format!("writing fleet worker log {}", abs_path.display()))?;

    let size_bytes = match std::fs::metadata(&abs_path) {
        Ok(meta) => Some(meta.len()),
        Err(_) => None,
    };
    Ok(FleetArtifactRef {
        kind: FleetArtifactKind::Log,
        path: rel_path,
        checksum: None,
        mime_type: Some("text/plain".to_string()),
        size_bytes,
    })
}
/// Recomputes the status of `run_id` from its tasks and writes it to the ledger.
///
/// Precedence: no tasks → `Completed`; any enqueued or leased task →
/// `Running`; otherwise any failed task → `Failed`; otherwise any cancelled
/// task → `Cancelled`; otherwise `Completed`.
///
/// # Errors
/// Fails when the state cannot be rebuilt or the status cannot be written.
fn refresh_run_status(&self, run_id: &FleetRunId) -> Result<()> {
    let state = self.ledger.rebuild_state()?;

    let mut any_task = false;
    let mut open = false;
    let mut failed = false;
    let mut cancelled = false;
    let run_tasks = state
        .tasks
        .values()
        .filter(|task| task.entry.run_id == *run_id);
    for task in run_tasks {
        any_task = true;
        match task.status {
            FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased => open = true,
            FleetTaskLedgerStatus::Failed => failed = true,
            FleetTaskLedgerStatus::Cancelled => cancelled = true,
            FleetTaskLedgerStatus::Completed => {}
        }
    }

    let status = match (any_task, open, failed, cancelled) {
        (false, ..) => FleetRunStatus::Completed,
        (true, true, ..) => FleetRunStatus::Running,
        (true, false, true, _) => FleetRunStatus::Failed,
        (true, false, false, true) => FleetRunStatus::Cancelled,
        (true, false, false, false) => FleetRunStatus::Completed,
    };
    self.ledger
        .update_run_status(run_id, status, &timestamp())
        .context("updating fleet run status")
}
/// Tallies task states from `state`, optionally restricted to one run.
///
/// `runs` and `workers` always describe the whole ledger; only the task
/// counters honour `run_filter`. Leased tasks are split into `running` and
/// `stale` according to `task_is_stale`.
fn status_from_state(
    &self,
    run_filter: Option<&FleetRunId>,
    state: &FleetLedgerState,
) -> FleetStatusSnapshot {
    let mut snapshot = FleetStatusSnapshot {
        workers: state.workers.clone(),
        runs: state.runs.len(),
        ..FleetStatusSnapshot::default()
    };
    let tasks_in_scope = state
        .tasks
        .values()
        .filter(|task| run_filter.is_none_or(|run_id| task.entry.run_id == *run_id));
    for task in tasks_in_scope {
        // Pick the counter this task belongs to, then bump it once.
        let counter = match task.status {
            FleetTaskLedgerStatus::Enqueued => &mut snapshot.queued,
            FleetTaskLedgerStatus::Leased if self.task_is_stale(task, state) => {
                &mut snapshot.stale
            }
            FleetTaskLedgerStatus::Leased => &mut snapshot.running,
            FleetTaskLedgerStatus::Completed => &mut snapshot.completed,
            FleetTaskLedgerStatus::Failed => &mut snapshot.failed,
            FleetTaskLedgerStatus::Cancelled => &mut snapshot.cancelled,
        };
        *counter += 1;
    }
    snapshot
}
/// Decides whether a leased task should be counted as stale.
///
/// A task is stale when it has no lease owner, its worker has no heartbeat,
/// the heartbeat timestamp is not valid RFC 3339, or the heartbeat is older
/// than `stale_after`. A heartbeat dated in the future is not stale.
fn task_is_stale(&self, task: &FleetTaskState, state: &FleetLedgerState) -> bool {
    let last_heartbeat = task
        .leased_to
        .as_deref()
        .and_then(|worker_id| state.heartbeats.get(worker_id))
        .and_then(|heartbeat| DateTime::parse_from_rfc3339(&heartbeat.timestamp).ok());
    match last_heartbeat {
        None => true,
        Some(last) => Utc::now()
            .signed_duration_since(last.with_timezone(&Utc))
            // `to_std` fails for negative ages, i.e. heartbeats from the future.
            .to_std()
            .is_ok_and(|age| age > self.stale_after),
    }
}
}
/// Outcome a task spec can request for the local smoke simulation through
/// its `local_complete` / `local_result` metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FleetLocalSimulationResult {
    /// Ledgered as a `Completed` event with a `Pass` receipt.
    Pass,
    /// Ledgered as a non-recoverable `Failed` event with a `Fail` receipt.
    Fail,
    /// Ledgered as a `Completed` event with a `Skip` receipt.
    Skip,
    /// Ledgered as a recoverable `Failed` event with a `Timeout` receipt.
    Timeout,
}
fn validate_task_spec_document(doc: &FleetTaskSpecDocument) -> Result<()> {
if doc.tasks.is_empty() {
bail!("fleet task spec must include at least one task");
}
let mut ids = BTreeSet::new();
for task in &doc.tasks {
if task.id.trim().is_empty() {
bail!("fleet task id cannot be empty");
}
if !ids.insert(task.id.clone()) {
bail!("duplicate fleet task id {}", task.id);
}
if task.name.trim().is_empty() {
bail!("fleet task {} name cannot be empty", task.id);
}
if task.instructions.trim().is_empty() {
bail!("fleet task {} instructions cannot be empty", task.id);
}
}
Ok(())
}
/// Generates `max_workers` local worker specs for a run.
///
/// Ids follow `<run_id>-local-<n>` with `n` starting at 1. Each worker has
/// the single capability `"local"` and a concurrency limit of one task.
fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec<FleetWorkerSpec> {
    let mut workers = Vec::new();
    for index in 1..=max_workers {
        workers.push(FleetWorkerSpec {
            id: format!("{}-local-{}", run_id.0, index),
            name: format!("Local worker {index}"),
            host: FleetHostSpec::Local,
            labels: BTreeMap::new(),
            capabilities: vec!["local".to_string()],
            max_concurrent_tasks: Some(1),
        });
    }
    workers
}
/// Returns the ids of the first `max_workers` worker specs of `run`, in
/// declaration order.
fn worker_ids_for_run(run: &FleetRun, max_workers: usize) -> Vec<String> {
    let limit = max_workers.min(run.worker_specs.len());
    let mut ids = Vec::new();
    for worker in &run.worker_specs[..limit] {
        ids.push(worker.id.clone());
    }
    ids
}
/// Collects the ids of workers that hold a leased task in `run_id`.
fn active_workers_for_run(state: &FleetLedgerState, run_id: &FleetRunId) -> BTreeSet<String> {
    let mut busy = BTreeSet::new();
    for task in active_tasks_for_run(state, run_id) {
        if let Some(worker_id) = &task.leased_to {
            busy.insert(worker_id.clone());
        }
    }
    busy
}
/// Iterates over the tasks of `run_id` that are currently in the `Leased` state.
fn active_tasks_for_run<'a>(
    state: &'a FleetLedgerState,
    run_id: &'a FleetRunId,
) -> impl Iterator<Item = &'a FleetTaskState> {
    state
        .tasks
        .values()
        .filter(move |task| task.entry.run_id == *run_id)
        .filter(|task| matches!(task.status, FleetTaskLedgerStatus::Leased))
}
/// Finds the first leased task (in task-map order) whose lease owner is
/// `worker_id`, across all runs.
fn active_task_for_worker<'a>(
    state: &'a FleetLedgerState,
    worker_id: &str,
) -> Option<&'a FleetTaskState> {
    state
        .tasks
        .values()
        .filter(|task| matches!(task.status, FleetTaskLedgerStatus::Leased))
        .find(|task| task.leased_to.as_deref() == Some(worker_id))
}
/// Finds the task most recently touched by `worker_id`, in any state.
///
/// Recency is the task's completion timestamp when present, otherwise its
/// lease timestamp; the timestamps are compared as strings. When several
/// tasks tie, the last one in task-map order wins.
fn latest_task_for_worker<'a>(
    state: &'a FleetLedgerState,
    worker_id: &str,
) -> Option<&'a FleetTaskState> {
    /// Timestamp used to rank a task: completion first, lease as fallback.
    fn recency(task: &FleetTaskState) -> Option<&str> {
        match task.completed_at.as_deref() {
            Some(completed) => Some(completed),
            None => task.leased_at.as_deref(),
        }
    }

    state
        .tasks
        .values()
        .filter(|task| task.leased_to.as_deref() == Some(worker_id))
        .max_by(|left, right| recency(left).cmp(&recency(right)))
}
/// Picks the next enqueued task of `run_id` together with its task spec.
///
/// Candidates are ordered by priority value (smallest first), then enqueue
/// timestamp, then task id. Returns `None` when the run is unknown, nothing
/// is enqueued, or the chosen task has no matching spec in the run.
fn next_enqueued_task_for_run(
    state: &FleetLedgerState,
    run_id: &FleetRunId,
) -> Option<(FleetInboxEntry, FleetTaskSpec)> {
    let run = state.runs.get(&run_id.0)?;
    let next = state
        .tasks
        .values()
        .filter(|task| task.entry.run_id == *run_id)
        .filter(|task| matches!(task.status, FleetTaskLedgerStatus::Enqueued))
        .min_by(|left, right| {
            let left_key = (
                left.entry.priority,
                &left.entry.enqueued_at,
                &left.entry.task_id,
            );
            let right_key = (
                right.entry.priority,
                &right.entry.enqueued_at,
                &right.entry.task_id,
            );
            left_key.cmp(&right_key)
        })?;
    let entry = &next.entry;
    let spec = run
        .task_specs
        .iter()
        .find(|spec| spec.id == entry.task_id)?;
    Some((entry.clone(), spec.clone()))
}
fn latest_event_for_worker<'a>(
state: &'a FleetLedgerState,
worker_id: &str,
) -> Option<&'a FleetWorkerEvent> {
state
.latest_events
.values()
.filter(|event| event.worker_id == worker_id)
.max_by_key(|event| event.seq)
}
fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option<String> {
state
.latest_events
.values()
.filter(|event| event.worker_id == worker_id)
.filter_map(|event| match &event.payload {
FleetWorkerEventPayload::Failed { reason, .. } => {
Some((event.seq, format!("failed: {reason}")))
}
FleetWorkerEventPayload::Cancelled { cancelled_by } => Some((
event.seq,
cancelled_by
.as_ref()
.map(|by| format!("cancelled by {by}"))
.unwrap_or_else(|| "cancelled".to_string()),
)),
FleetWorkerEventPayload::Interrupted { signal } => Some((
event.seq,
signal
.as_ref()
.map(|signal| format!("interrupted by {signal}"))
.unwrap_or_else(|| "interrupted".to_string()),
)),
FleetWorkerEventPayload::Stale { last_heartbeat_at } => Some((
event.seq,
last_heartbeat_at
.as_ref()
.map(|ts| format!("stale since {ts}"))
.unwrap_or_else(|| "stale".to_string()),
)),
_ => None,
})
.max_by_key(|(seq, _)| *seq)
.map(|(_, message)| message)
}
fn local_simulation_result(task: &FleetTaskSpec) -> Option<FleetLocalSimulationResult> {
if task
.metadata
.get("local_complete")
.and_then(Value::as_bool)
.unwrap_or(false)
{
return Some(FleetLocalSimulationResult::Pass);
}
match task
.metadata
.get("local_result")
.and_then(Value::as_str)
.map(str::to_ascii_lowercase)
.as_deref()
{
Some("pass" | "passed" | "ok" | "completed") => Some(FleetLocalSimulationResult::Pass),
Some("fail" | "failed" | "error") => Some(FleetLocalSimulationResult::Fail),
Some("skip" | "skipped") => Some(FleetLocalSimulationResult::Skip),
Some("timeout" | "timed_out") => Some(FleetLocalSimulationResult::Timeout),
_ => None,
}
}
/// Reads the integer `priority` from a task's metadata.
///
/// Returns 0 when the key is missing, is not an integer, or does not fit in
/// an `i32`.
fn task_priority(task: &FleetTaskSpec) -> i32 {
    let raw = task.metadata.get("priority").and_then(Value::as_i64);
    match raw.map(i32::try_from) {
        Some(Ok(priority)) => priority,
        _ => 0,
    }
}
/// Builds the `worker_id:run_id:task_id` key used for per-task event lookups.
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
    [worker_id, run_id, task_id].join(":")
}
/// Returns the current UTC time as an RFC 3339 string with whole-second
/// precision and a `Z` suffix.
fn timestamp() -> String {
    let now = Utc::now();
    now.to_rfc3339_opts(SecondsFormat::Secs, true)
}
/// Turns an arbitrary identifier into a single safe path component.
///
/// ASCII letters, digits, `-`, `_`, and `.` are kept; every other character
/// becomes `_`. Because `.` is allowed, a value made only of dots (`"."`,
/// `".."`) would otherwise survive and act as a directory reference when
/// joined onto a path. Such values have their dots replaced with `_`, and an
/// empty value becomes `"_"`, so the result is always a non-empty, ordinary
/// file or directory name.
fn safe_path_segment(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect();
    if sanitized.is_empty() {
        return "_".to_string();
    }
    // "." and ".." (and longer all-dot names) must never reach `Path::join`.
    if sanitized.chars().all(|ch| ch == '.') {
        return "_".repeat(sanitized.len());
    }
    sanitized
}
// Manager tests that run against a ledger in a temporary workspace.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    /// Builds a minimal valid task spec whose id and name are both `id`.
    /// The metadata is empty, so no local simulation result is requested and
    /// the task stays leased once started.
    fn task(id: &str) -> FleetTaskSpec {
        FleetTaskSpec {
            id: id.to_string(),
            name: id.to_string(),
            description: None,
            instructions: format!("do {id}"),
            expected_artifacts: vec![FleetArtifactKind::Log],
            scorer: None,
            retry_policy: None,
            alert_policy: None,
            timeout_seconds: None,
            metadata: BTreeMap::new(),
        }
    }
    /// Writes a JSON spec document named "manager smoke" containing `tasks`
    /// into `dir` and returns the file path.
    fn task_spec_file(dir: &TempDir, tasks: Vec<FleetTaskSpec>) -> PathBuf {
        let path = dir.path().join("fleet-tasks.json");
        let doc = json!({
            "name": "manager smoke",
            "tasks": tasks,
        });
        std::fs::write(&path, serde_json::to_string_pretty(&doc).unwrap()).unwrap();
        path
    }
    /// Three tasks with a worker cap of two: two are leased, one stays queued.
    #[test]
    fn fleet_manager_creates_run_and_starts_workers_up_to_cap() {
        let tmp = TempDir::new().unwrap();
        let manager = FleetManager::open(tmp.path()).unwrap();
        let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b"), task("task-c")]);
        let report = manager.create_run_from_task_spec_path(&path, 2).unwrap();
        assert_eq!(report.task_count, 3);
        assert_eq!(report.leased, 2);
        assert_eq!(report.queued, 1);
        assert_eq!(report.worker_ids.len(), 2);
        let status = manager.run_status(&report.run_id).unwrap();
        assert_eq!(status.queued, 1);
        assert_eq!(status.running, 2);
        assert_eq!(status.completed, 0);
    }
    /// Inspection shows the heartbeat and log artifact of a running worker,
    /// and reports the operator cancellation after an interrupt.
    #[test]
    fn fleet_manager_inspect_exposes_heartbeat_artifacts_and_errors() {
        let tmp = TempDir::new().unwrap();
        let manager = FleetManager::open(tmp.path()).unwrap();
        let path = task_spec_file(&tmp, vec![task("task-a")]);
        let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
        let worker_id = &report.worker_ids[0];
        let inspection = manager.inspect_worker(worker_id).unwrap();
        assert_eq!(inspection.status, FleetWorkerStatus::Busy);
        assert_eq!(inspection.current_task_id.as_deref(), Some("task-a"));
        assert!(inspection.latest_heartbeat_at.is_some());
        assert_eq!(inspection.artifacts.len(), 1);
        assert!(inspection.last_error.is_none());
        let inspection = manager.interrupt_worker(worker_id).unwrap();
        assert_eq!(inspection.status, FleetWorkerStatus::Online);
        assert_eq!(
            inspection.last_error.as_deref(),
            Some("cancelled by operator")
        );
        let status = manager.run_status(&report.run_id).unwrap();
        assert_eq!(status.cancelled, 1);
    }
    /// Restarting an interrupted worker makes its task run again; `stop_all`
    /// then cancels both the running and the queued task.
    #[test]
    fn fleet_manager_restart_and_stop_all_are_ledgered() {
        let tmp = TempDir::new().unwrap();
        let manager = FleetManager::open(tmp.path()).unwrap();
        let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b")]);
        let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
        let worker_id = &report.worker_ids[0];
        manager.interrupt_worker(worker_id).unwrap();
        let inspection = manager.restart_worker(worker_id).unwrap();
        assert_eq!(inspection.status, FleetWorkerStatus::Busy);
        let status = manager.run_status(&report.run_id).unwrap();
        assert_eq!(status.running, 1);
        assert_eq!(status.queued, 1);
        let stopped = manager.stop_all().unwrap();
        assert_eq!(stopped, 2);
        let status = manager.run_status(&report.run_id).unwrap();
        assert_eq!(status.cancelled, 2);
        assert_eq!(status.running, 0);
    }
    /// A task with `local_result = "pass"` completes during scheduling and
    /// leaves exactly one receipt in the ledger.
    #[test]
    fn fleet_manager_can_record_completed_local_smoke_tasks() {
        let tmp = TempDir::new().unwrap();
        let manager = FleetManager::open(tmp.path()).unwrap();
        let mut completed = task("task-a");
        completed
            .metadata
            .insert("local_result".to_string(), json!("pass"));
        let path = task_spec_file(&tmp, vec![completed]);
        let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
        let status = manager.run_status(&report.run_id).unwrap();
        assert_eq!(status.completed, 1);
        assert_eq!(status.running, 0);
        let state = manager.ledger.rebuild_state().unwrap();
        assert_eq!(state.receipts.len(), 1);
    }
}
+1
View File
@@ -1,3 +1,4 @@
//! Agent Fleet control plane — local-first manager, ledger, and workers.
pub mod ledger;
pub mod manager;
+260
View File
@@ -243,6 +243,8 @@ enum Commands {
Exec(ExecArgs),
/// Generate SWE-bench prediction rows from CodeWhale runs
Swebench(SwebenchArgs),
/// Manage local Agent Fleet runs and workers
Fleet(FleetArgs),
/// Run a code review over a git diff
Review(ReviewArgs),
/// Open the TUI pre-seeded with a GitHub PR's title, body, and diff (#451)
@@ -373,6 +375,59 @@ enum SwebenchCommand {
Export(SwebenchExportArgs),
}
// Arguments of the `fleet` subcommand: a thin wrapper around the nested
// `FleetCommand`. Plain `//` comments are used so the CLI help output is
// not affected.
#[derive(Args, Debug, Clone)]
struct FleetArgs {
    #[command(subcommand)]
    command: FleetCommand,
}
// Subcommands available under `fleet`. The `///` comments below are the
// user-visible help text, so maintainer notes here use `//` instead.
#[derive(Subcommand, Debug, Clone)]
enum FleetCommand {
    /// Initialize the local fleet ledger for this workspace
    Init,
    /// Create a run from a task spec and start the foreground manager loop
    Run(FleetRunArgs),
    /// Show queued/running/completed/failed/stale fleet counts
    Status,
    /// Inspect one worker's status, heartbeat, latest event, and artifacts
    Inspect {
        /// Worker id printed by `codewhale fleet run`
        worker_id: String,
    },
    /// Interrupt a running worker task and record a terminal cancellation
    Interrupt {
        /// Worker id printed by `codewhale fleet run`
        worker_id: String,
    },
    /// Restart the latest task for a worker
    Restart {
        /// Worker id printed by `codewhale fleet run`
        worker_id: String,
    },
    /// Stop all queued and running fleet work
    Stop {
        // `required = true` forces the caller to pass `--all` explicitly,
        // which acts as a confirmation for this destructive command.
        /// Confirm stopping all queued and running fleet tasks
        #[arg(long, required = true)]
        all: bool,
    },
}
// Arguments of `fleet run`. The `///` comments below are the user-visible
// help text, so maintainer notes here use `//` instead.
#[derive(Args, Debug, Clone)]
struct FleetRunArgs {
    /// JSON or TOML task spec to enqueue
    #[arg(value_name = "TASK_SPEC")]
    task_spec: PathBuf,
    /// Maximum local workers to lease concurrently
    #[arg(long, default_value_t = 4)]
    max_workers: usize,
    // The default of 300 has the same value as `DEFAULT_STALE_AFTER_SECONDS`
    // in the fleet manager module; keep the two in sync.
    /// Seconds without heartbeat before a running task is counted stale
    #[arg(long, default_value_t = 300)]
    stale_after_seconds: u64,
    // Hidden from the help output (`hide = true`).
    /// Schedule once and return instead of staying in the manager loop
    #[arg(long, hide = true, default_value_t = false)]
    once: bool,
}
#[derive(Args, Debug, Clone)]
struct SwebenchRunArgs {
/// SWE-bench instance id, e.g. django__django-12345
@@ -1068,6 +1123,10 @@ async fn main() -> Result<()> {
);
run_swebench_command(&config, &model, workspace, max_subagents, args).await
}
Commands::Fleet(args) => {
let workspace = resolve_workspace(&cli);
run_fleet_command(&workspace, args).await
}
Commands::Review(args) => {
let config = load_config_from_cli(&cli)?;
run_review(&config, args).await
@@ -1330,6 +1389,207 @@ async fn run_swebench_command(
}
}
async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
use codewhale_protocol::fleet::{
FleetArtifactKind, FleetWorkerEventPayload, FleetWorkerStatus,
};
fn worker_status_label(status: &FleetWorkerStatus) -> &'static str {
match status {
FleetWorkerStatus::Unknown => "unknown",
FleetWorkerStatus::Online => "online",
FleetWorkerStatus::Busy => "busy",
FleetWorkerStatus::Offline => "offline",
FleetWorkerStatus::Unhealthy => "unhealthy",
FleetWorkerStatus::Draining => "draining",
FleetWorkerStatus::Retired => "retired",
}
}
// Maps an artifact kind to its label; `Other` kinds use their own string.
fn artifact_kind_label(kind: &FleetArtifactKind) -> String {
    let fixed: &str = match kind {
        FleetArtifactKind::Other(custom) => return custom.clone(),
        FleetArtifactKind::Log => "log",
        FleetArtifactKind::Patch => "patch",
        FleetArtifactKind::TestResult => "test_result",
        FleetArtifactKind::Report => "report",
        FleetArtifactKind::Checkpoint => "checkpoint",
        FleetArtifactKind::Receipt => "receipt",
    };
    fixed.to_owned()
}
fn event_label(payload: &FleetWorkerEventPayload) -> String {
match payload {
FleetWorkerEventPayload::Queued => "queued".to_string(),
FleetWorkerEventPayload::Leased { .. } => "leased".to_string(),
FleetWorkerEventPayload::Starting => "starting".to_string(),
FleetWorkerEventPayload::Running => "running".to_string(),
FleetWorkerEventPayload::ModelWait { model } => model
.as_ref()
.map(|model| format!("model_wait model={model}"))
.unwrap_or_else(|| "model_wait".to_string()),
FleetWorkerEventPayload::RunningTool { tool, call_id } => call_id
.as_ref()
.map(|call_id| format!("running_tool tool={tool} call_id={call_id}"))
.unwrap_or_else(|| format!("running_tool tool={tool}")),
FleetWorkerEventPayload::Heartbeat { .. } => "heartbeat".to_string(),
FleetWorkerEventPayload::Artifact(artifact) => {
format!("artifact kind={}", artifact_kind_label(&artifact.kind))
}
FleetWorkerEventPayload::Completed { exit_code, summary } => match (exit_code, summary)
{
(Some(code), Some(summary)) => format!("completed exit_code={code} {summary}"),
(Some(code), None) => format!("completed exit_code={code}"),
(None, Some(summary)) => format!("completed {summary}"),
(None, None) => "completed".to_string(),
},
FleetWorkerEventPayload::Failed {
reason,
recoverable,
} => {
format!("failed recoverable={recoverable} reason={reason}")
}
FleetWorkerEventPayload::Cancelled { cancelled_by } => cancelled_by
.as_ref()
.map(|by| format!("cancelled by={by}"))
.unwrap_or_else(|| "cancelled".to_string()),
FleetWorkerEventPayload::Interrupted { signal } => signal
.as_ref()
.map(|signal| format!("interrupted signal={signal}"))
.unwrap_or_else(|| "interrupted".to_string()),
FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at
.as_ref()
.map(|ts| format!("stale last_heartbeat_at={ts}"))
.unwrap_or_else(|| "stale".to_string()),
FleetWorkerEventPayload::Restarted { restart_count } => {
format!("restarted count={restart_count}")
}
FleetWorkerEventPayload::Escalated { channel, alert_id } => alert_id
.as_ref()
.map(|alert_id| format!("escalated channel={channel} alert_id={alert_id}"))
.unwrap_or_else(|| format!("escalated channel={channel}")),
}
}
fn print_status(status: &FleetStatusSnapshot) {
println!(
"fleet: runs={} queued={} running={} completed={} failed={} cancelled={} stale={}",
status.runs,
status.queued,
status.running,
status.completed,
status.failed,
status.cancelled,
status.stale
);
if !status.workers.is_empty() {
println!("workers:");
for (worker_id, worker_status) in &status.workers {
println!(" {worker_id} {}", worker_status_label(worker_status));
}
}
}
fn print_inspection(inspection: &FleetWorkerInspection) {
println!("worker: {}", inspection.worker_id);
println!("status: {}", worker_status_label(&inspection.status));
if let Some(run_id) = &inspection.current_run_id {
println!("run: {}", run_id.0);
}
if let Some(task_id) = &inspection.current_task_id {
println!("task: {task_id}");
}
if let Some(heartbeat) = &inspection.latest_heartbeat_at {
println!("heartbeat: {heartbeat}");
}
if let Some(event) = &inspection.latest_event {
println!(
"latest_event: seq={} {}",
event.seq,
event_label(&event.payload)
);
}
if !inspection.artifacts.is_empty() {
println!("artifacts:");
for artifact in &inspection.artifacts {
println!(
" {} {}",
artifact_kind_label(&artifact.kind),
artifact.path.display()
);
}
}
if let Some(error) = &inspection.last_error {
println!("last_error: {error}");
}
}
let manager = FleetManager::open(workspace)?;
match args.command {
FleetCommand::Init => {
println!("fleet ledger: {}", manager.ledger_path().display());
Ok(())
}
FleetCommand::Run(args) => {
let max_workers = args.max_workers.clamp(1, 128);
let manager =
manager.with_stale_after(Duration::from_secs(args.stale_after_seconds.max(1)));
let report = manager.create_run_from_task_spec_path(&args.task_spec, max_workers)?;
println!(
"fleet run: {} tasks={} leased={} queued={}",
report.run_id.0, report.task_count, report.leased, report.queued
);
println!("workers:");
for worker_id in &report.worker_ids {
println!(" {worker_id}");
}
if args.once {
print_status(&manager.run_status(&report.run_id)?);
return Ok(());
}
println!(
"manager loop running; use `codewhale fleet status`, `inspect`, `interrupt`, or `stop --all` from another terminal."
);
loop {
manager.schedule_run(&report.run_id, max_workers)?;
if !manager.run_has_open_work(&report.run_id)? {
print_status(&manager.run_status(&report.run_id)?);
break;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
Ok(())
}
FleetCommand::Status => {
print_status(&manager.status()?);
Ok(())
}
FleetCommand::Inspect { worker_id } => {
print_inspection(&manager.inspect_worker(&worker_id)?);
Ok(())
}
FleetCommand::Interrupt { worker_id } => {
let inspection = manager.interrupt_worker(&worker_id)?;
print_inspection(&inspection);
Ok(())
}
FleetCommand::Restart { worker_id } => {
let inspection = manager.restart_worker(&worker_id)?;
print_inspection(&inspection);
Ok(())
}
FleetCommand::Stop { all } => {
if !all {
bail!("pass --all to stop all fleet work");
}
let stopped = manager.stop_all()?;
println!("stopped: {stopped}");
Ok(())
}
}
}
fn swebench_prompt(
instance_id: &str,
workspace: &Path,