diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index e4eabd86..51a4cf1e 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -8,7 +8,8 @@ #![allow(dead_code)] use std::collections::BTreeMap; -use std::io::BufRead; +use std::fs::OpenOptions; +use std::io::{BufRead, Write}; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; @@ -46,6 +47,8 @@ pub enum FleetLedgerRecord { task_id: String, worker_id: String, timestamp: String, + #[serde(default = "default_terminal_task_status")] + status: FleetTaskLedgerStatus, }, EventAppended { event: FleetWorkerEvent, @@ -82,6 +85,8 @@ pub struct FleetLedgerState { pub heartbeats: BTreeMap, /// Latest event seq per worker_id:task_id. pub latest_seq: BTreeMap, + /// Latest event envelope per worker_id:run_id:task_id. + pub latest_events: BTreeMap, /// Completed receipts by run_id:task_id. pub receipts: BTreeMap, } @@ -95,7 +100,8 @@ pub struct FleetTaskState { pub completed_at: Option, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum FleetTaskLedgerStatus { Enqueued, Leased, @@ -104,6 +110,10 @@ pub enum FleetTaskLedgerStatus { Cancelled, } +fn default_terminal_task_status() -> FleetTaskLedgerStatus { + FleetTaskLedgerStatus::Completed +} + #[derive(Debug, Clone)] pub struct FleetHeartbeatState { pub timestamp: String, @@ -134,26 +144,21 @@ impl FleetLedger { &self.ledger_path } - /// Append a single record atomically by writing a temp file and renaming. + /// Append a single record without rewriting existing ledger contents. fn append_record(&self, record: &FleetLedgerRecord) -> Result<()> { - let line = serde_json::to_string(record).context("serializing fleet ledger record")?; - let tmp_path = self.ledger_path.with_extension(PARTIAL_SUFFIX); - // Read existing content, append new line, then atomically replace. - let mut contents = std::fs::read_to_string(&self.ledger_path).unwrap_or_default(); - if !contents.is_empty() && !contents.ends_with('\n') { - contents.push('\n'); - } - contents.push_str(&line); - contents.push('\n'); - std::fs::write(&tmp_path, contents) - .with_context(|| format!("writing fleet ledger tmp {}", tmp_path.display()))?; - std::fs::rename(&tmp_path, &self.ledger_path).with_context(|| { - format!( - "renaming fleet ledger {} -> {}", - tmp_path.display(), - self.ledger_path.display() - ) - })?; + let mut line = serde_json::to_string(record).context("serializing fleet ledger record")?; + line.push('\n'); + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.ledger_path) + .with_context(|| format!("opening fleet ledger {}", self.ledger_path.display()))?; + file.write_all(line.as_bytes()) + .with_context(|| format!("appending fleet ledger {}", self.ledger_path.display()))?; + file.flush() + .with_context(|| format!("flushing fleet ledger {}", self.ledger_path.display()))?; + file.sync_data() + .with_context(|| format!("syncing fleet ledger {}", self.ledger_path.display()))?; Ok(()) } @@ -209,6 +214,7 @@ impl FleetLedger { task_id: task_id.to_string(), worker_id: worker_id.to_string(), timestamp: timestamp.to_string(), + status: FleetTaskLedgerStatus::Completed, }) } @@ -358,10 +364,24 @@ impl FleetLedger { task_id: task.entry.task_id.clone(), worker_id: task.leased_to.clone().unwrap_or_default(), timestamp: task.completed_at.clone().unwrap_or_default(), + status: task.status, }, )?); } } + for event in state.latest_events.values() { + lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended { + event: event.clone(), + })?); + } + for (worker_id, heartbeat) in &state.heartbeats { + lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat { + worker_id: worker_id.clone(), + timestamp: heartbeat.timestamp.clone(), + cpu_percent: heartbeat.cpu_percent, + memory_mb: heartbeat.memory_mb, + })?); + } for receipt in state.receipts.values() { lines.push(serde_json::to_string( &FleetLedgerRecord::ReceiptRecorded { @@ -369,13 +389,11 @@ impl FleetLedger { }, )?); } - let contents = lines.join("\n"); + let mut contents = lines.join("\n"); if !contents.is_empty() { - std::fs::write(&tmp_path, contents)?; - std::fs::write(&tmp_path, "\n")?; - } else { - std::fs::write(&tmp_path, "")?; + contents.push('\n'); } + std::fs::write(&tmp_path, contents)?; std::fs::rename(&tmp_path, &self.ledger_path)?; Ok(()) } @@ -385,6 +403,26 @@ fn task_key(run_id: &str, task_id: &str) -> String { format!("{}:{}", run_id, task_id) } +fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { + format!("{}:{}:{}", worker_id, run_id, task_id) +} + +fn mark_task_terminal( + state: &mut FleetLedgerState, + run_id: &FleetRunId, + task_id: &str, + worker_id: &str, + timestamp: &str, + status: FleetTaskLedgerStatus, +) { + let key = task_key(&run_id.0, task_id); + if let Some(task) = state.tasks.get_mut(&key) { + task.status = status; + task.leased_to = Some(worker_id.to_string()); + task.completed_at = Some(timestamp.to_string()); + } +} + fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { match record { FleetLedgerRecord::RunCreated { run } => { @@ -426,24 +464,20 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { task_id, worker_id, timestamp, + status, } => { - let key = task_key(&run_id.0, &task_id); - if let Some(task) = state.tasks.get_mut(&key) { - task.status = FleetTaskLedgerStatus::Completed; - task.leased_to = Some(worker_id); - task.completed_at = Some(timestamp); - } + mark_task_terminal(state, &run_id, &task_id, &worker_id, ×tamp, status); } FleetLedgerRecord::EventAppended { event } => { - let worker_key = event.worker_id.clone(); - let task_key = task_key(&event.run_id.0, &event.task_id); - let event_key = format!("{}:{}", worker_key, task_key); - if let Some(seq) = state.latest_seq.get(&event_key).copied() { - if event.seq > seq { - state.latest_seq.insert(event_key, event.seq); - } - } else { - state.latest_seq.insert(event_key, event.seq); + let event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id); + if state + .latest_seq + .get(&event_key) + .copied() + .is_none_or(|seq| event.seq > seq) + { + state.latest_seq.insert(event_key.clone(), event.seq); + state.latest_events.insert(event_key, event.clone()); } // Derive worker status from lifecycle events. match &event.payload { @@ -452,9 +486,41 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { .workers .insert(event.worker_id.clone(), FleetWorkerStatus::Busy); } - FleetWorkerEventPayload::Completed { .. } - | FleetWorkerEventPayload::Failed { .. } - | FleetWorkerEventPayload::Cancelled { .. } => { + FleetWorkerEventPayload::Completed { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Completed, + ); + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + FleetWorkerEventPayload::Failed { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Failed, + ); + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Online); + } + FleetWorkerEventPayload::Cancelled { .. } => { + mark_task_terminal( + state, + &event.run_id, + &event.task_id, + &event.worker_id, + &event.timestamp, + FleetTaskLedgerStatus::Cancelled, + ); state .workers .insert(event.worker_id.clone(), FleetWorkerStatus::Online); @@ -607,12 +673,14 @@ mod tests { let tmp = TempDir::new().unwrap(); let ledger = FleetLedger::open(tmp.path()).unwrap(); ledger.create_run(&sample_run("run-1")).unwrap(); - // Append a truncated/invalid JSON line directly. - std::fs::write(ledger.path(), "{\"record\":\"run_created\",\"run\":\n").unwrap(); - // The previous good record is gone because we overwrote; verify it - // does not panic and returns empty state. + // Append a truncated/invalid JSON line directly; replay should keep + // the earlier valid record and skip only the partial line. + let mut file = OpenOptions::new().append(true).open(ledger.path()).unwrap(); + writeln!(file, "{{\"record\":\"run_created\",\"run\":").unwrap(); + let state = ledger.rebuild_state().unwrap(); - assert!(state.runs.is_empty()); + assert_eq!(state.runs.len(), 1); + assert!(state.runs.contains_key("run-1")); } #[test] @@ -641,6 +709,125 @@ mod tests { assert_eq!(state.heartbeats["worker-1"].cpu_percent, Some(12.5)); } + #[test] + fn fleet_ledger_terminal_events_preserve_failed_and_cancelled_status() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger + .enqueue(sample_entry("run-1", "task-failed")) + .unwrap(); + ledger + .enqueue(sample_entry("run-1", "task-cancelled")) + .unwrap(); + + ledger + .append_event(FleetWorkerEvent { + seq: 1, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-failed".to_string(), + timestamp: "2026-06-12T17:03:00Z".to_string(), + payload: FleetWorkerEventPayload::Failed { + reason: "test failed".to_string(), + recoverable: false, + }, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 2, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-2".to_string(), + task_id: "task-cancelled".to_string(), + timestamp: "2026-06-12T17:04:00Z".to_string(), + payload: FleetWorkerEventPayload::Cancelled { + cancelled_by: Some("operator".to_string()), + }, + extra: BTreeMap::new(), + }) + .unwrap(); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-failed"].status, + FleetTaskLedgerStatus::Failed + ); + assert_eq!( + state.tasks["run-1:task-cancelled"].status, + FleetTaskLedgerStatus::Cancelled + ); + + ledger.compact().unwrap(); + let state = ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-failed"].status, + FleetTaskLedgerStatus::Failed + ); + assert_eq!( + state.tasks["run-1:task-cancelled"].status, + FleetTaskLedgerStatus::Cancelled + ); + } + + #[test] + fn fleet_ledger_compact_preserves_current_state() { + let tmp = TempDir::new().unwrap(); + let ledger = FleetLedger::open(tmp.path()).unwrap(); + ledger.create_run(&sample_run("run-1")).unwrap(); + ledger.enqueue(sample_entry("run-1", "task-a")).unwrap(); + ledger + .lease_task( + &FleetRunId::from("run-1"), + "task-a", + "worker-1", + "2026-06-12T17:01:00Z", + None, + ) + .unwrap(); + ledger + .append_event(FleetWorkerEvent { + seq: 7, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-a".to_string(), + timestamp: "2026-06-12T17:01:30Z".to_string(), + payload: FleetWorkerEventPayload::Running, + extra: BTreeMap::new(), + }) + .unwrap(); + ledger + .heartbeat("worker-1", "2026-06-12T17:02:00Z", Some(12.5), Some(1024)) + .unwrap(); + ledger + .record_receipt(FleetReceipt { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + completed_at: "2026-06-12T17:03:00Z".to_string(), + result: FleetTaskResult::Pass, + artifacts: vec![], + score: None, + }) + .unwrap(); + + ledger.compact().unwrap(); + let contents = std::fs::read_to_string(ledger.path()).unwrap(); + assert!(contents.lines().count() >= 5, "{contents}"); + + let state = ledger.rebuild_state().unwrap(); + assert_eq!(state.runs.len(), 1); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy); + assert_eq!(state.heartbeats["worker-1"].memory_mb, Some(1024)); + assert!(state.latest_seq.values().any(|seq| *seq == 7)); + assert_eq!(state.receipts["run-1:task-a"].result, FleetTaskResult::Pass); + } + #[test] fn fleet_ledger_receipt_round_trip() { let tmp = TempDir::new().unwrap();