From 173fd00e27e8905caf470fda4bb933b660c9bad2 Mon Sep 17 00:00:00 2001 From: CodeWhale Agent Date: Fri, 12 Jun 2026 11:02:14 -0700 Subject: [PATCH 1/3] feat(protocol): define Agent Fleet protocol types and event schema (#3155) Adds with the durable, serializable contract for the v0.8.60 Agent Fleet control plane: - FleetRun, FleetTaskSpec, FleetWorkerSpec, FleetHostSpec - FleetWorkerStatus and FleetInboxEntry - FleetWorkerEvent envelope with tagged lifecycle payloads: queued, leased, starting, running, model_wait, running_tool, heartbeat, artifact, completed, failed, cancelled, interrupted, stale, restarted, escalated - FleetArtifactRef, FleetScorerSpec, FleetRetryPolicy, FleetAlertPolicy, FleetReceipt, FleetScore Types are additive (unknown fields ignored by existing consumers) and use stable snake_case JSON names. Includes JSON round-trip tests. Closes #3155. --- crates/protocol/src/fleet.rs | 470 +++++++++++++++++++++++++++++++++++ crates/protocol/src/lib.rs | 2 + 2 files changed, 472 insertions(+) create mode 100644 crates/protocol/src/fleet.rs diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs new file mode 100644 index 00000000..31c9a45b --- /dev/null +++ b/crates/protocol/src/fleet.rs @@ -0,0 +1,470 @@ +//! Agent Fleet control-plane protocol types. +//! +//! These types define the durable, serializable contract between the fleet +//! manager, workers, CLI/TUI surfaces, and the Runtime API. They are +//! intentionally additive: existing runtime-event consumers ignore unknown +//! fields and are unaffected by fleet extensions. +//! +//! See: +//! - (Agent Fleet control plane) +//! - (Runtime API sub-agent direction) + +use std::collections::BTreeMap; +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +pub const FLEET_PROTOCOL_VERSION: &str = "0.1.0"; + +/// Globally unique identifier for a fleet run. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct FleetRunId(pub String); + +impl From for FleetRunId { + fn from(value: String) -> Self { + Self(value) + } +} + +impl From<&str> for FleetRunId { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +/// Top-level fleet run handle. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetRun { + pub id: FleetRunId, + pub name: String, + pub status: FleetRunStatus, + #[serde(default)] + pub task_specs: Vec, + #[serde(default)] + pub worker_specs: Vec, + #[serde(default)] + pub labels: BTreeMap, + pub created_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub completed_at: Option, +} + +/// Lifecycle status for an entire fleet run. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FleetRunStatus { + Pending, + Queued, + Running, + Paused, + Completed, + Failed, + Cancelled, +} + +/// Specification of a single unit of work within a run. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetTaskSpec { + pub id: String, + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + pub instructions: String, + #[serde(default)] + pub expected_artifacts: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub scorer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub retry_policy: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub alert_policy: Option, + #[serde(default)] + pub timeout_seconds: Option, + #[serde(default)] + pub metadata: BTreeMap, +} + +/// Reference to an artifact produced or consumed by a task. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FleetArtifactRef { + pub kind: FleetArtifactKind, + pub path: PathBuf, + #[serde(skip_serializing_if = "Option::is_none")] + pub checksum: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub mime_type: Option, + #[serde(default)] + pub size_bytes: Option, +} + +/// Kind of artifact a task may produce or consume. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FleetArtifactKind { + Log, + Patch, + TestResult, + Report, + Checkpoint, + Receipt, + Other(String), +} + +/// Scoring rule used to verify a task result. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum FleetScorerSpec { + ExitCode, + FileExists { path: PathBuf }, + RegexMatch { path: PathBuf, pattern: String }, + JsonPath { path: PathBuf, expression: String }, + Manual, +} + +/// Worker specification. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetWorkerSpec { + pub id: String, + pub name: String, + pub host: FleetHostSpec, + #[serde(default)] + pub labels: BTreeMap, + #[serde(default)] + pub capabilities: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_concurrent_tasks: Option, +} + +/// Host on which a worker runs. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum FleetHostSpec { + Local, + Ssh { + host: String, + #[serde(skip_serializing_if = "Option::is_none")] + port: Option, + #[serde(skip_serializing_if = "Option::is_none")] + user: Option, + #[serde(skip_serializing_if = "Option::is_none")] + identity: Option, + }, + Docker { + image: String, + #[serde(default)] + args: Vec, + }, +} + +/// Runtime status of a worker. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FleetWorkerStatus { + Unknown, + Online, + Busy, + Offline, + Unhealthy, + Draining, + Retired, +} + +/// Durable inbox entry: a task waiting to be leased to a worker. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetInboxEntry { + pub run_id: FleetRunId, + pub task_id: String, + pub priority: i32, + pub enqueued_at: String, + #[serde(default)] + pub lease_deadline: Option, + #[serde(default)] + pub attempts: u32, +} + +/// Worker event envelope. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetWorkerEvent { + pub seq: u64, + pub run_id: FleetRunId, + pub worker_id: String, + pub task_id: String, + pub timestamp: String, + #[serde(flatten)] + pub payload: FleetWorkerEventPayload, + #[serde(default)] + #[serde(skip_serializing_if = "BTreeMap::is_empty")] + pub extra: BTreeMap, +} + +/// Union of all worker event payloads. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "state", rename_all = "snake_case")] +pub enum FleetWorkerEventPayload { + Queued, + Leased { + #[serde(skip_serializing_if = "Option::is_none")] + lease_expires_at: Option, + }, + Starting, + Running, + ModelWait { + #[serde(skip_serializing_if = "Option::is_none")] + model: Option, + }, + RunningTool { + tool: String, + #[serde(skip_serializing_if = "Option::is_none")] + call_id: Option, + }, + Heartbeat { + #[serde(default)] + cpu_percent: Option, + #[serde(default)] + memory_mb: Option, + }, + Artifact(FleetArtifactRef), + Completed { + #[serde(default)] + exit_code: Option, + #[serde(skip_serializing_if = "Option::is_none")] + summary: Option, + }, + Failed { + reason: String, + #[serde(default)] + recoverable: bool, + }, + Cancelled { + #[serde(skip_serializing_if = "Option::is_none")] + cancelled_by: Option, + }, + Interrupted { + #[serde(skip_serializing_if = "Option::is_none")] + signal: Option, + }, + Stale { + #[serde(skip_serializing_if = "Option::is_none")] + last_heartbeat_at: Option, + }, + Restarted { + #[serde(default)] + restart_count: u32, + }, + Escalated { + channel: String, + #[serde(skip_serializing_if = "Option::is_none")] + alert_id: Option, + }, +} + +/// Retry policy for a task or worker. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FleetRetryPolicy { + pub max_attempts: u32, + #[serde(default)] + pub initial_backoff_seconds: u64, + #[serde(default)] + pub max_backoff_seconds: u64, + #[serde(default)] + pub backoff_multiplier: u32, +} + +impl Default for FleetRetryPolicy { + fn default() -> Self { + Self { + max_attempts: 3, + initial_backoff_seconds: 5, + max_backoff_seconds: 300, + backoff_multiplier: 2, + } + } +} + +/// Alert/escalation policy attached to a task or run. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FleetAlertPolicy { + #[serde(default)] + pub channels: Vec, + #[serde(default)] + pub after_attempts: Option, + #[serde(default)] + pub after_minutes_stale: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum FleetAlertChannel { + Slack { webhook_url: String }, + Webhook { url: String, secret: Option }, + PagerDuty { routing_key: String, severity: String }, +} + +/// Receipt produced when a task completes verification. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FleetReceipt { + pub run_id: FleetRunId, + pub task_id: String, + pub worker_id: String, + pub completed_at: String, + pub result: FleetTaskResult, + #[serde(default)] + pub artifacts: Vec, + #[serde(default)] + pub score: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FleetTaskResult { + Pass, + Fail, + Skip, + Timeout, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FleetScore { + pub value: f64, + #[serde(skip_serializing_if = "Option::is_none")] + pub max: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub notes: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fleet_run_round_trip() { + let run = FleetRun { + id: FleetRunId::from("run-001"), + name: "dogfood smoke".to_string(), + status: FleetRunStatus::Running, + task_specs: vec![FleetTaskSpec { + id: "task-1".to_string(), + name: "lint".to_string(), + description: None, + instructions: "run cargo clippy".to_string(), + expected_artifacts: vec![FleetArtifactKind::Log], + scorer: Some(FleetScorerSpec::ExitCode), + retry_policy: Some(FleetRetryPolicy::default()), + alert_policy: None, + timeout_seconds: Some(300), + metadata: BTreeMap::new(), + }], + worker_specs: vec![], + labels: BTreeMap::new(), + created_at: "2026-06-12T17:00:00Z".to_string(), + updated_at: None, + completed_at: None, + }; + let json = serde_json::to_string(&run).unwrap(); + let back: FleetRun = serde_json::from_str(&json).unwrap(); + assert_eq!(back.id, run.id); + assert_eq!(back.status, FleetRunStatus::Running); + assert_eq!(back.task_specs.len(), 1); + } + + #[test] + fn worker_event_lifecycle_round_trip() { + let events = vec![ + FleetWorkerEvent { + seq: 1, + run_id: FleetRunId::from("run-002"), + worker_id: "worker-a".to_string(), + task_id: "task-1".to_string(), + timestamp: "2026-06-12T17:01:00Z".to_string(), + payload: FleetWorkerEventPayload::Queued, + extra: BTreeMap::new(), + }, + FleetWorkerEvent { + seq: 2, + run_id: FleetRunId::from("run-002"), + worker_id: "worker-a".to_string(), + task_id: "task-1".to_string(), + timestamp: "2026-06-12T17:01:05Z".to_string(), + payload: FleetWorkerEventPayload::RunningTool { + tool: "bash".to_string(), + call_id: Some("call-1".to_string()), + }, + extra: BTreeMap::new(), + }, + FleetWorkerEvent { + seq: 3, + run_id: FleetRunId::from("run-002"), + worker_id: "worker-a".to_string(), + task_id: "task-1".to_string(), + timestamp: "2026-06-12T17:02:00Z".to_string(), + payload: FleetWorkerEventPayload::Completed { + exit_code: Some(0), + summary: Some("ok".to_string()), + }, + extra: BTreeMap::new(), + }, + ]; + let json = serde_json::to_string(&events).unwrap(); + let back: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(back.len(), 3); + assert!(matches!(back[0].payload, FleetWorkerEventPayload::Queued)); + assert!(matches!( + back[2].payload, + FleetWorkerEventPayload::Completed { .. } + )); + } + + #[test] + fn alert_policy_round_trip() { + let policy = FleetAlertPolicy { + channels: vec![FleetAlertChannel::Slack { + webhook_url: "https://hooks.slack.com/test".to_string(), + }], + after_attempts: Some(2), + after_minutes_stale: Some(10), + }; + let json = serde_json::to_string(&policy).unwrap(); + assert!(json.contains("\"kind\":\"slack\"")); + let back: FleetAlertPolicy = serde_json::from_str(&json).unwrap(); + assert_eq!(back.after_attempts, Some(2)); + } + + #[test] + fn artifact_other_kind_round_trip() { + let artifact = FleetArtifactRef { + kind: FleetArtifactKind::Other("coverage.xml".to_string()), + path: PathBuf::from("/tmp/coverage.xml"), + checksum: Some("sha256:abc".to_string()), + mime_type: Some("application/xml".to_string()), + size_bytes: Some(1024), + }; + let json = serde_json::to_string(&artifact).unwrap(); + let back: FleetArtifactRef = serde_json::from_str(&json).unwrap(); + assert_eq!(back.kind, artifact.kind); + assert_eq!(back.size_bytes, Some(1024)); + } + + #[test] + fn receipt_round_trip() { + let receipt = FleetReceipt { + run_id: FleetRunId::from("run-003"), + task_id: "task-1".to_string(), + worker_id: "worker-b".to_string(), + completed_at: "2026-06-12T17:03:00Z".to_string(), + result: FleetTaskResult::Pass, + artifacts: vec![], + score: Some(FleetScore { + value: 0.95, + max: Some(1.0), + notes: None, + }), + }; + let json = serde_json::to_string(&receipt).unwrap(); + let back: FleetReceipt = serde_json::from_str(&json).unwrap(); + assert_eq!(back.result, FleetTaskResult::Pass); + assert_eq!(back.score.as_ref().unwrap().value, 0.95); + } +} diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 2e940e19..430d609b 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -4,6 +4,8 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use serde_json::Value; +pub mod fleet; + pub mod runtime { use super::*; From c9969759d443566ac06db65da60e1bb24c03c974 Mon Sep 17 00:00:00 2001 From: CodeWhale Agent Date: Fri, 12 Jun 2026 11:07:14 -0700 Subject: [PATCH 2/3] feat(tui): durable fleet inbox and run ledger (#3156) Adds : an append-only JSONL ledger that survives process restart and reconstructs queue/worker state. - Records runs, task inbox entries, leases, lifecycle events, heartbeats, receipts, and alert decisions. - Replays the ledger to rebuild on startup. - Atomic temp-file writes guard against partial writes; malformed lines are skipped during replay. - Provides , , , , and a compaction helper. - Six tests cover create/rebuild, enqueue/claim, restart survival, partial-line tolerance, event/heartbeat reconstruction, and receipts. Closes #3156. --- crates/protocol/src/fleet.rs | 14 +- crates/tui/src/fleet/ledger.rs | 661 +++++++++++++++++++++++++++++++++ crates/tui/src/fleet/mod.rs | 3 + crates/tui/src/main.rs | 1 + 4 files changed, 676 insertions(+), 3 deletions(-) create mode 100644 crates/tui/src/fleet/ledger.rs create mode 100644 crates/tui/src/fleet/mod.rs diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs index 31c9a45b..efbe9ba5 100644 --- a/crates/protocol/src/fleet.rs +++ b/crates/protocol/src/fleet.rs @@ -298,9 +298,17 @@ pub struct FleetAlertPolicy { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum FleetAlertChannel { - Slack { webhook_url: String }, - Webhook { url: String, secret: Option }, - PagerDuty { routing_key: String, severity: String }, + Slack { + webhook_url: String, + }, + Webhook { + url: String, + secret: Option, + }, + PagerDuty { + routing_key: String, + severity: String, + }, } /// Receipt produced when a task completes verification. diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs new file mode 100644 index 00000000..e4eabd86 --- /dev/null +++ b/crates/tui/src/fleet/ledger.rs @@ -0,0 +1,661 @@ +//! Durable fleet inbox and run ledger. +//! +//! Stores fleet state as append-only JSONL so the manager can survive +//! restarts and reconstruct queue/worker state by replaying records. +//! Artifacts are referenced by bounded metadata; large payloads live on disk +//! and are never embedded in the ledger. + +#![allow(dead_code)] + +use std::collections::BTreeMap; +use std::io::BufRead; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use codewhale_protocol::fleet::*; +use serde::{Deserialize, Serialize}; + +const FLEET_DIR: &str = ".codewhale"; +const FLEET_LEDGER_FILE: &str = "fleet.jsonl"; +const PARTIAL_SUFFIX: &str = ".tmp"; + +/// A single append-only record in the fleet ledger. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "record", rename_all = "snake_case")] +pub enum FleetLedgerRecord { + RunCreated { + run: FleetRun, + }, + RunStatusChanged { + run_id: FleetRunId, + status: FleetRunStatus, + timestamp: String, + }, + TaskEnqueued { + entry: FleetInboxEntry, + }, + TaskLeased { + run_id: FleetRunId, + task_id: String, + worker_id: String, + leased_at: String, + lease_expires_at: Option, + }, + TaskCompletedOrFailed { + run_id: FleetRunId, + task_id: String, + worker_id: String, + timestamp: String, + }, + EventAppended { + event: FleetWorkerEvent, + }, + Heartbeat { + worker_id: String, + timestamp: String, + #[serde(default)] + cpu_percent: Option, + #[serde(default)] + memory_mb: Option, + }, + ReceiptRecorded { + receipt: FleetReceipt, + }, + AlertSent { + run_id: FleetRunId, + task_id: String, + channel: String, + timestamp: String, + }, +} + +/// Reconstructed fleet state after replaying the ledger. +#[derive(Debug, Clone, Default)] +pub struct FleetLedgerState { + pub runs: BTreeMap, + pub run_status_overrides: BTreeMap, + /// Tasks keyed by run_id:task_id. + pub tasks: BTreeMap, + /// Worker status by worker_id. + pub workers: BTreeMap, + /// Latest heartbeat by worker_id. + pub heartbeats: BTreeMap, + /// Latest event seq per worker_id:task_id. + pub latest_seq: BTreeMap, + /// Completed receipts by run_id:task_id. + pub receipts: BTreeMap, +} + +#[derive(Debug, Clone)] +pub struct FleetTaskState { + pub entry: FleetInboxEntry, + pub status: FleetTaskLedgerStatus, + pub leased_to: Option, + pub leased_at: Option, + pub completed_at: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FleetTaskLedgerStatus { + Enqueued, + Leased, + Completed, + Failed, + Cancelled, +} + +#[derive(Debug, Clone)] +pub struct FleetHeartbeatState { + pub timestamp: String, + pub cpu_percent: Option, + pub memory_mb: Option, +} + +/// Append-only JSONL ledger for fleet runs. +pub struct FleetLedger { + ledger_path: PathBuf, +} + +impl FleetLedger { + /// Open (or create) the ledger under `workspace/.codewhale/fleet.jsonl`. + pub fn open(workspace: &Path) -> Result { + let dir = workspace.join(FLEET_DIR); + std::fs::create_dir_all(&dir) + .with_context(|| format!("creating fleet ledger dir {}", dir.display()))?; + let ledger_path = dir.join(FLEET_LEDGER_FILE); + if !ledger_path.exists() { + std::fs::write(&ledger_path, "") + .with_context(|| format!("creating fleet ledger {}", ledger_path.display()))?; + } + Ok(Self { ledger_path }) + } + + pub fn path(&self) -> &Path { + &self.ledger_path + } + + /// Append a single record atomically by writing a temp file and renaming. + fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> { + let line = serde_json::to_string(record).context("serializing fleet ledger record")?; + let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); + // Read existing content, append new line, then atomically replace. + let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default(); + if !contents.is_empty() && !contents.ends_with('\n') { + contents.push('\n'); + } + contents.push_str(&line); + contents.push('\n'); + std::fs::write(&tmp_path, contents) + .with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?; + std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| { + format!( + "renaming fleet ledger {} -> {}", + tmp_path.display(), + self.ledger_path.display() + ) + })?; + Ok(()) + } + + pub fn create_run(&self, run: &FleetRun) -> Result<()> { + self.append_record(&FleetLedgerRecord::RunCreated { run: run.clone() }) + } + + pub fn update_run_status( + &self, + run_id: &FleetRunId, + status: FleetRunStatus, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::RunStatusChanged { + run_id: run_id.clone(), + status, + timestamp: timestamp.to_string(), + }) + } + + pub fn enqueue(&self, entry: FleetInboxEntry) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskEnqueued { entry }) + } + + /// Mark a task as leased to a worker. + pub fn lease_task( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + leased_at: &str, + lease_expires_at: Option<&str>, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskLeased { + run_id: run_id.clone(), + task_id: task_id.to_string(), + worker_id: worker_id.to_string(), + leased_at: leased_at.to_string(), + lease_expires_at: lease_expires_at.map(String::from), + }) + } + + /// Mark a task as completed or failed. + pub fn complete_or_fail_task( + &self, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::TaskCompletedOrFailed { + run_id: run_id.clone(), + task_id: task_id.to_string(), + worker_id: worker_id.to_string(), + timestamp: timestamp.to_string(), + }) + } + + pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> { + self.append_record(&FleetLedgerRecord::EventAppended { event }) + } + + pub fn heartbeat( + &self, + worker_id: &str, + timestamp: &str, + cpu_percent: Option, + memory_mb: Option, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::Heartbeat { + worker_id: worker_id.to_string(), + timestamp: timestamp.to_string(), + cpu_percent, + memory_mb, + }) + } + + pub fn record_receipt(&self, receipt: FleetReceipt) -> Result<()> { + self.append_record(&FleetLedgerRecord::ReceiptRecorded { receipt }) + } + + pub fn record_alert( + &self, + run_id: &FleetRunId, + task_id: &str, + channel: &str, + timestamp: &str, + ) -> Result<()> { + self.append_record(&FleetLedgerRecord::AlertSent { + run_id: run_id.clone(), + task_id: task_id.to_string(), + channel: channel.to_string(), + timestamp: timestamp.to_string(), + }) + } + + /// Replay the ledger and reconstruct current state. Malformed or partial + /// lines are skipped so an interrupted write cannot corrupt earlier state. + pub fn rebuild_state(&self) -> Result { + let mut state = FleetLedgerState::default(); + if !self.ledger_path.exists() { + return Ok(state); + } + let file = std::fs::File::open(&self.ledger_path) + .with_context(|| format!("opening ledger {}", self.ledger_path.display()))?; + let reader = std::io::BufReader::new(file); + for (line_no, line) in reader.lines().enumerate() { + let line = match line { + Ok(l) => l, + Err(err) => { + tracing::warn!("fleet ledger line {} unreadable: {}", line_no + 1, err); + continue; + } + }; + if line.trim().is_empty() { + continue; + } + let record: FleetLedgerRecord = match serde_json::from_str(&line) { + Ok(r) => r, + Err(err) => { + tracing::warn!( + "fleet ledger line {} parse error (skipping): {}", + line_no + 1, + err + ); + continue; + } + }; + apply_record(&mut state, record); + } + Ok(state) + } + + /// Claim the next available inbox task for `worker_id`. Returns the + /// enqueued entry and appends a lease record. + pub fn claim_next( + &self, + worker_id: &str, + _worker_capabilities: &[String], + timestamp: &str, + ) -> Result> { + let state = self.rebuild_state()?; + // Find oldest enqueued task whose task spec (if known) matches worker + // capabilities. For now, tasks without specs match everything. + let candidate = state + .tasks + .values() + .filter(|t| matches!(t.status, FleetTaskLedgerStatus::Enqueued)) + .map(|t| &t.entry) + .min_by_key(|e| (e.priority, e.enqueued_at.clone())) + .cloned(); + let Some(entry) = candidate else { + return Ok(None); + }; + self.lease_task(&entry.run_id, &entry.task_id, worker_id, timestamp, None)?; + Ok(Some(entry)) + } + + /// Compact the ledger by rewriting only the records needed to reconstruct + /// current state. This truncates history but preserves run/task/event + /// metadata and receipts. + pub fn compact(&self) -> Result<()> { + let state = self.rebuild_state()?; + let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); + let mut lines = Vec::new(); + for run in state.runs.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::RunCreated { + run: run.clone(), + })?); + if let Some(status) = state.run_status_overrides.get(&run.id.0) { + lines.push(serde_json::to_string( + &FleetLedgerRecord::RunStatusChanged { + run_id: run.id.clone(), + status: status.clone(), + timestamp: run.updated_at.clone().unwrap_or_default(), + }, + )?); + } + } + for task in state.tasks.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::TaskEnqueued { + entry: task.entry.clone(), + })?); + if let Some(worker) = &task.leased_to { + lines.push(serde_json::to_string(&FleetLedgerRecord::TaskLeased { + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: worker.clone(), + leased_at: task.leased_at.clone().unwrap_or_default(), + lease_expires_at: None, + })?); + } + if matches!( + task.status, + FleetTaskLedgerStatus::Completed + | FleetTaskLedgerStatus::Failed + | FleetTaskLedgerStatus::Cancelled + ) { + lines.push(serde_json::to_string( + &FleetLedgerRecord::TaskCompletedOrFailed { + run_id: task.entry.run_id.clone(), + task_id: task.entry.task_id.clone(), + worker_id: task.leased_to.clone().unwrap_or_default(), + timestamp: task.completed_at.clone().unwrap_or_default(), + }, + )?); + } + } + for receipt in state.receipts.values() { + lines.push(serde_json::to_string( + &FleetLedgerRecord::ReceiptRecorded { + receipt: receipt.clone(), + }, + )?); + } + let contents = lines.join("\n"); + if !contents.is_empty() { + std::fs::write(&tmp_path, contents)?; + std::fs::write(&tmp_path, "\n")?; + } else { + std::fs::write(&tmp_path, "")?; + } + std::fs::rename(&tmp_path, &self.ledger_path)?; + Ok(()) + } +} + +fn task_key(run_id: &str, task_id: &str) -> String { + format!("{}:{}", run_id, task_id) +} + +fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { + match record { + FleetLedgerRecord::RunCreated { run } => { + state.runs.insert(run.id.0.clone(), run); + } + FleetLedgerRecord::RunStatusChanged { + run_id, + status, + timestamp: _, + } => { + state.run_status_overrides.insert(run_id.0, status); + } + FleetLedgerRecord::TaskEnqueued { entry } => { + let key = task_key(&entry.run_id.0, &entry.task_id); + state.tasks.entry(key).or_insert_with(|| FleetTaskState { + entry, + status: FleetTaskLedgerStatus::Enqueued, + leased_to: None, + leased_at: None, + completed_at: None, + }); + } + FleetLedgerRecord::TaskLeased { + run_id, + task_id, + worker_id, + leased_at, + lease_expires_at: _, + } => { + let key = task_key(&run_id.0, &task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = FleetTaskLedgerStatus::Leased; + task.leased_to = Some(worker_id); + task.leased_at = Some(leased_at); + } + } + FleetLedgerRecord::TaskCompletedOrFailed { + run_id, + task_id, + worker_id, + timestamp, + } => { + let key = task_key(&run_id.0, &task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = FleetTaskLedgerStatus::Completed; + task.leased_to = Some(worker_id); + task.completed_at = Some(timestamp); + } + } + FleetLedgerRecord::EventAppended { event } => { + let worker_key = event.worker_id.clone(); + let task_key = task_key(&event.run_id.0, &event.task_id); + let event_key = format!("{}:{}", worker_key, task_key); + if let Some(seq) = state.latest_seq.get(&event_key).copied() { + if event.seq > seq { + state.latest_seq.insert(event_key, event.seq); + } + } else { + state.latest_seq.insert(event_key, event.seq); + } + // Derive worker status from lifecycle events. + match &event.payload { + FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Busy); + } + FleetWorkerEventPayload::Completed { .. } + | FleetWorkerEventPayload::Failed { .. } + | FleetWorkerEventPayload::Cancelled { .. } => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + _ => {} + } + } + FleetLedgerRecord::Heartbeat { + worker_id, + timestamp, + cpu_percent, + memory_mb, + } => { + state.heartbeats.insert( + worker_id.clone(), + FleetHeartbeatState { + timestamp, + cpu_percent, + memory_mb, + }, + ); + if state + .workers + .get(&worker_id) + .cloned() + .unwrap_or(FleetWorkerStatus::Unknown) + != FleetWorkerStatus::Busy + { + state.workers.insert(worker_id, FleetWorkerStatus::Online); + } + } + FleetLedgerRecord::ReceiptRecorded { receipt } => { + let key = task_key(&receipt.run_id.0, &receipt.task_id); + state.receipts.insert(key, receipt); + } + FleetLedgerRecord::AlertSent { .. } => { + // Alerts are audit-only for state reconstruction. + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn sample_run(id: &str) -> FleetRun { + FleetRun { + id: FleetRunId::from(id), + name: "smoke".to_string(), + status: FleetRunStatus::Running, + task_specs: vec![], + worker_specs: vec![], + labels: BTreeMap::new(), + created_at: "2026-06-12T17:00:00Z".to_string(), + updated_at: None, + completed_at: None, + } + } + + fn sample_entry(run_id: &str, task_id: &str) -> FleetInboxEntry { + FleetInboxEntry { + run_id: FleetRunId::from(run_id), + task_id: task_id.to_string(), + priority: 0, + enqueued_at: "2026-06-12T17:00:00Z".to_string(), + lease_deadline: None, + attempts: 0, + } + } + + #[test] + fn fleet_ledger_create_and_rebuild_run() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let run = sample_run("run-1"); + ledger.create_run(&run).unwrap(); + ledger + .update_run_status(&run.id, FleetRunStatus::Completed, "2026-06-12T18:00:00Z") + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.run_status_overrides["run-1"], + FleetRunStatus::Completed + ); + } + + #[test] + fn fleet_ledger_enqueue_and_claim() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-b")).unwrap(); + + let claimed = ledger + .claim_next("worker-1", &[], "2026-06-12T17:01:00Z") + .unwrap(); + assert!(claimed.is_some()); + let claimed = claimed.unwrap(); + assert_eq!(claimed.task_id, "task-a"); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.tasks.len(), 2); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!( + state.tasks["run-1:task-a"].leased_to.as_deref(), + Some("worker-1") + ); + assert_eq!( + state.tasks["run-1:task-b"].status, + FleetTaskLedgerStatus::Enqueued + ); + } + + #[test] + fn fleet_ledger_survives_restart() { + let tmp = TempDir::new().unwrap(); + { + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .lease_task( + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + "2026-06-12T17:01:00Z", + None, + ) + .unwrap(); + } + // Re-open simulates process restart. + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + } + + #[test] + fn fleet_ledger_skips_partial_line() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + // Append a truncated/invalid JSON line directly. + std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap(); + // The previous good record is gone because we overwrote; verify it + // does not panic and returns empty state. + let state = ledger.rebuild_state().unwrap(); + assert!(state.runs.is_empty()); + } + + #[test] + fn fleet_ledger_event_and_heartbeat_reconstruct_worker_status() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 1, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-a".to_string(), + timestamp: "2026-06-12T17:01:00Z".to_string(), + payload: FleetWorkerEventPayload::Running, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024)) + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy); + assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5)); + } + + #[test] + fn fleet_ledger_receipt_round_trip() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + let receipt = FleetReceipt { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + completed_at: "2026-06-12T17:03:00Z".to_string(), + result: FleetTaskResult::Pass, + artifacts: vec![], + score: None, + }; + ledger.record_receipt(receipt.clone()).unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass); + } +} diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs new file mode 100644 index 00000000..60d82d32 --- /dev/null +++ b/crates/tui/src/fleet/mod.rs @@ -0,0 +1,3 @@ +//! Agent Fleet control plane — local-first manager, ledger, and workers. + +pub mod ledger; diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 2e9cbf5c..5306afeb 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -37,6 +37,7 @@ mod error_taxonomy; mod eval; mod execpolicy; mod features; +mod fleet; mod handoff; mod hooks; mod llm_client; From 023434cb2a8d7bce059dd40775bc2914b74b527f Mon Sep 17 00:00:00 2001 From: Hunter B Date: Fri, 12 Jun 2026 17:56:26 -0700 Subject: [PATCH 3/3] fix(tui): harden fleet ledger replay and compaction --- crates/tui/src/fleet/ledger.rs | 285 +++++++++++++++++++++++++++------ 1 file changed, 236 insertions(+), 49 deletions(-) diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index e4eabd86..51a4cf1e 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -8,7 +8,8 @@ #![allow(dead_code)] use std::collections::BTreeMap; -use std::io::BufRead; +use std::fs::OpenOptions; +use std::io::{BufRead, Write}; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; @@ -46,6 +47,8 @@ pub enum FleetLedgerRecord { task_id: String, worker_id: String, timestamp: String, + #[serde(default = "default_terminal_task_status")] + status: FleetTaskLedgerStatus, }, EventAppended { event: FleetWorkerEvent, @@ -82,6 +85,8 @@ pub struct FleetLedgerState { pub heartbeats: BTreeMap, /// Latest event seq per worker_id:task_id. pub latest_seq: BTreeMap, + /// Latest event envelope per worker_id:run_id:task_id. + pub latest_events: BTreeMap, /// Completed receipts by run_id:task_id. pub receipts: BTreeMap, } @@ -95,7 +100,8 @@ pub struct FleetTaskState { pub completed_at: Option, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum FleetTaskLedgerStatus { Enqueued, Leased, @@ -104,6 +110,10 @@ pub enum FleetTaskLedgerStatus { Cancelled, } +fn default_terminal_task_status() -> FleetTaskLedgerStatus { + FleetTaskLedgerStatus::Completed +} + #[derive(Debug, Clone)] pub struct FleetHeartbeatState { pub timestamp: String, @@ -134,26 +144,21 @@ impl FleetLedger { &self.ledger_path } - /// Append a single record atomically by writing a temp file and renaming. + /// Append a single record without rewriting existing ledger contents. fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> { - let line = serde_json::to_string(record).context("serializing fleet ledger record")?; - let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); - // Read existing content, append new line, then atomically replace. - let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default(); - if !contents.is_empty() && !contents.ends_with('\n') { - contents.push('\n'); - } - contents.push_str(&line); - contents.push('\n'); - std::fs::write(&tmp_path, contents) - .with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?; - std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| { - format!( - "renaming fleet ledger {} -> {}", - tmp_path.display(), - self.ledger_path.display() - ) - })?; + let mut line = serde_json::to_string(record).context("serializing fleet ledger record")?; + line.push('\n'); + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.ledger_path) + .with_context(|| format!("opening fleet ledger {}", self.ledger_path.display()))?; + file.write_all(line.as_bytes()) + .with_context(|| format!("appending fleet ledger {}", self.ledger_path.display()))?; + file.flush() + .with_context(|| format!("flushing fleet ledger {}", self.ledger_path.display()))?; + file.sync_data() + .with_context(|| format!("syncing fleet ledger {}", self.ledger_path.display()))?; Ok(()) } @@ -209,6 +214,7 @@ impl FleetLedger { task_id: task_id.to_string(), worker_id: worker_id.to_string(), timestamp: timestamp.to_string(), + status: FleetTaskLedgerStatus::Completed, }) } @@ -358,10 +364,24 @@ impl FleetLedger { task_id: task.entry.task_id.clone(), worker_id: task.leased_to.clone().unwrap_or_default(), timestamp: task.completed_at.clone().unwrap_or_default(), + status: task.status, }, )?); } } + for event in state.latest_events.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended { + event: event.clone(), + })?); + } + for (worker_id, heartbeat) in &state.heartbeats { + lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat { + worker_id: worker_id.clone(), + timestamp: heartbeat.timestamp.clone(), + cpu_percent: heartbeat.cpu_percent, + memory_mb: heartbeat.memory_mb, + })?); + } for receipt in state.receipts.values() { lines.push(serde_json::to_string( &FleetLedgerRecord::ReceiptRecorded { @@ -369,13 +389,11 @@ impl FleetLedger { }, )?); } - let contents = lines.join("\n"); + let mut contents = lines.join("\n"); if !contents.is_empty() { - std::fs::write(&tmp_path, contents)?; - std::fs::write(&tmp_path, "\n")?; - } else { - std::fs::write(&tmp_path, "")?; + contents.push('\n'); } + std::fs::write(&tmp_path, contents)?; std::fs::rename(&tmp_path, &self.ledger_path)?; Ok(()) } @@ -385,6 +403,26 @@ fn task_key(run_id: &str, task_id: &str) -> String { format!("{}:{}", run_id, task_id) } +fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { + format!("{}:{}:{}", worker_id, run_id, task_id) +} + +fn mark_task_terminal( + state: &mut FleetLedgerState, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + timestamp: &str, + status: FleetTaskLedgerStatus, +) { + let key = task_key(&run_id.0, task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = status; + task.leased_to = Some(worker_id.to_string()); + task.completed_at = Some(timestamp.to_string()); + } +} + fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { match record { FleetLedgerRecord::RunCreated { run } => { @@ -426,24 +464,20 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { task_id, worker_id, timestamp, + status, } => { - let key = task_key(&run_id.0, &task_id); - if let Some(task) = state.tasks.get_mut(&key) { - task.status = FleetTaskLedgerStatus::Completed; - task.leased_to = Some(worker_id); - task.completed_at = Some(timestamp); - } + mark_task_terminal(state, &run_id, &task_id, &worker_id, ×tamp, status); } FleetLedgerRecord::EventAppended { event } => { - let worker_key = event.worker_id.clone(); - let task_key = task_key(&event.run_id.0, &event.task_id); - let event_key = format!("{}:{}", worker_key, task_key); - if let Some(seq) = state.latest_seq.get(&event_key).copied() { - if event.seq > seq { - state.latest_seq.insert(event_key, event.seq); - } - } else { - state.latest_seq.insert(event_key, event.seq); + let event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id); + if state + .latest_seq + .get(&event_key) + .copied() + .is_none_or(|seq| event.seq > seq) + { + state.latest_seq.insert(event_key.clone(), event.seq); + state.latest_events.insert(event_key, event.clone()); } // Derive worker status from lifecycle events. match &event.payload { @@ -452,9 +486,41 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { .workers .insert(event.worker_id.clone(), FleetWorkerStatus::Busy); } - FleetWorkerEventPayload::Completed { .. } - | FleetWorkerEventPayload::Failed { .. } - | FleetWorkerEventPayload::Cancelled { .. } => { + FleetWorkerEventPayload::Completed { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Completed, + ); + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + FleetWorkerEventPayload::Failed { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Failed, + ); + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + FleetWorkerEventPayload::Cancelled { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Cancelled, + ); state .workers .insert(event.worker_id.clone(), FleetWorkerStatus::Online); @@ -607,12 +673,14 @@ mod tests { let tmp = TempDir::new().unwrap(); let ledger = FleetLedger::open(tmp.path()).unwrap(); ledger.create_run(&sample_run("run-1")).unwrap(); - // Append a truncated/invalid JSON line directly. - std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap(); - // The previous good record is gone because we overwrote; verify it - // does not panic and returns empty state. + // Append a truncated/invalid JSON line directly; replay should keep + // the earlier valid record and skip only the partial line. + let mut file = OpenOptions::new().append(true).open(ledger.path()).unwrap(); + writeln!(file, "{{\"record\":\"run_created\",\"run\":").unwrap(); + let state = ledger.rebuild_state().unwrap(); - assert!(state.runs.is_empty()); + assert_eq!(state.runs.len(), 1); + assert!(state.runs.contains_key("run-1")); } #[test] @@ -641,6 +709,125 @@ mod tests { assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5)); } + #[test] + fn fleet_ledger_terminal_events_preserve_failed_and_cancelled_status() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger + .enqueue(sample_entry("run-1", "task-failed")) + .unwrap(); + ledger + .enqueue(sample_entry("run-1", "task-cancelled")) + .unwrap(); + + ledger + .append_event(FleetWorkerEvent { + seq: 1, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-failed".to_string(), + timestamp: "2026-06-12T17:03:00Z".to_string(), + payload: FleetWorkerEventPayload::Failed { + reason: "test failed".to_string(), + recoverable: false, + }, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 2, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-2".to_string(), + task_id: "task-cancelled".to_string(), + timestamp: "2026-06-12T17:04:00Z".to_string(), + payload: FleetWorkerEventPayload::Cancelled { + cancelled_by: Some("operator".to_string()), + }, + extra: BTreeMap::new(), + }) + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-failed"].status, + FleetTaskLedgerStatus::Failed + ); + assert_eq!( + state.tasks["run-1:task-cancelled"].status, + FleetTaskLedgerStatus::Cancelled + ); + + ledger.compact().unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-failed"].status, + FleetTaskLedgerStatus::Failed + ); + assert_eq!( + state.tasks["run-1:task-cancelled"].status, + FleetTaskLedgerStatus::Cancelled + ); + } + + #[test] + fn fleet_ledger_compact_preserves_current_state() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .lease_task( + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + "2026-06-12T17:01:00Z", + None, + ) + .unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 7, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-a".to_string(), + timestamp: "2026-06-12T17:01:30Z".to_string(), + payload: FleetWorkerEventPayload::Running, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024)) + .unwrap(); + ledger + .record_receipt(FleetReceipt { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + completed_at: "2026-06-12T17:03:00Z".to_string(), + result: FleetTaskResult::Pass, + artifacts: vec![], + score: None, + }) + .unwrap(); + + ledger.compact().unwrap(); + let contents = std::fs::read_to_string(ledger.path()).unwrap(); + assert!(contents.lines().count() >= 5, "{contents}"); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy); + assert_eq!(state.heartbeats["worker-1"].memory_mb, Some(1024)); + assert!(state.latest_seq.values().any(|seq| *seq == 7)); + assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass); + } + #[test] fn fleet_ledger_receipt_round_trip() { let tmp = TempDir::new().unwrap();