diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs index 31c9a45b..efbe9ba5 100644 --- a/crates/protocol/src/fleet.rs +++ b/crates/protocol/src/fleet.rs @@ -298,9 +298,17 @@ pub struct FleetAlertPolicy { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum FleetAlertChannel { - Slack { webhook_url: String }, - Webhook { url: String, secret: Option }, - PagerDuty { routing_key: String, severity: String }, + Slack { + webhook_url: String, + }, + Webhook { + url: String, + secret: Option, + }, + PagerDuty { + routing_key: String, + severity: String, + }, } /// Receipt produced when a task completes verification. diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs new file mode 100644 index 00000000..e4eabd86 --- /dev/null +++ b/crates/tui/src/fleet/ledger.rs @@ -0,0 +1,661 @@ +//! Durable fleet inbox and run ledger. +//! +//! Stores fleet state as append-only JSONL so the manager can survive +//! restarts and reconstruct queue/worker state by replaying records. +//! Artifacts are referenced by bounded metadata; large payloads live on disk +//! and are never embedded in the ledger. + +#![allow(dead_code)] + +use std::collections::BTreeMap; +use std::io::BufRead; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use codewhale_protocol::fleet::*; +use serde::{Deserialize, Serialize}; + +const FLEET_DIR: &str = ".codewhale"; +const FLEET_LEDGER_FILE: &str = "fleet.jsonl"; +const PARTIAL_SUFFIX: &str = ".tmp"; + +/// A single append-only record in the fleet ledger. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "record", rename_all = "snake_case")] +pub enum FleetLedgerRecord { + RunCreated { + run: FleetRun, + }, + RunStatusChanged { + run_id: FleetRunId, + status: FleetRunStatus, + timestamp: String, + }, + TaskEnqueued { + entry: FleetInboxEntry, + }, + TaskLeased { + run_id: FleetRunId, + task_id: String, + worker_id: String, + leased_at: String, + lease_expires_at: Option, + }, + TaskCompletedOrFailed { + run_id: FleetRunId, + task_id: String, + worker_id: String, + timestamp: String, + }, + EventAppended { + event: FleetWorkerEvent, + }, + Heartbeat { + worker_id: String, + timestamp: String, + #[serde(default)] + cpu_percent: Option, + #[serde(default)] + memory_mb: Option, + }, + ReceiptRecorded { + receipt: FleetReceipt, + }, + AlertSent { + run_id: FleetRunId, + task_id: String, + channel: String, + timestamp: String, + }, +} + +/// Reconstructed fleet state after replaying the ledger. +#[derive(Debug, Clone, Default)] +pub struct FleetLedgerState { + pub runs: BTreeMap, + pub run_status_overrides: BTreeMap, + /// Tasks keyed by run_id:task_id. + pub tasks: BTreeMap, + /// Worker status by worker_id. + pub workers: BTreeMap, + /// Latest heartbeat by worker_id. + pub heartbeats: BTreeMap, + /// Latest event seq per worker_id:task_id. + pub latest_seq: BTreeMap, + /// Completed receipts by run_id:task_id. + pub receipts: BTreeMap, +} + +#[derive(Debug, Clone)] +pub struct FleetTaskState { + pub entry: FleetInboxEntry, + pub status: FleetTaskLedgerStatus, + pub leased_to: Option, + pub leased_at: Option, + pub completed_at: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FleetTaskLedgerStatus { + Enqueued, + Leased, + Completed, + Failed, + Cancelled, +} + +#[derive(Debug, Clone)] +pub struct FleetHeartbeatState { + pub timestamp: String, + pub cpu_percent: Option, + pub memory_mb: Option, +} + +/// Append-only JSONL ledger for fleet runs. +pub struct FleetLedger { + ledger_path: PathBuf, +} + +impl FleetLedger { + /// Open (or create) the ledger under `workspace/.codewhale/fleet.jsonl`. + pub fn open(workspace: &Path) -> Result { + let dir = workspace.join(FLEET_DIR); + std::fs::create_dir_all(&dir) + .with_context(|| format!("creating fleet ledger dir {}", dir.display()))?; + let ledger_path = dir.join(FLEET_LEDGER_FILE); + if !ledger_path.exists() { + std::fs::write(&ledger_path, "") + .with_context(|| format!("creating fleet ledger {}", ledger_path.display()))?; + } + Ok(Self { ledger_path }) + } + + pub fn path(&self) -> &Path { + &self.ledger_path + } + + /// Append a single record atomically by writing a temp file and renaming. + fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> { + let line = serde_json::to_string(record).context("serializing fleet ledger record")?; + let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); + // Read existing content, append new line, then atomically replace. + let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default(); + if !contents.is_empty() && !contents.ends_with('\n') { + contents.push('\n'); + } + contents.push_str(&line); + contents.push('\n'); + std::fs::write(&tmp_path, contents) + .with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?; + std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| { + format!( + "renaming fleet ledger {} -> {}", + tmp_path.display(), + self.ledger_path.display() + ) + })?; + Ok(()) + } + + pub fn create_run(&self, run: &FleetRun) -> Result<()> { + self.append_record(&FleetLedgerRecord::RunCreated { run: run.clone() }) + } + + pub fn update_run_status( + &self, + run_id: &FleetRunId, + status: FleetRunStatus, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::RunStatusChanged { + run_id: run_id.clone(), + status, + timestamp: timestamp.to_string(), + }) + } + + pub fn enqueue(&self, entry: FleetInboxEntry) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskEnqueued { entry }) + } + + /// Mark a task as leased to a worker. + pub fn lease_task( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + leased_at: &str, + lease_expires_at: Option<&str>, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskLeased { + run_id: run_id.clone(), + task_id: task_id.to_string(), + worker_id: worker_id.to_string(), + leased_at: leased_at.to_string(), + lease_expires_at: lease_expires_at.map(String::from), + }) + } + + /// Mark a task as completed or failed. + pub fn complete_or_fail_task( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskCompletedOrFailed { + run_id: run_id.clone(), + task_id: task_id.to_string(), + worker_id: worker_id.to_string(), + timestamp: timestamp.to_string(), + }) + } + + pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> { + self.append_record(&FleetLedgerRecord::EventAppended { event }) + } + + pub fn heartbeat( + &self, + worker_id: &str, + timestamp: &str, + cpu_percent: Option, + memory_mb: Option, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::Heartbeat { + worker_id: worker_id.to_string(), + timestamp: timestamp.to_string(), + cpu_percent, + memory_mb, + }) + } + + pub fn record_receipt(&self, receipt: FleetReceipt) -> Result<()> { + self.append_record(&FleetLedgerRecord::ReceiptRecorded { receipt }) + } + + pub fn record_alert( + &self, + run_id: &FleetRunId, + task_id: &str, + channel: &str, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::AlertSent { + run_id: run_id.clone(), + task_id: task_id.to_string(), + channel: channel.to_string(), + timestamp: timestamp.to_string(), + }) + } + + /// Replay the ledger and reconstruct current state. Malformed or partial + /// lines are skipped so an interrupted write cannot corrupt earlier state. + pub fn rebuild_state(&self) -> Result { + let mut state = FleetLedgerState::default(); + if !self.ledger_path.exists() { + return Ok(state); + } + let file = std::fs::File::open(&self.ledger_path) + .with_context(|| format!("opening ledger {}", self.ledger_path.display()))?; + let reader = std::io::BufReader::new(file); + for (line_no, line) in reader.lines().enumerate() { + let line = match line { + Ok(l) => l, + Err(err) => { + tracing::warn!("fleet ledger line {} unreadable: {}", line_no + 1, err); + continue; + } + }; + if line.trim().is_empty() { + continue; + } + let record: FleetLedgerRecord = match serde_json::from_str(&line) { + Ok(r) => r, + Err(err) => { + tracing::warn!( + "fleet ledger line {} parse error (skipping): {}", + line_no + 1, + err + ); + continue; + } + }; + apply_record(&mut state, record); + } + Ok(state) + } + + /// Claim the next available inbox task for `worker_id`. Returns the + /// enqueued entry and appends a lease record. + pub fn claim_next( + &self, + worker_id: &str, + _worker_capabilities: &[String], + timestamp: &str, + ) -> Result> { + let state = self.rebuild_state()?; + // Find oldest enqueued task whose task spec (if known) matches worker + // capabilities. For now, tasks without specs match everything. + let candidate = state + .tasks + .values() + .filter(|t| matches!(t.status, FleetTaskLedgerStatus::Enqueued)) + .map(|t| &t.entry) + .min_by_key(|e| (e.priority, e.enqueued_at.clone())) + .cloned(); + let Some(entry) = candidate else { + return Ok(None); + }; + self.lease_task(&entry.run_id, &entry.task_id, worker_id, timestamp, None)?; + Ok(Some(entry)) + } + + /// Compact the ledger by rewriting only the records needed to reconstruct + /// current state. This truncates history but preserves run/task/event + /// metadata and receipts. + pub fn compact(&self) -> Result<()> { + let state = self.rebuild_state()?; + let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); + let mut lines = Vec::new(); + for run in state.runs.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::RunCreated { + run: run.clone(), + })?); + if let Some(status) = state.run_status_overrides.get(&run.id.0) { + lines.push(serde_json::to_string( + &FleetLedgerRecord::RunStatusChanged { + run_id: run.id.clone(), + status: status.clone(), + timestamp: run.updated_at.clone().unwrap_or_default(), + }, + )?); + } + } + for task in state.tasks.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::TaskEnqueued { + entry: task.entry.clone(), + })?); + if let Some(worker) = &task.leased_to { + lines.push(serde_json::to_string(&FleetLedgerRecord::TaskLeased { + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: worker.clone(), + leased_at: task.leased_at.clone().unwrap_or_default(), + lease_expires_at: None, + })?); + } + if matches!( + task.status, + FleetTaskLedgerStatus::Completed + | FleetTaskLedgerStatus::Failed + | FleetTaskLedgerStatus::Cancelled + ) { + lines.push(serde_json::to_string( + &FleetLedgerRecord::TaskCompletedOrFailed { + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: task.leased_to.clone().unwrap_or_default(), + timestamp: task.completed_at.clone().unwrap_or_default(), + }, + )?); + } + } + for receipt in state.receipts.values() { + lines.push(serde_json::to_string( + &FleetLedgerRecord::ReceiptRecorded { + receipt: receipt.clone(), + }, + )?); + } + let contents = lines.join("\n"); + if !contents.is_empty() { + std::fs::write(&tmp_path, contents)?; + std::fs::write(&tmp_path, "\n")?; + } else { + std::fs::write(&tmp_path, "")?; + } + std::fs::rename(&tmp_path, &self.ledger_path)?; + Ok(()) + } +} + +fn task_key(run_id: &str, task_id: &str) -> String { + format!("{}:{}", run_id, task_id) +} + +fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { + match record { + FleetLedgerRecord::RunCreated { run } => { + state.runs.insert(run.id.0.clone(), run); + } + FleetLedgerRecord::RunStatusChanged { + run_id, + status, + timestamp: _, + } => { + state.run_status_overrides.insert(run_id.0, status); + } + FleetLedgerRecord::TaskEnqueued { entry } => { + let key = task_key(&entry.run_id.0, &entry.task_id); + state.tasks.entry(key).or_insert_with(|| FleetTaskState { + entry, + status: FleetTaskLedgerStatus::Enqueued, + leased_to: None, + leased_at: None, + completed_at: None, + }); + } + FleetLedgerRecord::TaskLeased { + run_id, + task_id, + worker_id, + leased_at, + lease_expires_at: _, + } => { + let key = task_key(&run_id.0, &task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = FleetTaskLedgerStatus::Leased; + task.leased_to = Some(worker_id); + task.leased_at = Some(leased_at); + } + } + FleetLedgerRecord::TaskCompletedOrFailed { + run_id, + task_id, + worker_id, + timestamp, + } => { + let key = task_key(&run_id.0, &task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = FleetTaskLedgerStatus::Completed; + task.leased_to = Some(worker_id); + task.completed_at = Some(timestamp); + } + } + FleetLedgerRecord::EventAppended { event } => { + let worker_key = event.worker_id.clone(); + let task_key = task_key(&event.run_id.0, &event.task_id); + let event_key = format!("{}:{}", worker_key, task_key); + if let Some(seq) = state.latest_seq.get(&event_key).copied() { + if event.seq > seq { + state.latest_seq.insert(event_key, event.seq); + } + } else { + state.latest_seq.insert(event_key, event.seq); + } + // Derive worker status from lifecycle events. + match &event.payload { + FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Busy); + } + FleetWorkerEventPayload::Completed { .. } + | FleetWorkerEventPayload::Failed { .. } + | FleetWorkerEventPayload::Cancelled { .. } => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + _ => {} + } + } + FleetLedgerRecord::Heartbeat { + worker_id, + timestamp, + cpu_percent, + memory_mb, + } => { + state.heartbeats.insert( + worker_id.clone(), + FleetHeartbeatState { + timestamp, + cpu_percent, + memory_mb, + }, + ); + if state + .workers + .get(&worker_id) + .cloned() + .unwrap_or(FleetWorkerStatus::Unknown) + != FleetWorkerStatus::Busy + { + state.workers.insert(worker_id, FleetWorkerStatus::Online); + } + } + FleetLedgerRecord::ReceiptRecorded { receipt } => { + let key = task_key(&receipt.run_id.0, &receipt.task_id); + state.receipts.insert(key, receipt); + } + FleetLedgerRecord::AlertSent { .. } => { + // Alerts are audit-only for state reconstruction. + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn sample_run(id: &str) -> FleetRun { + FleetRun { + id: FleetRunId::from(id), + name: "smoke".to_string(), + status: FleetRunStatus::Running, + task_specs: vec![], + worker_specs: vec![], + labels: BTreeMap::new(), + created_at: "2026-06-12T17:00:00Z".to_string(), + updated_at: None, + completed_at: None, + } + } + + fn sample_entry(run_id: &str, task_id: &str) -> FleetInboxEntry { + FleetInboxEntry { + run_id: FleetRunId::from(run_id), + task_id: task_id.to_string(), + priority: 0, + enqueued_at: "2026-06-12T17:00:00Z".to_string(), + lease_deadline: None, + attempts: 0, + } + } + + #[test] + fn fleet_ledger_create_and_rebuild_run() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let run = sample_run("run-1"); + ledger.create_run(&run).unwrap(); + ledger + .update_run_status(&run.id, FleetRunStatus::Completed, "2026-06-12T18:00:00Z") + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.run_status_overrides["run-1"], + FleetRunStatus::Completed + ); + } + + #[test] + fn fleet_ledger_enqueue_and_claim() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-b")).unwrap(); + + let claimed = ledger + .claim_next("worker-1", &[], "2026-06-12T17:01:00Z") + .unwrap(); + assert!(claimed.is_some()); + let claimed = claimed.unwrap(); + assert_eq!(claimed.task_id, "task-a"); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.tasks.len(), 2); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!( + state.tasks["run-1:task-a"].leased_to.as_deref(), + Some("worker-1") + ); + assert_eq!( + state.tasks["run-1:task-b"].status, + FleetTaskLedgerStatus::Enqueued + ); + } + + #[test] + fn fleet_ledger_survives_restart() { + let tmp = TempDir::new().unwrap(); + { + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .lease_task( + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + "2026-06-12T17:01:00Z", + None, + ) + .unwrap(); + } + // Re-open simulates process restart. + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + } + + #[test] + fn fleet_ledger_skips_partial_line() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + // Append a truncated/invalid JSON line directly. + std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap(); + // The previous good record is gone because we overwrote; verify it + // does not panic and returns empty state. + let state = ledger.rebuild_state().unwrap(); + assert!(state.runs.is_empty()); + } + + #[test] + fn fleet_ledger_event_and_heartbeat_reconstruct_worker_status() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 1, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-a".to_string(), + timestamp: "2026-06-12T17:01:00Z".to_string(), + payload: FleetWorkerEventPayload::Running, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024)) + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy); + assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5)); + } + + #[test] + fn fleet_ledger_receipt_round_trip() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let receipt = FleetReceipt { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + completed_at: "2026-06-12T17:03:00Z".to_string(), + result: FleetTaskResult::Pass, + artifacts: vec![], + score: None, + }; + ledger.record_receipt(receipt.clone()).unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass); + } +} diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs new file mode 100644 index 00000000..60d82d32 --- /dev/null +++ b/crates/tui/src/fleet/mod.rs @@ -0,0 +1,3 @@ +//! Agent Fleet control plane — local-first manager, ledger, and workers. + +pub mod ledger; diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 2e9cbf5c..5306afeb 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -37,6 +37,7 @@ mod error_taxonomy; mod eval; mod execpolicy; mod features; +mod fleet; mod handoff; mod hooks; mod llm_client;