fix(tui): harden fleet ledger replay and compaction

This commit is contained in:
Hunter B
2026-06-12 17:56:26 -07:00
parent b0c876e5ac
commit 023434cb2a
+236 -49
View File
@@ -8,7 +8,8 @@
#![allow(dead_code)]
use std::collections::BTreeMap;
use std::io::BufRead;
use std::fs::OpenOptions;
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
@@ -46,6 +47,8 @@ pub enum FleetLedgerRecord {
task_id: String,
worker_id: String,
timestamp: String,
#[serde(default = "default_terminal_task_status")]
status: FleetTaskLedgerStatus,
},
EventAppended {
event: FleetWorkerEvent,
@@ -82,6 +85,8 @@ pub struct FleetLedgerState {
pub heartbeats: BTreeMap<String, FleetHeartbeatState>,
/// Latest event seq per worker_id:task_id.
pub latest_seq: BTreeMap<String, u64>,
/// Latest event envelope per worker_id:run_id:task_id.
pub latest_events: BTreeMap<String, FleetWorkerEvent>,
/// Completed receipts by run_id:task_id.
pub receipts: BTreeMap<String, FleetReceipt>,
}
@@ -95,7 +100,8 @@ pub struct FleetTaskState {
pub completed_at: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FleetTaskLedgerStatus {
Enqueued,
Leased,
@@ -104,6 +110,10 @@ pub enum FleetTaskLedgerStatus {
Cancelled,
}
fn default_terminal_task_status() -> FleetTaskLedgerStatus {
FleetTaskLedgerStatus::Completed
}
#[derive(Debug, Clone)]
pub struct FleetHeartbeatState {
pub timestamp: String,
@@ -134,26 +144,21 @@ impl FleetLedger {
&self.ledger_path
}
/// Append a single record atomically by writing a temp file and renaming.
/// Append a single record without rewriting existing ledger contents.
fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> {
let line = serde_json::to_string(record).context("serializing fleet ledger record")?;
let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX);
// Read existing content, append new line, then atomically replace.
let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default();
if !contents.is_empty() && !contents.ends_with('\n') {
contents.push('\n');
}
contents.push_str(&line);
contents.push('\n');
std::fs::write(&tmp_path, contents)
.with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?;
std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| {
format!(
"renaming fleet ledger {} -> {}",
tmp_path.display(),
self.ledger_path.display()
)
})?;
let mut line = serde_json::to_string(record).context("serializing fleet ledger record")?;
line.push('\n');
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.ledger_path)
.with_context(|| format!("opening fleet ledger {}", self.ledger_path.display()))?;
file.write_all(line.as_bytes())
.with_context(|| format!("appending fleet ledger {}", self.ledger_path.display()))?;
file.flush()
.with_context(|| format!("flushing fleet ledger {}", self.ledger_path.display()))?;
file.sync_data()
.with_context(|| format!("syncing fleet ledger {}", self.ledger_path.display()))?;
Ok(())
}
@@ -209,6 +214,7 @@ impl FleetLedger {
task_id: task_id.to_string(),
worker_id: worker_id.to_string(),
timestamp: timestamp.to_string(),
status: FleetTaskLedgerStatus::Completed,
})
}
@@ -358,10 +364,24 @@ impl FleetLedger {
task_id: task.entry.task_id.clone(),
worker_id: task.leased_to.clone().unwrap_or_default(),
timestamp: task.completed_at.clone().unwrap_or_default(),
status: task.status,
},
)?);
}
}
for event in state.latest_events.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended {
event: event.clone(),
})?);
}
for (worker_id, heartbeat) in &state.heartbeats {
lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat {
worker_id: worker_id.clone(),
timestamp: heartbeat.timestamp.clone(),
cpu_percent: heartbeat.cpu_percent,
memory_mb: heartbeat.memory_mb,
})?);
}
for receipt in state.receipts.values() {
lines.push(serde_json::to_string(
&FleetLedgerRecord::ReceiptRecorded {
@@ -369,13 +389,11 @@ impl FleetLedger {
},
)?);
}
let contents = lines.join("\n");
let mut contents = lines.join("\n");
if !contents.is_empty() {
std::fs::write(&tmp_path, contents)?;
std::fs::write(&tmp_path, "\n")?;
} else {
std::fs::write(&tmp_path, "")?;
contents.push('\n');
}
std::fs::write(&tmp_path, contents)?;
std::fs::rename(&tmp_path, &self.ledger_path)?;
Ok(())
}
@@ -385,6 +403,26 @@ fn task_key(run_id: &str, task_id: &str) -> String {
format!("{}:{}", run_id, task_id)
}
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
format!("{}:{}:{}", worker_id, run_id, task_id)
}
fn mark_task_terminal(
state: &mut FleetLedgerState,
run_id: &FleetRunId,
task_id: &str,
worker_id: &str,
timestamp: &str,
status: FleetTaskLedgerStatus,
) {
let key = task_key(&run_id.0, task_id);
if let Some(task) = state.tasks.get_mut(&key) {
task.status = status;
task.leased_to = Some(worker_id.to_string());
task.completed_at = Some(timestamp.to_string());
}
}
fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
match record {
FleetLedgerRecord::RunCreated { run } => {
@@ -426,24 +464,20 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
task_id,
worker_id,
timestamp,
status,
} => {
let key = task_key(&run_id.0, &task_id);
if let Some(task) = state.tasks.get_mut(&key) {
task.status = FleetTaskLedgerStatus::Completed;
task.leased_to = Some(worker_id);
task.completed_at = Some(timestamp);
}
mark_task_terminal(state, &run_id, &task_id, &worker_id, &timestamp, status);
}
FleetLedgerRecord::EventAppended { event } => {
let worker_key = event.worker_id.clone();
let task_key = task_key(&event.run_id.0, &event.task_id);
let event_key = format!("{}:{}", worker_key, task_key);
if let Some(seq) = state.latest_seq.get(&event_key).copied() {
if event.seq > seq {
state.latest_seq.insert(event_key, event.seq);
}
} else {
state.latest_seq.insert(event_key, event.seq);
let event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id);
if state
.latest_seq
.get(&event_key)
.copied()
.is_none_or(|seq| event.seq > seq)
{
state.latest_seq.insert(event_key.clone(), event.seq);
state.latest_events.insert(event_key, event.clone());
}
// Derive worker status from lifecycle events.
match &event.payload {
@@ -452,9 +486,41 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Busy);
}
FleetWorkerEventPayload::Completed { .. }
| FleetWorkerEventPayload::Failed { .. }
| FleetWorkerEventPayload::Cancelled { .. } => {
FleetWorkerEventPayload::Completed { .. } => {
mark_task_terminal(
state,
&event.run_id,
&event.task_id,
&event.worker_id,
&event.timestamp,
FleetTaskLedgerStatus::Completed,
);
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Online);
}
FleetWorkerEventPayload::Failed { .. } => {
mark_task_terminal(
state,
&event.run_id,
&event.task_id,
&event.worker_id,
&event.timestamp,
FleetTaskLedgerStatus::Failed,
);
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Online);
}
FleetWorkerEventPayload::Cancelled { .. } => {
mark_task_terminal(
state,
&event.run_id,
&event.task_id,
&event.worker_id,
&event.timestamp,
FleetTaskLedgerStatus::Cancelled,
);
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Online);
@@ -607,12 +673,14 @@ mod tests {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
// Append a truncated/invalid JSON line directly.
std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap();
// The previous good record is gone because we overwrote; verify it
// does not panic and returns empty state.
// Append a truncated/invalid JSON line directly; replay should keep
// the earlier valid record and skip only the partial line.
let mut file = OpenOptions::new().append(true).open(ledger.path()).unwrap();
writeln!(file, "{{\"record\":\"run_created\",\"run\":").unwrap();
let state = ledger.rebuild_state().unwrap();
assert!(state.runs.is_empty());
assert_eq!(state.runs.len(), 1);
assert!(state.runs.contains_key("run-1"));
}
#[test]
@@ -641,6 +709,125 @@ mod tests {
assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5));
}
#[test]
fn fleet_ledger_terminal_events_preserve_failed_and_cancelled_status() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
ledger
.enqueue(sample_entry("run-1", "task-failed"))
.unwrap();
ledger
.enqueue(sample_entry("run-1", "task-cancelled"))
.unwrap();
ledger
.append_event(FleetWorkerEvent {
seq: 1,
run_id: FleetRunId::from("run-1"),
worker_id: "worker-1".to_string(),
task_id: "task-failed".to_string(),
timestamp: "2026-06-12T17:03:00Z".to_string(),
payload: FleetWorkerEventPayload::Failed {
reason: "test failed".to_string(),
recoverable: false,
},
extra: BTreeMap::new(),
})
.unwrap();
ledger
.append_event(FleetWorkerEvent {
seq: 2,
run_id: FleetRunId::from("run-1"),
worker_id: "worker-2".to_string(),
task_id: "task-cancelled".to_string(),
timestamp: "2026-06-12T17:04:00Z".to_string(),
payload: FleetWorkerEventPayload::Cancelled {
cancelled_by: Some("operator".to_string()),
},
extra: BTreeMap::new(),
})
.unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(
state.tasks["run-1:task-failed"].status,
FleetTaskLedgerStatus::Failed
);
assert_eq!(
state.tasks["run-1:task-cancelled"].status,
FleetTaskLedgerStatus::Cancelled
);
ledger.compact().unwrap();
let state = ledger.rebuild_state().unwrap();
assert_eq!(
state.tasks["run-1:task-failed"].status,
FleetTaskLedgerStatus::Failed
);
assert_eq!(
state.tasks["run-1:task-cancelled"].status,
FleetTaskLedgerStatus::Cancelled
);
}
#[test]
fn fleet_ledger_compact_preserves_current_state() {
let tmp = TempDir::new().unwrap();
let ledger = FleetLedger::open(tmp.path()).unwrap();
ledger.create_run(&sample_run("run-1")).unwrap();
ledger.enqueue(sample_entry("run-1", "task-a")).unwrap();
ledger
.lease_task(
&FleetRunId::from("run-1"),
"task-a",
"worker-1",
"2026-06-12T17:01:00Z",
None,
)
.unwrap();
ledger
.append_event(FleetWorkerEvent {
seq: 7,
run_id: FleetRunId::from("run-1"),
worker_id: "worker-1".to_string(),
task_id: "task-a".to_string(),
timestamp: "2026-06-12T17:01:30Z".to_string(),
payload: FleetWorkerEventPayload::Running,
extra: BTreeMap::new(),
})
.unwrap();
ledger
.heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024))
.unwrap();
ledger
.record_receipt(FleetReceipt {
run_id: FleetRunId::from("run-1"),
task_id: "task-a".to_string(),
worker_id: "worker-1".to_string(),
completed_at: "2026-06-12T17:03:00Z".to_string(),
result: FleetTaskResult::Pass,
artifacts: vec![],
score: None,
})
.unwrap();
ledger.compact().unwrap();
let contents = std::fs::read_to_string(ledger.path()).unwrap();
assert!(contents.lines().count() >= 5, "{contents}");
let state = ledger.rebuild_state().unwrap();
assert_eq!(state.runs.len(), 1);
assert_eq!(
state.tasks["run-1:task-a"].status,
FleetTaskLedgerStatus::Leased
);
assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy);
assert_eq!(state.heartbeats["worker-1"].memory_mb, Some(1024));
assert!(state.latest_seq.values().any(|seq| *seq == 7));
assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass);
}
#[test]
fn fleet_ledger_receipt_round_trip() {
let tmp = TempDir::new().unwrap();