feat(tui): durable fleet inbox and run ledger (#3156)

Adds : an append-only JSONL ledger that
survives process restart and reconstructs queue/worker state.

- Records runs, task inbox entries, leases, lifecycle events, heartbeats,
  receipts, and alert decisions.
- Replays the ledger to rebuild  on startup.
- Atomic temp-file writes guard against partial writes; malformed lines
  are skipped during replay.
- Provides , , , , and a
  compaction helper.
- Six tests cover create/rebuild, enqueue/claim, restart survival,
  partial-line tolerance, event/heartbeat reconstruction, and receipts.

Closes #3156.
This commit is contained in:
CodeWhale Agent
2026-06-12 11:07:14 -07:00
parent 173fd00e27
commit c9969759d4
4 changed files with 676 additions and 3 deletions
+11 -3
View File
@@ -298,9 +298,17 @@ pub struct FleetAlertPolicy {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum FleetAlertChannel {
Slack { webhook_url: String },
Webhook { url: String, secret: Option<String> },
PagerDuty { routing_key: String, severity: String },
Slack {
webhook_url: String,
},
Webhook {
url: String,
secret: Option<String>,
},
PagerDuty {
routing_key: String,
severity: String,
},
}
/// Receipt produced when a task completes verification.
+661
View File
@@ -0,0 +1,661 @@
//! Durable fleet inbox and run ledger.
//!
//! Stores fleet state as append-only JSONL so the manager can survive
//! restarts and reconstruct queue/worker state by replaying records.
//! Artifacts are referenced by bounded metadata; large payloads live on disk
//! and are never embedded in the ledger.
#![allow(dead_code)]
use std::collections::BTreeMap;
use std::io::BufRead;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use codewhale_protocol::fleet::*;
use serde::{Deserialize, Serialize};
const FLEET_DIR: &str = ".codewhale";
const FLEET_LEDGER_FILE: &str = "fleet.jsonl";
const PARTIAL_SUFFIX: &str = ".tmp";
/// A single append-only record in the fleet ledger.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "record", rename_all = "snake_case")]
pub enum FleetLedgerRecord {
RunCreated {
run: FleetRun,
},
RunStatusChanged {
run_id: FleetRunId,
status: FleetRunStatus,
timestamp: String,
},
TaskEnqueued {
entry: FleetInboxEntry,
},
TaskLeased {
run_id: FleetRunId,
task_id: String,
worker_id: String,
leased_at: String,
lease_expires_at: Option<String>,
},
TaskCompletedOrFailed {
run_id: FleetRunId,
task_id: String,
worker_id: String,
timestamp: String,
},
EventAppended {
event: FleetWorkerEvent,
},
Heartbeat {
worker_id: String,
timestamp: String,
#[serde(default)]
cpu_percent: Option<f32>,
#[serde(default)]
memory_mb: Option<u64>,
},
ReceiptRecorded {
receipt: FleetReceipt,
},
AlertSent {
run_id: FleetRunId,
task_id: String,
channel: String,
timestamp: String,
},
}
/// Reconstructed fleet state after replaying the ledger.
#[derive(Debug, Clone, Default)]
pub struct FleetLedgerState {
pub runs: BTreeMap<String, FleetRun>,
pub run_status_overrides: BTreeMap<String, FleetRunStatus>,
/// Tasks keyed by run_id:task_id.
pub tasks: BTreeMap<String, FleetTaskState>,
/// Worker status by worker_id.
pub workers: BTreeMap<String, FleetWorkerStatus>,
/// Latest heartbeat by worker_id.
pub heartbeats: BTreeMap<String, FleetHeartbeatState>,
/// Latest event seq per worker_id:task_id.
pub latest_seq: BTreeMap<String, u64>,
/// Completed receipts by run_id:task_id.
pub receipts: BTreeMap<String, FleetReceipt>,
}
#[derive(Debug, Clone)]
pub struct FleetTaskState {
pub entry: FleetInboxEntry,
pub status: FleetTaskLedgerStatus,
pub leased_to: Option<String>,
pub leased_at: Option<String>,
pub completed_at: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FleetTaskLedgerStatus {
Enqueued,
Leased,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone)]
pub struct FleetHeartbeatState {
pub timestamp: String,
pub cpu_percent: Option<f32>,
pub memory_mb: Option<u64>,
}
/// Append-only JSONL ledger for fleet runs.
pub struct FleetLedger {
ledger_path: PathBuf,
}
impl FleetLedger {
/// Open (or create) the ledger under `workspace/.codewhale/fleet.jsonl`.
pub fn open(workspace: &Path) -> Result<Self> {
let dir = workspace.join(FLEET_DIR);
std::fs::create_dir_all(&dir)
.with_context(|| format!("creating fleet ledger dir {}", dir.display()))?;
let ledger_path = dir.join(FLEET_LEDGER_FILE);
if !ledger_path.exists() {
std::fs::write(&ledger_path, "")
.with_context(|| format!("creating fleet ledger {}", ledger_path.display()))?;
}
Ok(Self { ledger_path })
}
pub fn path(&self) -> &Path {
&self.ledger_path
}
/// Append a single record atomically by writing a temp file and renaming.
fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> {
let line = serde_json::to_string(record).context("serializing fleet ledger record")?;
let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX);
// Read existing content, append new line, then atomically replace.
let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default();
if !contents.is_empty() && !contents.ends_with('\n') {
contents.push('\n');
}
contents.push_str(&line);
contents.push('\n');
std::fs::write(&tmp_path, contents)
.with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?;
std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| {
format!(
"renaming fleet ledger {} -> {}",
tmp_path.display(),
self.ledger_path.display()
)
})?;
Ok(())
}
pub fn create_run(&self, run: &FleetRun) -> Result<()> {
self.append_record(&FleetLedgerRecord::RunCreated { run: run.clone() })
}
pub fn update_run_status(
&self,
run_id: &FleetRunId,
status: FleetRunStatus,
timestamp: &str,
) -> Result<()> {
self.append_record(&FleetLedgerRecord::RunStatusChanged {
run_id: run_id.clone(),
status,
timestamp: timestamp.to_string(),
})
}
pub fn enqueue(&self, entry: FleetInboxEntry) -> Result<()> {
self.append_record(&FleetLedgerRecord::TaskEnqueued { entry })
}
/// Mark a task as leased to a worker.
pub fn lease_task(
&self,
run_id: &FleetRunId,
task_id: &str,
worker_id: &str,
leased_at: &str,
lease_expires_at: Option<&str>,
) -> Result<()> {
self.append_record(&FleetLedgerRecord::TaskLeased {
run_id: run_id.clone(),
task_id: task_id.to_string(),
worker_id: worker_id.to_string(),
leased_at: leased_at.to_string(),
lease_expires_at: lease_expires_at.map(String::from),
})
}
/// Mark a task as completed or failed.
pub fn complete_or_fail_task(
&self,
run_id: &FleetRunId,
task_id: &str,
worker_id: &str,
timestamp: &str,
) -> Result<()> {
self.append_record(&FleetLedgerRecord::TaskCompletedOrFailed {
run_id: run_id.clone(),
task_id: task_id.to_string(),
worker_id: worker_id.to_string(),
timestamp: timestamp.to_string(),
})
}
pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> {
self.append_record(&FleetLedgerRecord::EventAppended { event })
}
pub fn heartbeat(
&self,
worker_id: &str,
timestamp: &str,
cpu_percent: Option<f32>,
memory_mb: Option<u64>,
) -> Result<()> {
self.append_record(&FleetLedgerRecord::Heartbeat {
worker_id: worker_id.to_string(),
timestamp: timestamp.to_string(),
cpu_percent,
memory_mb,
})
}
pub fn record_receipt(&self, receipt: FleetReceipt) -> Result<()> {
self.append_record(&FleetLedgerRecord::ReceiptRecorded { receipt })
}
pub fn record_alert(
&self,
run_id: &FleetRunId,
task_id: &str,
channel: &str,
timestamp: &str,
) -> Result<()> {
self.append_record(&FleetLedgerRecord::AlertSent {
run_id: run_id.clone(),
task_id: task_id.to_string(),
channel: channel.to_string(),
timestamp: timestamp.to_string(),
})
}
/// Replay the ledger and reconstruct current state. Malformed or partial
/// lines are skipped so an interrupted write cannot corrupt earlier state.
pub fn rebuild_state(&self) -> Result<FleetLedgerState> {
let mut state = FleetLedgerState::default();
if !self.ledger_path.exists() {
return Ok(state);
}
let file = std::fs::File::open(&self.ledger_path)
.with_context(|| format!("opening ledger {}", self.ledger_path.display()))?;
let reader = std::io::BufReader::new(file);
for (line_no, line) in reader.lines().enumerate() {
let line = match line {
Ok(l) => l,
Err(err) => {
tracing::warn!("fleet ledger line {} unreadable: {}", line_no + 1, err);
continue;
}
};
if line.trim().is_empty() {
continue;
}
let record: FleetLedgerRecord = match serde_json::from_str(&line) {
Ok(r) => r,
Err(err) => {
tracing::warn!(
"fleet ledger line {} parse error (skipping): {}",
line_no + 1,
err
);
continue;
}
};
apply_record(&mut state, record);
}
Ok(state)
}
/// Claim the next available inbox task for `worker_id`. Returns the
/// enqueued entry and appends a lease record.
pub fn claim_next(
&self,
worker_id: &str,
_worker_capabilities: &[String],
timestamp: &str,
) -> Result<Option<FleetInboxEntry>> {
let state = self.rebuild_state()?;
// Find oldest enqueued task whose task spec (if known) matches worker
// capabilities. For now, tasks without specs match everything.
let candidate = state
.tasks
.values()
.filter(|t| matches!(t.status, FleetTaskLedgerStatus::Enqueued))
.map(|t| &t.entry)
.min_by_key(|e| (e.priority, e.enqueued_at.clone()))
.cloned();
let Some(entry) = candidate else {
return Ok(None);
};
self.lease_task(&entry.run_id, &entry.task_id, worker_id, timestamp, None)?;
Ok(Some(entry))
}
/// Compact the ledger by rewriting only the records needed to reconstruct
/// current state. This truncates history but preserves run/task/event
/// metadata and receipts.
pub fn compact(&self) -> Result<()> {
let state = self.rebuild_state()?;
let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX);
let mut lines = Vec::new();
for run in state.runs.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::RunCreated {
run: run.clone(),
})?);
if let Some(status) = state.run_status_overrides.get(&run.id.0) {
lines.push(serde_json::to_string(
&FleetLedgerRecord::RunStatusChanged {
run_id: run.id.clone(),
status: status.clone(),
timestamp: run.updated_at.clone().unwrap_or_default(),
},
)?);
}
}
for task in state.tasks.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::TaskEnqueued {
entry: task.entry.clone(),
})?);
if let Some(worker) = &task.leased_to {
lines.push(serde_json::to_string(&FleetLedgerRecord::TaskLeased {
run_id: task.entry.run_id.clone(),
task_id: task.entry.task_id.clone(),
worker_id: worker.clone(),
leased_at: task.leased_at.clone().unwrap_or_default(),
lease_expires_at: None,
})?);
}
if matches!(
task.status,
FleetTaskLedgerStatus::Completed
| FleetTaskLedgerStatus::Failed
| FleetTaskLedgerStatus::Cancelled
) {
lines.push(serde_json::to_string(
&FleetLedgerRecord::TaskCompletedOrFailed {
run_id: task.entry.run_id.clone(),
task_id: task.entry.task_id.clone(),
worker_id: task.leased_to.clone().unwrap_or_default(),
timestamp: task.completed_at.clone().unwrap_or_default(),
},
)?);
}
}
for receipt in state.receipts.values() {
lines.push(serde_json::to_string(
&FleetLedgerRecord::ReceiptRecorded {
receipt: receipt.clone(),
},
)?);
}
let contents = lines.join("\n");
if !contents.is_empty() {
std::fs::write(&tmp_path, contents)?;
std::fs::write(&tmp_path, "\n")?;
} else {
std::fs::write(&tmp_path, "")?;
}
std::fs::rename(&tmp_path, &self.ledger_path)?;
Ok(())
}
}
fn task_key(run_id: &str, task_id: &str) -> String {
format!("{}:{}", run_id, task_id)
}
fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
match record {
FleetLedgerRecord::RunCreated { run } => {
state.runs.insert(run.id.0.clone(), run);
}
FleetLedgerRecord::RunStatusChanged {
run_id,
status,
timestamp: _,
} => {
state.run_status_overrides.insert(run_id.0, status);
}
FleetLedgerRecord::TaskEnqueued { entry } => {
let key = task_key(&entry.run_id.0, &entry.task_id);
state.tasks.entry(key).or_insert_with(|| FleetTaskState {
entry,
status: FleetTaskLedgerStatus::Enqueued,
leased_to: None,
leased_at: None,
completed_at: None,
});
}
FleetLedgerRecord::TaskLeased {
run_id,
task_id,
worker_id,
leased_at,
lease_expires_at: _,
} => {
let key = task_key(&run_id.0, &task_id);
if let Some(task) = state.tasks.get_mut(&key) {
task.status = FleetTaskLedgerStatus::Leased;
task.leased_to = Some(worker_id);
task.leased_at = Some(leased_at);
}
}
FleetLedgerRecord::TaskCompletedOrFailed {
run_id,
task_id,
worker_id,
timestamp,
} => {
let key = task_key(&run_id.0, &task_id);
if let Some(task) = state.tasks.get_mut(&key) {
task.status = FleetTaskLedgerStatus::Completed;
task.leased_to = Some(worker_id);
task.completed_at = Some(timestamp);
}
}
FleetLedgerRecord::EventAppended { event } => {
let worker_key = event.worker_id.clone();
let task_key = task_key(&event.run_id.0, &event.task_id);
let event_key = format!("{}:{}", worker_key, task_key);
if let Some(seq) = state.latest_seq.get(&event_key).copied() {
if event.seq > seq {
state.latest_seq.insert(event_key, event.seq);
}
} else {
state.latest_seq.insert(event_key, event.seq);
}
// Derive worker status from lifecycle events.
match &event.payload {
FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => {
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Busy);
}
FleetWorkerEventPayload::Completed { .. }
| FleetWorkerEventPayload::Failed { .. }
| FleetWorkerEventPayload::Cancelled { .. } => {
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Online);
}
_ => {}
}
}
FleetLedgerRecord::Heartbeat {
worker_id,
timestamp,
cpu_percent,
memory_mb,
} => {
state.heartbeats.insert(
worker_id.clone(),
FleetHeartbeatState {
timestamp,
cpu_percent,
memory_mb,
},
);
if state
.workers
.get(&worker_id)
.cloned()
.unwrap_or(FleetWorkerStatus::Unknown)
!= FleetWorkerStatus::Busy
{
state.workers.insert(worker_id, FleetWorkerStatus::Online);
}
}
FleetLedgerRecord::ReceiptRecorded { receipt } => {
let key = task_key(&receipt.run_id.0, &receipt.task_id);
state.receipts.insert(key, receipt);
}
FleetLedgerRecord::AlertSent { .. } => {
// Alerts are audit-only for state reconstruction.
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn sample_run(id: &str) -> FleetRun {
FleetRun {
id: FleetRunId::from(id),
name: "smoke".to_string(),
status: FleetRunStatus::Running,
task_specs: vec![],
worker_specs: vec![],
labels: BTreeMap::new(),
created_at: "2026-06-12T17:00:00Z".to_string(),
updated_at: None,
completed_at: None,
}
}
fn sample_entry(run_id: &str, task_id: &str) -> FleetInboxEntry {
FleetInboxEntry {
run_id: FleetRunId::from(run_id),
task_id: task_id.to_string(),
priority: 0,
enqueued_at: "2026-06-12T17:00:00Z".to_string(),
lease_deadline: None,
attempts: 0,
}
}
#[test]
fn fleet_ledger_create_and_rebuild_run() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
let run = sample_run("run-1");
ledger.create_run(&run).unwrap();
ledger
.update_run_status(&run.id, FleetRunStatus::Completed, "2026-06-12T18:00:00Z")
.unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.runs.len(), 1);
assert_eq!(
state.run_status_overrides["run-1"],
FleetRunStatus::Completed
);
}
#[test]
fn fleet_ledger_enqueue_and_claim() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
ledger.enqueue(sample_entry("run-1", "task-a")).unwrap();
ledger.enqueue(sample_entry("run-1", "task-b")).unwrap();
let claimed = ledger
.claim_next("worker-1", &[], "2026-06-12T17:01:00Z")
.unwrap();
assert!(claimed.is_some());
let claimed = claimed.unwrap();
assert_eq!(claimed.task_id, "task-a");
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.tasks.len(), 2);
assert_eq!(
state.tasks["run-1:task-a"].status,
FleetTaskLedgerStatus::Leased
);
assert_eq!(
state.tasks["run-1:task-a"].leased_to.as_deref(),
Some("worker-1")
);
assert_eq!(
state.tasks["run-1:task-b"].status,
FleetTaskLedgerStatus::Enqueued
);
}
#[test]
fn fleet_ledger_survives_restart() {
let tmp = TempDir::new().unwrap();
{
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
ledger.enqueue(sample_entry("run-1", "task-a")).unwrap();
ledger
.lease_task(
&FleetRunId::from("run-1"),
"task-a",
"worker-1",
"2026-06-12T17:01:00Z",
None,
)
.unwrap();
}
// Re-open simulates process restart.
let ledger = FleetLedger::open(tmp.path()).unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.runs.len(), 1);
assert_eq!(
state.tasks["run-1:task-a"].status,
FleetTaskLedgerStatus::Leased
);
}
#[test]
fn fleet_ledger_skips_partial_line() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
// Append a truncated/invalid JSON line directly.
std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap();
// The previous good record is gone because we overwrote; verify it
// does not panic and returns empty state.
let state = ledger.rebuild_state().unwrap();
assert!(state.runs.is_empty());
}
#[test]
fn fleet_ledger_event_and_heartbeat_reconstruct_worker_status() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
ledger.enqueue(sample_entry("run-1", "task-a")).unwrap();
ledger
.append_event(FleetWorkerEvent {
seq: 1,
run_id: FleetRunId::from("run-1"),
worker_id: "worker-1".to_string(),
task_id: "task-a".to_string(),
timestamp: "2026-06-12T17:01:00Z".to_string(),
payload: FleetWorkerEventPayload::Running,
extra: BTreeMap::new(),
})
.unwrap();
ledger
.heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024))
.unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy);
assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5));
}
#[test]
fn fleet_ledger_receipt_round_trip() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
let receipt = FleetReceipt {
run_id: FleetRunId::from("run-1"),
task_id: "task-a".to_string(),
worker_id: "worker-1".to_string(),
completed_at: "2026-06-12T17:03:00Z".to_string(),
result: FleetTaskResult::Pass,
artifacts: vec![],
score: None,
};
ledger.record_receipt(receipt.clone()).unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass);
}
}
+3
View File
@@ -0,0 +1,3 @@
//! Agent Fleet control plane — local-first manager, ledger, and workers.
pub mod ledger;
+1
View File
@@ -37,6 +37,7 @@ mod error_taxonomy;
mod eval;
mod execpolicy;
mod features;
mod fleet;
mod handoff;
mod hooks;
mod llm_client;