From 66a45c00704af5ab22baca6f46106ac93189b178 Mon Sep 17 00:00:00 2001 From: Hunter B Date: Fri, 12 Jun 2026 18:15:18 -0700 Subject: [PATCH] feat(tui): add local fleet manager commands --- crates/tui/src/fleet/ledger.rs | 44 +- crates/tui/src/fleet/manager.rs | 982 ++++++++++++++++++++++++++++++++ crates/tui/src/fleet/mod.rs | 1 + crates/tui/src/main.rs | 260 +++++++++ 4 files changed, 1286 insertions(+), 1 deletion(-) create mode 100644 crates/tui/src/fleet/manager.rs diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index 51a4cf1e..90e7386a 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -87,6 +87,8 @@ pub struct FleetLedgerState { pub latest_seq: BTreeMap, /// Latest event envelope per worker_id:run_id:task_id. pub latest_events: BTreeMap, + /// Artifact events keyed by worker_id:run_id:task_id:path. + pub artifact_events: BTreeMap, /// Completed receipts by run_id:task_id. pub receipts: BTreeMap, } @@ -122,6 +124,7 @@ pub struct FleetHeartbeatState { } /// Append-only JSONL ledger for fleet runs. 
+#[derive(Debug)] pub struct FleetLedger { ledger_path: PathBuf, } @@ -218,6 +221,23 @@ impl FleetLedger { }) } + pub fn mark_task_terminal_status( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: Option<&str>, + timestamp: &str, + status: FleetTaskLedgerStatus, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskCompletedOrFailed { + run_id: run_id.clone(), + task_id: task_id.to_string(), + worker_id: worker_id.unwrap_or_default().to_string(), + timestamp: timestamp.to_string(), + status, + }) + } + pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> { self.append_record(&FleetLedgerRecord::EventAppended { event }) } @@ -374,6 +394,11 @@ impl FleetLedger { event: event.clone(), })?); } + for event in state.artifact_events.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended { + event: event.clone(), + })?); + } for (worker_id, heartbeat) in &state.heartbeats { lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat { worker_id: worker_id.clone(), @@ -418,11 +443,23 @@ fn mark_task_terminal( let key = task_key(&run_id.0, task_id); if let Some(task) = state.tasks.get_mut(&key) { task.status = status; - task.leased_to = Some(worker_id.to_string()); + if !worker_id.is_empty() { + task.leased_to = Some(worker_id.to_string()); + } task.completed_at = Some(timestamp.to_string()); } } +fn artifact_event_key(event: &FleetWorkerEvent, artifact: &FleetArtifactRef) -> String { + format!( + "{}:{}:{}:{}", + event.worker_id, + event.run_id.0, + event.task_id, + artifact.path.display() + ) +} + fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { match record { FleetLedgerRecord::RunCreated { run } => { @@ -479,6 +516,11 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { state.latest_seq.insert(event_key.clone(), event.seq); state.latest_events.insert(event_key, event.clone()); } + if let FleetWorkerEventPayload::Artifact(artifact) = &event.payload { + state 
+ .artifact_events + .insert(artifact_event_key(&event, artifact), event.clone()); + } // Derive worker status from lifecycle events. match &event.payload { FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => { diff --git a/crates/tui/src/fleet/manager.rs b/crates/tui/src/fleet/manager.rs new file mode 100644 index 00000000..f1d61bc7 --- /dev/null +++ b/crates/tui/src/fleet/manager.rs @@ -0,0 +1,982 @@ +//! Local-first fleet manager loop and operator controls. +//! +//! This module is intentionally ledger-first: the first manager can run in the +//! foreground and coordinate logical local workers while later host adapters +//! add real process and SSH execution behind the same records. + +#![allow(dead_code)] + +use std::collections::{BTreeMap, BTreeSet}; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow, bail}; +use chrono::{DateTime, SecondsFormat, Utc}; +use codewhale_protocol::fleet::*; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; + +use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState}; + +const DEFAULT_STALE_AFTER_SECONDS: u64 = 300; + +#[derive(Debug)] +pub struct FleetManager { + workspace: PathBuf, + ledger: FleetLedger, + stale_after: Duration, +} + +#[derive(Debug, Clone)] +pub struct FleetRunReport { + pub run_id: FleetRunId, + pub task_count: usize, + pub leased: usize, + pub queued: usize, + pub worker_ids: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct FleetTickReport { + pub leased: usize, + pub heartbeats: usize, +} + +#[derive(Debug, Clone, Default)] +pub struct FleetStatusSnapshot { + pub runs: usize, + pub queued: usize, + pub running: usize, + pub completed: usize, + pub failed: usize, + pub cancelled: usize, + pub stale: usize, + pub workers: BTreeMap, +} + +#[derive(Debug, Clone)] +pub struct FleetWorkerInspection { + pub worker_id: String, + pub status: FleetWorkerStatus, + pub current_run_id: 
Option, + pub current_task_id: Option, + pub latest_heartbeat_at: Option, + pub latest_event: Option, + pub artifacts: Vec, + pub last_error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetTaskSpecDocument { + #[serde(default)] + pub name: Option, + #[serde(default)] + pub labels: BTreeMap, + #[serde(default, alias = "worker_specs")] + pub workers: Vec, + #[serde(default)] + pub tasks: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] +enum FleetTaskSpecFile { + Document(FleetTaskSpecDocument), + Tasks(Vec), + Single(FleetTaskSpec), +} + +impl FleetTaskSpecFile { + fn into_document(self, fallback_name: String) -> FleetTaskSpecDocument { + match self { + Self::Document(mut doc) => { + if doc.name.as_deref().is_none_or(str::is_empty) { + doc.name = Some(fallback_name); + } + doc + } + Self::Tasks(tasks) => FleetTaskSpecDocument { + name: Some(fallback_name), + labels: BTreeMap::new(), + workers: Vec::new(), + tasks, + }, + Self::Single(task) => FleetTaskSpecDocument { + name: Some(fallback_name), + labels: BTreeMap::new(), + workers: Vec::new(), + tasks: vec![task], + }, + } + } +} + +impl FleetManager { + pub fn open(workspace: impl AsRef) -> Result { + let workspace = workspace.as_ref().to_path_buf(); + let ledger = FleetLedger::open(&workspace)?; + Ok(Self { + workspace, + ledger, + stale_after: Duration::from_secs(DEFAULT_STALE_AFTER_SECONDS), + }) + } + + pub fn with_stale_after(mut self, stale_after: Duration) -> Self { + self.stale_after = stale_after; + self + } + + pub fn ledger_path(&self) -> &Path { + self.ledger.path() + } + + pub fn load_task_spec(path: &Path) -> Result { + let raw = std::fs::read_to_string(path) + .with_context(|| format!("reading fleet task spec {}", path.display()))?; + let fallback_name = path + .file_stem() + .and_then(|s| s.to_str()) + .filter(|s| !s.is_empty()) + .unwrap_or("fleet-run") + .to_string(); + let parsed = match path.extension().and_then(|s| s.to_str()) { + Some("toml") 
=> toml::from_str::(&raw) + .with_context(|| format!("parsing TOML fleet task spec {}", path.display()))?, + _ => serde_json::from_str::(&raw) + .with_context(|| format!("parsing JSON fleet task spec {}", path.display()))?, + }; + let doc = parsed.into_document(fallback_name); + validate_task_spec_document(&doc)?; + Ok(doc) + } + + pub fn create_run_from_task_spec_path( + &self, + path: &Path, + max_workers: usize, + ) -> Result { + let doc = Self::load_task_spec(path)?; + self.create_run(doc, max_workers) + } + + pub fn create_run( + &self, + mut doc: FleetTaskSpecDocument, + max_workers: usize, + ) -> Result { + validate_task_spec_document(&doc)?; + let max_workers = max_workers.clamp(1, 128); + let run_id = FleetRunId::from(format!( + "fleet-{}", + &Uuid::new_v4().simple().to_string()[..8] + )); + let now = timestamp(); + if doc.workers.is_empty() { + doc.workers = default_local_workers(&run_id, max_workers); + } + let run = FleetRun { + id: run_id.clone(), + name: doc.name.unwrap_or_else(|| run_id.0.clone()), + status: FleetRunStatus::Queued, + task_specs: doc.tasks.clone(), + worker_specs: doc.workers.clone(), + labels: doc.labels, + created_at: now.clone(), + updated_at: Some(now.clone()), + completed_at: None, + }; + self.ledger.create_run(&run)?; + for task in &run.task_specs { + self.ledger.enqueue(FleetInboxEntry { + run_id: run.id.clone(), + task_id: task.id.clone(), + priority: task_priority(task), + enqueued_at: now.clone(), + lease_deadline: None, + attempts: 0, + })?; + } + let initial_status = if run.task_specs.is_empty() { + FleetRunStatus::Completed + } else { + FleetRunStatus::Running + }; + self.ledger + .update_run_status(&run.id, initial_status, &timestamp())?; + let tick = self.schedule_run(&run.id, max_workers)?; + self.refresh_run_status(&run.id)?; + let state = self.ledger.rebuild_state()?; + let snapshot = self.status_from_state(Some(&run.id), &state); + Ok(FleetRunReport { + run_id: run.id, + task_count: run.task_specs.len(), + leased: 
tick.leased, + queued: snapshot.queued, + worker_ids: run.worker_specs.iter().map(|w| w.id.clone()).collect(), + }) + } + + pub fn schedule_run(&self, run_id: &FleetRunId, max_workers: usize) -> Result { + let max_workers = max_workers.clamp(1, 128); + let mut report = FleetTickReport::default(); + let state = self.ledger.rebuild_state()?; + let run = state + .runs + .get(&run_id.0) + .cloned() + .ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?; + let worker_ids = worker_ids_for_run(&run, max_workers); + + for task in active_tasks_for_run(&state, run_id) { + if let Some(worker_id) = task.leased_to.as_deref() + && worker_ids.iter().any(|id| id == worker_id) + { + self.ledger.heartbeat(worker_id, &timestamp(), None, None)?; + report.heartbeats += 1; + } + } + + loop { + let state = self.ledger.rebuild_state()?; + let active_workers = active_workers_for_run(&state, run_id); + if active_workers.len() >= max_workers { + break; + } + let Some(worker_id) = worker_ids + .iter() + .find(|id| !active_workers.contains(*id)) + .cloned() + else { + break; + }; + let Some((entry, task_spec)) = next_enqueued_task_for_run(&state, run_id) else { + break; + }; + self.start_worker_task(&worker_id, &entry, &task_spec)?; + report.leased += 1; + } + + self.refresh_run_status(run_id)?; + Ok(report) + } + + pub fn status(&self) -> Result { + let state = self.ledger.rebuild_state()?; + Ok(self.status_from_state(None, &state)) + } + + pub fn run_status(&self, run_id: &FleetRunId) -> Result { + let state = self.ledger.rebuild_state()?; + Ok(self.status_from_state(Some(run_id), &state)) + } + + pub fn run_has_open_work(&self, run_id: &FleetRunId) -> Result { + let status = self.run_status(run_id)?; + Ok(status.queued + status.running + status.stale > 0) + } + + pub fn inspect_worker(&self, worker_id: &str) -> Result { + let state = self.ledger.rebuild_state()?; + let latest_event = latest_event_for_worker(&state, worker_id).cloned(); + let current = active_task_for_worker(&state, 
worker_id) + .or_else(|| latest_task_for_worker(&state, worker_id)); + let artifacts = state + .artifact_events + .values() + .filter(|event| event.worker_id == worker_id) + .filter_map(|event| match &event.payload { + FleetWorkerEventPayload::Artifact(artifact) => Some(artifact.clone()), + _ => None, + }) + .chain( + state + .receipts + .values() + .filter(|receipt| receipt.worker_id == worker_id) + .flat_map(|receipt| receipt.artifacts.clone()), + ) + .collect(); + let last_error = latest_error_for_worker(&state, worker_id); + let status = state + .workers + .get(worker_id) + .cloned() + .unwrap_or(FleetWorkerStatus::Unknown); + let latest_heartbeat_at = state + .heartbeats + .get(worker_id) + .map(|heartbeat| heartbeat.timestamp.clone()); + Ok(FleetWorkerInspection { + worker_id: worker_id.to_string(), + status, + current_run_id: current.as_ref().map(|task| task.entry.run_id.clone()), + current_task_id: current.map(|task| task.entry.task_id.clone()), + latest_heartbeat_at, + latest_event, + artifacts, + last_error, + }) + } + + pub fn interrupt_worker(&self, worker_id: &str) -> Result { + let state = self.ledger.rebuild_state()?; + let Some(task) = active_task_for_worker(&state, worker_id) else { + bail!("worker {worker_id} has no running fleet task"); + }; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Interrupted { + signal: Some("operator".to_string()), + }, + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Cancelled { + cancelled_by: Some("operator".to_string()), + }, + )?; + self.refresh_run_status(&task.entry.run_id)?; + self.inspect_worker(worker_id) + } + + pub fn restart_worker(&self, worker_id: &str) -> Result { + let state = self.ledger.rebuild_state()?; + let Some(task) = active_task_for_worker(&state, worker_id) + .or_else(|| latest_task_for_worker(&state, worker_id)) + else { + bail!("worker {worker_id} has no fleet 
task to restart"); + }; + let now = timestamp(); + self.ledger.lease_task( + &task.entry.run_id, + &task.entry.task_id, + worker_id, + &now, + None, + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Restarted { restart_count: 1 }, + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Running, + )?; + self.ledger.heartbeat(worker_id, &timestamp(), None, None)?; + self.ledger + .update_run_status(&task.entry.run_id, FleetRunStatus::Running, &timestamp())?; + self.inspect_worker(worker_id) + } + + pub fn stop_all(&self) -> Result { + let state = self.ledger.rebuild_state()?; + let now = timestamp(); + let mut affected_runs = BTreeSet::new(); + let mut stopped = 0usize; + for task in state.tasks.values() { + if !matches!( + task.status, + FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased + ) { + continue; + } + if let Some(worker_id) = task.leased_to.as_deref() { + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Interrupted { + signal: Some("stop_all".to_string()), + }, + )?; + } + self.ledger.mark_task_terminal_status( + &task.entry.run_id, + &task.entry.task_id, + task.leased_to.as_deref(), + &now, + FleetTaskLedgerStatus::Cancelled, + )?; + affected_runs.insert(task.entry.run_id.0.clone()); + stopped += 1; + } + for run_id in affected_runs { + self.ledger.update_run_status( + &FleetRunId::from(run_id), + FleetRunStatus::Cancelled, + &timestamp(), + )?; + } + Ok(stopped) + } + + fn start_worker_task( + &self, + worker_id: &str, + entry: &FleetInboxEntry, + task_spec: &FleetTaskSpec, + ) -> Result<()> { + let now = timestamp(); + self.ledger + .lease_task(&entry.run_id, &entry.task_id, worker_id, &now, None)?; + self.append_worker_event( + &entry.run_id, + worker_id, + &entry.task_id, + FleetWorkerEventPayload::Leased { + lease_expires_at: None, + }, + )?; + 
self.append_worker_event( + &entry.run_id, + worker_id, + &entry.task_id, + FleetWorkerEventPayload::Starting, + )?; + let log_artifact = self.write_log_artifact(&entry.run_id, worker_id, task_spec)?; + self.append_worker_event( + &entry.run_id, + worker_id, + &entry.task_id, + FleetWorkerEventPayload::Artifact(log_artifact.clone()), + )?; + self.append_worker_event( + &entry.run_id, + worker_id, + &entry.task_id, + FleetWorkerEventPayload::Running, + )?; + self.ledger.heartbeat(worker_id, &timestamp(), None, None)?; + self.maybe_complete_local_simulation(entry, worker_id, task_spec, log_artifact) + } + + fn maybe_complete_local_simulation( + &self, + entry: &FleetInboxEntry, + worker_id: &str, + task_spec: &FleetTaskSpec, + log_artifact: FleetArtifactRef, + ) -> Result<()> { + let Some(result) = local_simulation_result(task_spec) else { + return Ok(()); + }; + let now = timestamp(); + let (payload, receipt_result) = match result { + FleetLocalSimulationResult::Pass => ( + FleetWorkerEventPayload::Completed { + exit_code: Some(0), + summary: Some("local fleet smoke task completed".to_string()), + }, + FleetTaskResult::Pass, + ), + FleetLocalSimulationResult::Fail => ( + FleetWorkerEventPayload::Failed { + reason: "local fleet smoke task failed".to_string(), + recoverable: false, + }, + FleetTaskResult::Fail, + ), + FleetLocalSimulationResult::Skip => ( + FleetWorkerEventPayload::Completed { + exit_code: Some(0), + summary: Some("local fleet smoke task skipped".to_string()), + }, + FleetTaskResult::Skip, + ), + FleetLocalSimulationResult::Timeout => ( + FleetWorkerEventPayload::Failed { + reason: "local fleet smoke task timed out".to_string(), + recoverable: true, + }, + FleetTaskResult::Timeout, + ), + }; + self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?; + self.ledger.record_receipt(FleetReceipt { + run_id: entry.run_id.clone(), + task_id: entry.task_id.clone(), + worker_id: worker_id.to_string(), + completed_at: now, + result: 
receipt_result, + artifacts: vec![log_artifact], + score: None, + }) + } + + fn append_worker_event( + &self, + run_id: &FleetRunId, + worker_id: &str, + task_id: &str, + payload: FleetWorkerEventPayload, + ) -> Result { + let state = self.ledger.rebuild_state()?; + let key = event_key(worker_id, &run_id.0, task_id); + let seq = state.latest_seq.get(&key).copied().unwrap_or(0) + 1; + let event = FleetWorkerEvent { + seq, + run_id: run_id.clone(), + worker_id: worker_id.to_string(), + task_id: task_id.to_string(), + timestamp: timestamp(), + payload, + extra: BTreeMap::new(), + }; + self.ledger.append_event(event.clone())?; + Ok(event) + } + + fn write_log_artifact( + &self, + run_id: &FleetRunId, + worker_id: &str, + task_spec: &FleetTaskSpec, + ) -> Result { + let rel_path = PathBuf::from(".codewhale") + .join("fleet") + .join(safe_path_segment(&run_id.0)) + .join(safe_path_segment(&task_spec.id)) + .join(format!("{}.log", safe_path_segment(worker_id))); + let abs_path = self.workspace.join(&rel_path); + if let Some(parent) = abs_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating fleet artifact dir {}", parent.display()))?; + } + let contents = format!( + "run_id={}\ntask_id={}\ntask_name={}\nworker_id={}\nstatus=started\n", + run_id.0, task_spec.id, task_spec.name, worker_id + ); + std::fs::write(&abs_path, contents) + .with_context(|| format!("writing fleet worker log {}", abs_path.display()))?; + let size_bytes = std::fs::metadata(&abs_path).ok().map(|m| m.len()); + Ok(FleetArtifactRef { + kind: FleetArtifactKind::Log, + path: rel_path, + checksum: None, + mime_type: Some("text/plain".to_string()), + size_bytes, + }) + } + + fn refresh_run_status(&self, run_id: &FleetRunId) -> Result<()> { + let state = self.ledger.rebuild_state()?; + let mut has_queued = false; + let mut has_running = false; + let mut has_failed = false; + let mut has_cancelled = false; + let mut has_tasks = false; + for task in state + .tasks + .values() + 
.filter(|task| task.entry.run_id == *run_id) + { + has_tasks = true; + match task.status { + FleetTaskLedgerStatus::Enqueued => has_queued = true, + FleetTaskLedgerStatus::Leased => has_running = true, + FleetTaskLedgerStatus::Failed => has_failed = true, + FleetTaskLedgerStatus::Cancelled => has_cancelled = true, + FleetTaskLedgerStatus::Completed => {} + } + } + let status = if !has_tasks { + FleetRunStatus::Completed + } else if has_queued || has_running { + FleetRunStatus::Running + } else if has_failed { + FleetRunStatus::Failed + } else if has_cancelled { + FleetRunStatus::Cancelled + } else { + FleetRunStatus::Completed + }; + self.ledger + .update_run_status(run_id, status, &timestamp()) + .context("updating fleet run status") + } + + fn status_from_state( + &self, + run_filter: Option<&FleetRunId>, + state: &FleetLedgerState, + ) -> FleetStatusSnapshot { + let mut snapshot = FleetStatusSnapshot { + runs: state.runs.len(), + workers: state.workers.clone(), + ..FleetStatusSnapshot::default() + }; + for task in state.tasks.values() { + if run_filter.is_some_and(|run_id| task.entry.run_id != *run_id) { + continue; + } + match task.status { + FleetTaskLedgerStatus::Enqueued => snapshot.queued += 1, + FleetTaskLedgerStatus::Leased => { + if self.task_is_stale(task, state) { + snapshot.stale += 1; + } else { + snapshot.running += 1; + } + } + FleetTaskLedgerStatus::Completed => snapshot.completed += 1, + FleetTaskLedgerStatus::Failed => snapshot.failed += 1, + FleetTaskLedgerStatus::Cancelled => snapshot.cancelled += 1, + } + } + snapshot + } + + fn task_is_stale(&self, task: &FleetTaskState, state: &FleetLedgerState) -> bool { + let Some(worker_id) = task.leased_to.as_deref() else { + return true; + }; + let Some(heartbeat) = state.heartbeats.get(worker_id) else { + return true; + }; + let Ok(last) = DateTime::parse_from_rfc3339(&heartbeat.timestamp) else { + return true; + }; + let age = Utc::now().signed_duration_since(last.with_timezone(&Utc)); + age.to_std() + 
.is_ok_and(|duration| duration > self.stale_after) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FleetLocalSimulationResult { + Pass, + Fail, + Skip, + Timeout, +} + +fn validate_task_spec_document(doc: &FleetTaskSpecDocument) -> Result<()> { + if doc.tasks.is_empty() { + bail!("fleet task spec must include at least one task"); + } + let mut ids = BTreeSet::new(); + for task in &doc.tasks { + if task.id.trim().is_empty() { + bail!("fleet task id cannot be empty"); + } + if !ids.insert(task.id.clone()) { + bail!("duplicate fleet task id {}", task.id); + } + if task.name.trim().is_empty() { + bail!("fleet task {} name cannot be empty", task.id); + } + if task.instructions.trim().is_empty() { + bail!("fleet task {} instructions cannot be empty", task.id); + } + } + Ok(()) +} + +fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec { + (1..=max_workers) + .map(|index| FleetWorkerSpec { + id: format!("{}-local-{}", run_id.0, index), + name: format!("Local worker {index}"), + host: FleetHostSpec::Local, + labels: BTreeMap::new(), + capabilities: vec!["local".to_string()], + max_concurrent_tasks: Some(1), + }) + .collect() +} + +fn worker_ids_for_run(run: &FleetRun, max_workers: usize) -> Vec { + run.worker_specs + .iter() + .take(max_workers) + .map(|worker| worker.id.clone()) + .collect() +} + +fn active_workers_for_run(state: &FleetLedgerState, run_id: &FleetRunId) -> BTreeSet { + active_tasks_for_run(state, run_id) + .filter_map(|task| task.leased_to.clone()) + .collect() +} + +fn active_tasks_for_run<'a>( + state: &'a FleetLedgerState, + run_id: &'a FleetRunId, +) -> impl Iterator { + state.tasks.values().filter(move |task| { + task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Leased) + }) +} + +fn active_task_for_worker<'a>( + state: &'a FleetLedgerState, + worker_id: &str, +) -> Option<&'a FleetTaskState> { + state.tasks.values().find(|task| { + task.leased_to.as_deref() == Some(worker_id) + && 
matches!(task.status, FleetTaskLedgerStatus::Leased) + }) +} + +fn latest_task_for_worker<'a>( + state: &'a FleetLedgerState, + worker_id: &str, +) -> Option<&'a FleetTaskState> { + state + .tasks + .values() + .filter(|task| task.leased_to.as_deref() == Some(worker_id)) + .max_by_key(|task| task.completed_at.as_deref().or(task.leased_at.as_deref())) +} + +fn next_enqueued_task_for_run( + state: &FleetLedgerState, + run_id: &FleetRunId, +) -> Option<(FleetInboxEntry, FleetTaskSpec)> { + let run = state.runs.get(&run_id.0)?; + let task = state + .tasks + .values() + .filter(|task| { + task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Enqueued) + }) + .min_by_key(|task| { + ( + task.entry.priority, + task.entry.enqueued_at.clone(), + task.entry.task_id.clone(), + ) + })?; + let task_spec = run + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + .cloned()?; + Some((task.entry.clone(), task_spec)) +} + +fn latest_event_for_worker<'a>( + state: &'a FleetLedgerState, + worker_id: &str, +) -> Option<&'a FleetWorkerEvent> { + state + .latest_events + .values() + .filter(|event| event.worker_id == worker_id) + .max_by_key(|event| event.seq) +} + +fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option { + state + .latest_events + .values() + .filter(|event| event.worker_id == worker_id) + .filter_map(|event| match &event.payload { + FleetWorkerEventPayload::Failed { reason, .. 
} => { + Some((event.seq, format!("failed: {reason}"))) + } + FleetWorkerEventPayload::Cancelled { cancelled_by } => Some(( + event.seq, + cancelled_by + .as_ref() + .map(|by| format!("cancelled by {by}")) + .unwrap_or_else(|| "cancelled".to_string()), + )), + FleetWorkerEventPayload::Interrupted { signal } => Some(( + event.seq, + signal + .as_ref() + .map(|signal| format!("interrupted by {signal}")) + .unwrap_or_else(|| "interrupted".to_string()), + )), + FleetWorkerEventPayload::Stale { last_heartbeat_at } => Some(( + event.seq, + last_heartbeat_at + .as_ref() + .map(|ts| format!("stale since {ts}")) + .unwrap_or_else(|| "stale".to_string()), + )), + _ => None, + }) + .max_by_key(|(seq, _)| *seq) + .map(|(_, message)| message) +} + +fn local_simulation_result(task: &FleetTaskSpec) -> Option { + if task + .metadata + .get("local_complete") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Some(FleetLocalSimulationResult::Pass); + } + match task + .metadata + .get("local_result") + .and_then(Value::as_str) + .map(str::to_ascii_lowercase) + .as_deref() + { + Some("pass" | "passed" | "ok" | "completed") => Some(FleetLocalSimulationResult::Pass), + Some("fail" | "failed" | "error") => Some(FleetLocalSimulationResult::Fail), + Some("skip" | "skipped") => Some(FleetLocalSimulationResult::Skip), + Some("timeout" | "timed_out") => Some(FleetLocalSimulationResult::Timeout), + _ => None, + } +} + +fn task_priority(task: &FleetTaskSpec) -> i32 { + task.metadata + .get("priority") + .and_then(Value::as_i64) + .and_then(|value| i32::try_from(value).ok()) + .unwrap_or(0) +} + +fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { + format!("{worker_id}:{run_id}:{task_id}") +} + +fn timestamp() -> String { + Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true) +} + +fn safe_path_segment(value: &str) -> String { + value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') { + ch + } else { + '_' + } + }) + 
.collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn task(id: &str) -> FleetTaskSpec { + FleetTaskSpec { + id: id.to_string(), + name: id.to_string(), + description: None, + instructions: format!("do {id}"), + expected_artifacts: vec![FleetArtifactKind::Log], + scorer: None, + retry_policy: None, + alert_policy: None, + timeout_seconds: None, + metadata: BTreeMap::new(), + } + } + + fn task_spec_file(dir: &TempDir, tasks: Vec) -> PathBuf { + let path = dir.path().join("fleet-tasks.json"); + let doc = json!({ + "name": "manager smoke", + "tasks": tasks, + }); + std::fs::write(&path, serde_json::to_string_pretty(&doc).unwrap()).unwrap(); + path + } + + #[test] + fn fleet_manager_creates_run_and_starts_workers_up_to_cap() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b"), task("task-c")]); + + let report = manager.create_run_from_task_spec_path(&path, 2).unwrap(); + + assert_eq!(report.task_count, 3); + assert_eq!(report.leased, 2); + assert_eq!(report.queued, 1); + assert_eq!(report.worker_ids.len(), 2); + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.queued, 1); + assert_eq!(status.running, 2); + assert_eq!(status.completed, 0); + } + + #[test] + fn fleet_manager_inspect_exposes_heartbeat_artifacts_and_errors() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let path = task_spec_file(&tmp, vec![task("task-a")]); + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + let worker_id = &report.worker_ids[0]; + + let inspection = manager.inspect_worker(worker_id).unwrap(); + assert_eq!(inspection.status, FleetWorkerStatus::Busy); + assert_eq!(inspection.current_task_id.as_deref(), Some("task-a")); + assert!(inspection.latest_heartbeat_at.is_some()); + assert_eq!(inspection.artifacts.len(), 1); + 
assert!(inspection.last_error.is_none()); + + let inspection = manager.interrupt_worker(worker_id).unwrap(); + assert_eq!(inspection.status, FleetWorkerStatus::Online); + assert_eq!( + inspection.last_error.as_deref(), + Some("cancelled by operator") + ); + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.cancelled, 1); + } + + #[test] + fn fleet_manager_restart_and_stop_all_are_ledgered() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b")]); + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + let worker_id = &report.worker_ids[0]; + + manager.interrupt_worker(worker_id).unwrap(); + let inspection = manager.restart_worker(worker_id).unwrap(); + assert_eq!(inspection.status, FleetWorkerStatus::Busy); + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.running, 1); + assert_eq!(status.queued, 1); + + let stopped = manager.stop_all().unwrap(); + assert_eq!(stopped, 2); + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.cancelled, 2); + assert_eq!(status.running, 0); + } + + #[test] + fn fleet_manager_can_record_completed_local_smoke_tasks() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let mut completed = task("task-a"); + completed + .metadata + .insert("local_result".to_string(), json!("pass")); + let path = task_spec_file(&tmp, vec![completed]); + + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.completed, 1); + assert_eq!(status.running, 0); + let state = manager.ledger.rebuild_state().unwrap(); + assert_eq!(state.receipts.len(), 1); + } +} diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs index 60d82d32..d7c02032 100644 --- a/crates/tui/src/fleet/mod.rs +++ 
b/crates/tui/src/fleet/mod.rs
@@ -1,3 +1,4 @@
 //! Agent Fleet control plane — local-first manager, ledger, and workers.
 
 pub mod ledger;
+pub mod manager;
diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs
index b92ba886..f82e6125 100644
--- a/crates/tui/src/main.rs
+++ b/crates/tui/src/main.rs
@@ -243,6 +243,8 @@ enum Commands {
     Exec(ExecArgs),
     /// Generate SWE-bench prediction rows from CodeWhale runs
     Swebench(SwebenchArgs),
+    /// Manage local Agent Fleet runs and workers
+    Fleet(FleetArgs),
     /// Run a code review over a git diff
     Review(ReviewArgs),
     /// Open the TUI pre-seeded with a GitHub PR's title, body, and diff (#451)
@@ -373,6 +375,59 @@ enum SwebenchCommand {
     Export(SwebenchExportArgs),
 }
 
+#[derive(Args, Debug, Clone)]
+struct FleetArgs { // Arguments of the `fleet` command: only the selected subcommand.
+    #[command(subcommand)]
+    command: FleetCommand,
+}
+
+#[derive(Subcommand, Debug, Clone)]
+enum FleetCommand { // Subcommands of `fleet`; run_fleet_command has one match arm per variant.
+    /// Initialize the local fleet ledger for this workspace
+    Init,
+    /// Create a run from a task spec and start the foreground manager loop
+    Run(FleetRunArgs),
+    /// Show queued/running/completed/failed/stale fleet counts
+    Status,
+    /// Inspect one worker's status, heartbeat, latest event, and artifacts
+    Inspect {
+        /// Worker id printed by `codewhale fleet run`
+        worker_id: String,
+    },
+    /// Interrupt a running worker task and record a terminal cancellation
+    Interrupt {
+        /// Worker id printed by `codewhale fleet run`
+        worker_id: String,
+    },
+    /// Restart the latest task for a worker
+    Restart {
+        /// Worker id printed by `codewhale fleet run`
+        worker_id: String,
+    },
+    /// Stop all queued and running fleet work
+    Stop {
+        /// Confirm stopping all queued and running fleet tasks
+        #[arg(long, required = true)]
+        all: bool,
+    },
+}
+
+#[derive(Args, Debug, Clone)]
+struct FleetRunArgs { // Options accepted by `fleet run`.
+    /// JSON or TOML task spec to enqueue
+    #[arg(value_name = "TASK_SPEC")]
+    task_spec: PathBuf,
+    /// Maximum local workers to lease concurrently
+    #[arg(long, default_value_t = 4)]
+    max_workers: usize, // run_fleet_command clamps this to 1..=128 before use
+    /// Seconds without heartbeat before a running task is counted stale
+    #[arg(long, default_value_t = 300)]
+    stale_after_seconds: u64, // run_fleet_command raises 0 to 1 before use
+    /// Schedule once and return instead of staying in the manager loop
+    #[arg(long, hide = true, default_value_t = false)]
+    once: bool,
+}
+
 #[derive(Args, Debug, Clone)]
 struct SwebenchRunArgs {
     /// SWE-bench instance id, e.g. django__django-12345
@@ -1068,6 +1123,10 @@ async fn main() -> Result<()> {
             );
             run_swebench_command(&config, &model, workspace, max_subagents, args).await
         }
+        Commands::Fleet(args) => {
+            let workspace = resolve_workspace(&cli);
+            run_fleet_command(&workspace, args).await
+        }
         Commands::Review(args) => {
             let config = load_config_from_cli(&cli)?;
             run_review(&config, args).await
@@ -1330,6 +1389,207 @@ async fn run_swebench_command(
     }
 }
 
+async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { // Runs one parsed `fleet` subcommand against the FleetManager opened for `workspace`; manager errors propagate via `?`.
+    use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
+    use codewhale_protocol::fleet::{
+        FleetArtifactKind, FleetWorkerEventPayload, FleetWorkerStatus,
+    };
+
+    fn worker_status_label(status: &FleetWorkerStatus) -> &'static str { // Fixed lowercase label for a worker status; used by print_status and print_inspection.
+        match status {
+            FleetWorkerStatus::Unknown => "unknown",
+            FleetWorkerStatus::Online => "online",
+            FleetWorkerStatus::Busy => "busy",
+            FleetWorkerStatus::Offline => "offline",
+            FleetWorkerStatus::Unhealthy => "unhealthy",
+            FleetWorkerStatus::Draining => "draining",
+            FleetWorkerStatus::Retired => "retired",
+        }
+    }
+
+    fn artifact_kind_label(kind: &FleetArtifactKind) -> String { // Printable label for an artifact kind; `Other` supplies its own text.
+        match kind {
+            FleetArtifactKind::Log => "log".to_string(),
+            FleetArtifactKind::Patch => "patch".to_string(),
+            FleetArtifactKind::TestResult => "test_result".to_string(),
+            FleetArtifactKind::Report => "report".to_string(),
+            FleetArtifactKind::Checkpoint => "checkpoint".to_string(),
+            FleetArtifactKind::Receipt => "receipt".to_string(),
+            FleetArtifactKind::Other(value) => value.clone(),
+        }
+    }
+
+    fn event_label(payload: &FleetWorkerEventPayload) -> String { // One-line description of an event payload; optional fields are appended only when present.
+        match payload {
+            FleetWorkerEventPayload::Queued => "queued".to_string(),
+            FleetWorkerEventPayload::Leased { .. } => "leased".to_string(),
+            FleetWorkerEventPayload::Starting => "starting".to_string(),
+            FleetWorkerEventPayload::Running => "running".to_string(),
+            FleetWorkerEventPayload::ModelWait { model } => model
+                .as_ref()
+                .map(|model| format!("model_wait model={model}"))
+                .unwrap_or_else(|| "model_wait".to_string()),
+            FleetWorkerEventPayload::RunningTool { tool, call_id } => call_id
+                .as_ref()
+                .map(|call_id| format!("running_tool tool={tool} call_id={call_id}"))
+                .unwrap_or_else(|| format!("running_tool tool={tool}")),
+            FleetWorkerEventPayload::Heartbeat { .. } => "heartbeat".to_string(),
+            FleetWorkerEventPayload::Artifact(artifact) => {
+                format!("artifact kind={}", artifact_kind_label(&artifact.kind))
+            }
+            FleetWorkerEventPayload::Completed { exit_code, summary } => match (exit_code, summary)
+            {
+                (Some(code), Some(summary)) => format!("completed exit_code={code} {summary}"),
+                (Some(code), None) => format!("completed exit_code={code}"),
+                (None, Some(summary)) => format!("completed {summary}"),
+                (None, None) => "completed".to_string(),
+            },
+            FleetWorkerEventPayload::Failed {
+                reason,
+                recoverable,
+            } => {
+                format!("failed recoverable={recoverable} reason={reason}")
+            }
+            FleetWorkerEventPayload::Cancelled { cancelled_by } => cancelled_by
+                .as_ref()
+                .map(|by| format!("cancelled by={by}"))
+                .unwrap_or_else(|| "cancelled".to_string()),
+            FleetWorkerEventPayload::Interrupted { signal } => signal
+                .as_ref()
+                .map(|signal| format!("interrupted signal={signal}"))
+                .unwrap_or_else(|| "interrupted".to_string()),
+            FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at
+                .as_ref()
+                .map(|ts| format!("stale last_heartbeat_at={ts}"))
+                .unwrap_or_else(|| "stale".to_string()),
+            FleetWorkerEventPayload::Restarted { restart_count } => {
+                format!("restarted count={restart_count}")
+            }
+            FleetWorkerEventPayload::Escalated { channel, alert_id } => alert_id
+                .as_ref()
+                .map(|alert_id| format!("escalated channel={channel} alert_id={alert_id}"))
+                .unwrap_or_else(|| format!("escalated channel={channel}")),
+        }
+    }
+
+    fn print_status(status: &FleetStatusSnapshot) { // Prints the aggregate counters, then one line per worker when any are listed.
+        println!(
+            "fleet: runs={} queued={} running={} completed={} failed={} cancelled={} stale={}",
+            status.runs,
+            status.queued,
+            status.running,
+            status.completed,
+            status.failed,
+            status.cancelled,
+            status.stale
+        );
+        if !status.workers.is_empty() {
+            println!("workers:");
+            for (worker_id, worker_status) in &status.workers {
+                println!(" {worker_id} {}", worker_status_label(worker_status));
+            }
+        }
+    }
+
+    fn print_inspection(inspection: &FleetWorkerInspection) { // Prints a worker inspection; fields that are None or empty are omitted.
+        println!("worker: {}", inspection.worker_id);
+        println!("status: {}", worker_status_label(&inspection.status));
+        if let Some(run_id) = &inspection.current_run_id {
+            println!("run: {}", run_id.0);
+        }
+        if let Some(task_id) = &inspection.current_task_id {
+            println!("task: {task_id}");
+        }
+        if let Some(heartbeat) = &inspection.latest_heartbeat_at {
+            println!("heartbeat: {heartbeat}");
+        }
+        if let Some(event) = &inspection.latest_event {
+            println!(
+                "latest_event: seq={} {}",
+                event.seq,
+                event_label(&event.payload)
+            );
+        }
+        if !inspection.artifacts.is_empty() {
+            println!("artifacts:");
+            for artifact in &inspection.artifacts {
+                println!(
+                    " {} {}",
+                    artifact_kind_label(&artifact.kind),
+                    artifact.path.display()
+                );
+            }
+        }
+        if let Some(error) = &inspection.last_error {
+            println!("last_error: {error}");
+        }
+    }
+
+    let manager = FleetManager::open(workspace)?; // opened up front for every subcommand, including Init
+    match args.command {
+        FleetCommand::Init => { // no further work in this arm: it only reports the ledger path of the opened manager
+            println!("fleet ledger: {}", manager.ledger_path().display());
+            Ok(())
+        }
+        FleetCommand::Run(args) => {
+            let max_workers = args.max_workers.clamp(1, 128); // keep the worker cap within 1..=128
+            let manager =
+                manager.with_stale_after(Duration::from_secs(args.stale_after_seconds.max(1))); // stale threshold is at least 1 second
+            let report = manager.create_run_from_task_spec_path(&args.task_spec, max_workers)?;
+            println!(
+                "fleet run: {} tasks={} leased={} queued={}",
+                report.run_id.0, report.task_count, report.leased, report.queued
+            );
+            println!("workers:");
+            for worker_id in &report.worker_ids {
+                println!(" {worker_id}");
+            }
+            if args.once { // hidden --once flag: print the run status and return without entering the loop
+                print_status(&manager.run_status(&report.run_id)?);
+                return Ok(());
+            }
+            println!(
+                "manager loop running; use `codewhale fleet status`, `inspect`, `interrupt`, or `stop --all` from another terminal."
+            );
+            loop { // re-schedule every 2 seconds until the run reports no open work
+                manager.schedule_run(&report.run_id, max_workers)?;
+                if !manager.run_has_open_work(&report.run_id)? {
+                    print_status(&manager.run_status(&report.run_id)?);
+                    break;
+                }
+                tokio::time::sleep(Duration::from_secs(2)).await;
+            }
+            Ok(())
+        }
+        FleetCommand::Status => {
+            print_status(&manager.status()?);
+            Ok(())
+        }
+        FleetCommand::Inspect { worker_id } => {
+            print_inspection(&manager.inspect_worker(&worker_id)?);
+            Ok(())
+        }
+        FleetCommand::Interrupt { worker_id } => {
+            let inspection = manager.interrupt_worker(&worker_id)?;
+            print_inspection(&inspection);
+            Ok(())
+        }
+        FleetCommand::Restart { worker_id } => {
+            let inspection = manager.restart_worker(&worker_id)?;
+            print_inspection(&inspection);
+            Ok(())
+        }
+        FleetCommand::Stop { all } => {
+            if !all { // NOTE(review): `--all` is declared `required = true`, so this guard is presumably unreachable from the CLI — confirm
+                bail!("pass --all to stop all fleet work");
+            }
+            let stopped = manager.stop_all()?;
+            println!("stopped: {stopped}");
+            Ok(())
+        }
+    }
+}
+
 fn swebench_prompt(
     instance_id: &str,
     workspace: &Path,