diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index 90e7386a..408b64ba 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -166,7 +166,9 @@ impl FleetLedger { } pub fn create_run(&self, run: &FleetRun) -> Result<()> { - self.append_record(&FleetLedgerRecord::RunCreated { run: run.clone() }) + self.append_record(&FleetLedgerRecord::RunCreated { + run: sanitize_run_for_ledger(run), + }) } pub fn update_run_status( @@ -487,13 +489,15 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { task_id, worker_id, leased_at, - lease_expires_at: _, + lease_expires_at, } => { let key = task_key(&run_id.0, &task_id); if let Some(task) = state.tasks.get_mut(&key) { task.status = FleetTaskLedgerStatus::Leased; task.leased_to = Some(worker_id); task.leased_at = Some(leased_at); + task.entry.lease_deadline = lease_expires_at; + task.entry.attempts = task.entry.attempts.saturating_add(1); } } FleetLedgerRecord::TaskCompletedOrFailed { @@ -523,11 +527,27 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { } // Derive worker status from lifecycle events. match &event.payload { - FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => { + FleetWorkerEventPayload::Leased { .. } + | FleetWorkerEventPayload::Restarted { .. } + | FleetWorkerEventPayload::ModelWait { .. } + | FleetWorkerEventPayload::RunningTool { .. } + | FleetWorkerEventPayload::Heartbeat { .. } + | FleetWorkerEventPayload::Starting + | FleetWorkerEventPayload::Running => { state .workers .insert(event.worker_id.clone(), FleetWorkerStatus::Busy); } + FleetWorkerEventPayload::Interrupted { .. } => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Draining); + } + FleetWorkerEventPayload::Stale { .. } => { + state + .workers + .insert(event.worker_id.clone(), FleetWorkerStatus::Unhealthy); + } FleetWorkerEventPayload::Completed { .. } => { mark_task_terminal( state, @@ -604,6 +624,29 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { } } +fn sanitize_run_for_ledger(run: &FleetRun) -> FleetRun { + let mut run = run.clone(); + for task in &mut run.task_specs { + if let Some(policy) = &mut task.alert_policy { + for channel in &mut policy.channels { + match channel { + FleetAlertChannel::Slack { webhook_url } => { + *webhook_url = "".to_string(); + } + FleetAlertChannel::Webhook { url, secret } => { + *url = "".to_string(); + *secret = secret.as_ref().map(|_| "".to_string()); + } + FleetAlertChannel::PagerDuty { routing_key, .. } => { + *routing_key = "".to_string(); + } + } + } + } + } + run +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs index 59a28884..1c7d2e47 100644 --- a/crates/tui/src/fleet/mod.rs +++ b/crates/tui/src/fleet/mod.rs @@ -3,3 +3,4 @@ pub mod host; pub mod ledger; pub mod manager; +pub mod scheduler; diff --git a/crates/tui/src/fleet/scheduler.rs b/crates/tui/src/fleet/scheduler.rs new file mode 100644 index 00000000..20219165 --- /dev/null +++ b/crates/tui/src/fleet/scheduler.rs @@ -0,0 +1,760 @@ +//! Fleet scheduler policy: leases, heartbeats, backpressure, and recovery. + +#![allow(dead_code)] + +use std::collections::{BTreeMap, BTreeSet}; +use std::path::Path; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow}; +use chrono::{DateTime, SecondsFormat, Utc}; +use codewhale_protocol::fleet::*; +use serde_json::Value; + +use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState}; + +#[derive(Debug, Clone)] +pub struct FleetSchedulerPolicy { + pub max_workers_per_run: usize, + pub max_workers_per_host: usize, + pub max_workers_per_task_class: usize, + pub lease_seconds: u64, + pub heartbeat_timeout: Duration, +} + +impl Default for FleetSchedulerPolicy { + fn default() -> Self { + Self { + max_workers_per_run: 4, + max_workers_per_host: 4, + max_workers_per_task_class: 4, + lease_seconds: 300, + heartbeat_timeout: Duration::from_secs(120), + } + } +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct FleetSchedulerReport { + pub launched: usize, + pub heartbeats: usize, + pub marked_stale: usize, + pub restarted: usize, + pub failed: usize, + pub cancelled: usize, + pub alerts: usize, +} + +#[derive(Debug)] +pub struct FleetScheduler { + ledger: FleetLedger, + policy: FleetSchedulerPolicy, + now: DateTime, +} + +impl FleetScheduler { + pub fn open(workspace: impl AsRef, policy: FleetSchedulerPolicy) -> Result { + Ok(Self { + ledger: FleetLedger::open(workspace.as_ref())?, + policy, + now: Utc::now(), + }) + } + + pub fn set_now(&mut self, now: DateTime) { + self.now = now; + } + + pub fn tick_run(&self, run_id: &FleetRunId) -> Result { + let mut report = FleetSchedulerReport::default(); + self.recover_unhealthy_work(run_id, &mut report)?; + self.launch_queued_work(run_id, &mut report)?; + self.refresh_run_status(run_id)?; + Ok(report) + } + + pub fn cancel_run(&self, run_id: &FleetRunId, reason: &str) -> Result { + let state = self.ledger.rebuild_state()?; + let mut report = FleetSchedulerReport::default(); + for task in state + .tasks + .values() + .filter(|task| task.entry.run_id == *run_id) + { + if !matches!( + task.status, + FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased + ) { + continue; + } + if let Some(worker_id) = task.leased_to.as_deref() { + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Interrupted { + signal: Some(reason.to_string()), + }, + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Cancelled { + cancelled_by: Some("scheduler".to_string()), + }, + )?; + } else { + self.ledger.mark_task_terminal_status( + &task.entry.run_id, + &task.entry.task_id, + None, + &self.timestamp(), + FleetTaskLedgerStatus::Cancelled, + )?; + } + report.cancelled += 1; + } + self.ledger + .update_run_status(run_id, FleetRunStatus::Cancelled, &self.timestamp())?; + Ok(report) + } + + fn recover_unhealthy_work( + &self, + run_id: &FleetRunId, + report: &mut FleetSchedulerReport, + ) -> Result<()> { + let state = self.ledger.rebuild_state()?; + for task in state + .tasks + .values() + .filter(|task| task.entry.run_id == *run_id) + { + let Some(task_spec) = task_spec_for(&state, task) else { + continue; + }; + match task.status { + FleetTaskLedgerStatus::Leased if self.task_is_stale(task, &state) => { + let worker_id = task + .leased_to + .clone() + .unwrap_or_else(|| "fleet-scheduler".to_string()); + self.append_worker_event( + &task.entry.run_id, + &worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Stale { + last_heartbeat_at: state + .heartbeats + .get(&worker_id) + .map(|heartbeat| heartbeat.timestamp.clone()), + }, + )?; + report.marked_stale += 1; + self.retry_or_fail(task, &task_spec, &worker_id, report) + .with_context(|| format!("recovering stale task {}", task.entry.task_id))?; + } + FleetTaskLedgerStatus::Failed => { + let worker_id = task + .leased_to + .clone() + .unwrap_or_else(|| "fleet-scheduler".to_string()); + self.retry_or_fail(task, &task_spec, &worker_id, report) + .with_context(|| { + format!("recovering failed task {}", task.entry.task_id) + })?; + } + _ => {} + } + } + Ok(()) + } + + fn retry_or_fail( + &self, + task: &FleetTaskState, + task_spec: &FleetTaskSpec, + worker_id: &str, + report: &mut FleetSchedulerReport, + ) -> Result<()> { + let retry_policy = task_spec + .retry_policy + .clone() + .unwrap_or_else(FleetRetryPolicy::default); + if task.entry.attempts < retry_policy.max_attempts { + let lease_expires_at = self.lease_expires_at(); + self.ledger.lease_task( + &task.entry.run_id, + &task.entry.task_id, + worker_id, + &self.timestamp(), + Some(&lease_expires_at), + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Restarted { + restart_count: task.entry.attempts, + }, + )?; + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Running, + )?; + self.ledger + .heartbeat(worker_id, &self.timestamp(), None, None)?; + report.restarted += 1; + return Ok(()); + } + + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Failed { + reason: format!( + "retry attempts exhausted after {} attempt(s)", + task.entry.attempts + ), + recoverable: false, + }, + )?; + report.failed += 1; + report.alerts += self.record_alerts(&task.entry.run_id, &task.entry.task_id, task_spec)?; + Ok(()) + } + + fn launch_queued_work( + &self, + run_id: &FleetRunId, + report: &mut FleetSchedulerReport, + ) -> Result<()> { + loop { + let state = self.ledger.rebuild_state()?; + let run = state + .runs + .get(&run_id.0) + .ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?; + let active = active_tasks_for_run(&state, run_id); + if active.len() >= self.policy.max_workers_per_run { + return Ok(()); + } + let counts = active_counts(&state, run); + let Some((worker_id, task)) = self.next_launch(run, &state, &counts) else { + return Ok(()); + }; + let lease_expires_at = self.lease_expires_at(); + self.ledger.lease_task( + &task.entry.run_id, + &task.entry.task_id, + &worker_id, + &self.timestamp(), + Some(&lease_expires_at), + )?; + self.append_worker_event( + &task.entry.run_id, + &worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Leased { + lease_expires_at: Some(lease_expires_at), + }, + )?; + self.append_worker_event( + &task.entry.run_id, + &worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Starting, + )?; + self.append_worker_event( + &task.entry.run_id, + &worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Running, + )?; + self.ledger + .heartbeat(&worker_id, &self.timestamp(), None, None)?; + report.launched += 1; + report.heartbeats += 1; + } + } + + fn next_launch( + &self, + run: &FleetRun, + state: &FleetLedgerState, + counts: &ActiveCounts, + ) -> Option<(String, FleetTaskState)> { + let active_workers: BTreeSet<_> = active_tasks_for_run(state, &run.id) + .into_iter() + .filter_map(|task| task.leased_to) + .collect(); + let mut queued: Vec<_> = state + .tasks + .values() + .filter(|task| { + task.entry.run_id == run.id + && matches!(task.status, FleetTaskLedgerStatus::Enqueued) + }) + .cloned() + .collect(); + queued.sort_by_key(|task| { + ( + task.entry.priority, + task.entry.enqueued_at.clone(), + task.entry.task_id.clone(), + ) + }); + for task in queued { + let task_spec = run + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id)?; + let task_class = task_class(task_spec); + if counts.by_task_class.get(&task_class).copied().unwrap_or(0) + >= self.policy.max_workers_per_task_class + { + continue; + } + for worker in &run.worker_specs { + if active_workers.contains(&worker.id) { + continue; + } + let host_key = host_key(worker); + if counts.by_host.get(&host_key).copied().unwrap_or(0) + >= self.policy.max_workers_per_host + { + continue; + } + return Some((worker.id.clone(), task)); + } + } + None + } + + fn task_is_stale(&self, task: &FleetTaskState, state: &FleetLedgerState) -> bool { + if let Some(worker_id) = task.leased_to.as_deref() { + if let Some(heartbeat) = state.heartbeats.get(worker_id) + && let Ok(last) = DateTime::parse_from_rfc3339(&heartbeat.timestamp) + { + let age = self.now.signed_duration_since(last.with_timezone(&Utc)); + return age + .to_std() + .map_or(true, |age| age > self.policy.heartbeat_timeout); + } + } + if let Some(deadline) = task.entry.lease_deadline.as_deref() + && let Ok(deadline) = DateTime::parse_from_rfc3339(deadline) + { + return self.now > deadline.with_timezone(&Utc); + } + true + } + + fn record_alerts( + &self, + run_id: &FleetRunId, + task_id: &str, + task_spec: &FleetTaskSpec, + ) -> Result { + let Some(policy) = &task_spec.alert_policy else { + return Ok(0); + }; + let mut count = 0; + for channel in &policy.channels { + self.ledger.record_alert( + run_id, + task_id, + alert_channel_label(channel), + &self.timestamp(), + )?; + count += 1; + } + Ok(count) + } + + fn refresh_run_status(&self, run_id: &FleetRunId) -> Result<()> { + let state = self.ledger.rebuild_state()?; + let mut has_open = false; + let mut has_failed = false; + let mut has_cancelled = false; + for task in state + .tasks + .values() + .filter(|task| task.entry.run_id == *run_id) + { + match task.status { + FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased => has_open = true, + FleetTaskLedgerStatus::Failed => has_failed = true, + FleetTaskLedgerStatus::Cancelled => has_cancelled = true, + FleetTaskLedgerStatus::Completed => {} + } + } + let status = if has_open { + FleetRunStatus::Running + } else if has_failed { + FleetRunStatus::Failed + } else if has_cancelled { + FleetRunStatus::Cancelled + } else { + FleetRunStatus::Completed + }; + self.ledger + .update_run_status(run_id, status, &self.timestamp()) + } + + fn append_worker_event( + &self, + run_id: &FleetRunId, + worker_id: &str, + task_id: &str, + payload: FleetWorkerEventPayload, + ) -> Result { + let state = self.ledger.rebuild_state()?; + let key = event_key(worker_id, &run_id.0, task_id); + let seq = state.latest_seq.get(&key).copied().unwrap_or(0) + 1; + let event = FleetWorkerEvent { + seq, + run_id: run_id.clone(), + worker_id: worker_id.to_string(), + task_id: task_id.to_string(), + timestamp: self.timestamp(), + payload, + extra: BTreeMap::new(), + }; + self.ledger.append_event(event.clone())?; + Ok(event) + } + + fn timestamp(&self) -> String { + self.now.to_rfc3339_opts(SecondsFormat::Secs, true) + } + + fn lease_expires_at(&self) -> String { + (self.now + chrono::Duration::seconds(self.policy.lease_seconds as i64)) + .to_rfc3339_opts(SecondsFormat::Secs, true) + } +} + +#[derive(Debug, Default)] +struct ActiveCounts { + by_host: BTreeMap, + by_task_class: BTreeMap, +} + +fn active_counts(state: &FleetLedgerState, run: &FleetRun) -> ActiveCounts { + let mut counts = ActiveCounts::default(); + for task in active_tasks_for_run(state, &run.id) { + if let Some(worker_id) = task.leased_to.as_deref() + && let Some(worker) = run + .worker_specs + .iter() + .find(|worker| worker.id == worker_id) + { + *counts.by_host.entry(host_key(worker)).or_default() += 1; + } + if let Some(task_spec) = run + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + { + *counts + .by_task_class + .entry(task_class(task_spec)) + .or_default() += 1; + } + } + counts +} + +fn active_tasks_for_run(state: &FleetLedgerState, run_id: &FleetRunId) -> Vec { + state + .tasks + .values() + .filter(|task| { + task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Leased) + }) + .cloned() + .collect() +} + +fn task_spec_for(state: &FleetLedgerState, task: &FleetTaskState) -> Option { + state + .runs + .get(&task.entry.run_id.0)? + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + .cloned() +} + +fn host_key(worker: &FleetWorkerSpec) -> String { + match &worker.host { + FleetHostSpec::Local => "local".to_string(), + FleetHostSpec::Ssh { host, .. } => format!("ssh:{host}"), + FleetHostSpec::Docker { image, .. } => format!("docker:{image}"), + } +} + +fn task_class(task: &FleetTaskSpec) -> String { + task.metadata + .get("class") + .and_then(Value::as_str) + .filter(|value| !value.trim().is_empty()) + .unwrap_or("default") + .to_string() +} + +fn alert_channel_label(channel: &FleetAlertChannel) -> &'static str { + match channel { + FleetAlertChannel::Slack { .. } => "slack", + FleetAlertChannel::Webhook { .. } => "webhook", + FleetAlertChannel::PagerDuty { .. } => "pagerduty", + } +} + +fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { + format!("{worker_id}:{run_id}:{task_id}") +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn base_now() -> DateTime { + DateTime::parse_from_rfc3339("2026-06-13T01:00:00Z") + .unwrap() + .with_timezone(&Utc) + } + + fn scheduler(tmp: &TempDir, max_workers: usize) -> FleetScheduler { + let mut scheduler = FleetScheduler::open( + tmp.path(), + FleetSchedulerPolicy { + max_workers_per_run: max_workers, + max_workers_per_host: max_workers, + max_workers_per_task_class: max_workers, + lease_seconds: 30, + heartbeat_timeout: Duration::from_secs(5), + }, + ) + .unwrap(); + scheduler.set_now(base_now()); + scheduler + } + + fn worker(id: &str) -> FleetWorkerSpec { + FleetWorkerSpec { + id: id.to_string(), + name: id.to_string(), + host: FleetHostSpec::Local, + labels: BTreeMap::new(), + capabilities: vec!["local".to_string()], + max_concurrent_tasks: Some(1), + } + } + + fn task(id: &str, max_attempts: u32) -> FleetTaskSpec { + FleetTaskSpec { + id: id.to_string(), + name: id.to_string(), + description: None, + instructions: format!("do {id}"), + expected_artifacts: vec![FleetArtifactKind::Log], + scorer: None, + retry_policy: Some(FleetRetryPolicy { + max_attempts, + ..FleetRetryPolicy::default() + }), + alert_policy: None, + timeout_seconds: None, + metadata: BTreeMap::new(), + } + } + + fn create_run( + scheduler: &FleetScheduler, + run_id: &str, + tasks: Vec, + workers: usize, + ) { + let run_id = FleetRunId::from(run_id); + scheduler + .ledger + .create_run(&FleetRun { + id: run_id.clone(), + name: "scheduler smoke".to_string(), + status: FleetRunStatus::Queued, + task_specs: tasks.clone(), + worker_specs: (1..=workers) + .map(|idx| worker(&format!("worker-{idx}"))) + .collect(), + labels: BTreeMap::new(), + created_at: scheduler.timestamp(), + updated_at: None, + completed_at: None, + }) + .unwrap(); + for task in tasks { + scheduler + .ledger + .enqueue(FleetInboxEntry { + run_id: run_id.clone(), + task_id: task.id, + priority: 0, + enqueued_at: scheduler.timestamp(), + lease_deadline: None, + attempts: 0, + }) + .unwrap(); + } + } + + fn ledger_text(scheduler: &FleetScheduler) -> String { + std::fs::read_to_string(scheduler.ledger.path()).unwrap() + } + + #[test] + fn fleet_scheduler_backpressure_prevents_over_launch() { + let tmp = TempDir::new().unwrap(); + let scheduler = scheduler(&tmp, 2); + create_run( + &scheduler, + "run-1", + vec![task("task-a", 3), task("task-b", 3), task("task-c", 3)], + 3, + ); + + let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + + assert_eq!(report.launched, 2); + let state = scheduler.ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!( + state.tasks["run-1:task-b"].status, + FleetTaskLedgerStatus::Leased + ); + assert_eq!( + state.tasks["run-1:task-c"].status, + FleetTaskLedgerStatus::Enqueued + ); + } + + #[test] + fn fleet_scheduler_lost_heartbeat_restarts_within_retry_limit() { + let tmp = TempDir::new().unwrap(); + let mut scheduler = scheduler(&tmp, 1); + create_run(&scheduler, "run-1", vec![task("task-a", 2)], 1); + scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + scheduler.set_now(base_now() + chrono::Duration::seconds(10)); + + let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + + assert_eq!(report.marked_stale, 1); + assert_eq!(report.restarted, 1); + let state = scheduler.ledger.rebuild_state().unwrap(); + let task = &state.tasks["run-1:task-a"]; + assert_eq!(task.status, FleetTaskLedgerStatus::Leased); + assert_eq!(task.entry.attempts, 2); + let ledger = ledger_text(&scheduler); + assert!(ledger.contains("\"state\":\"stale\"")); + assert!(ledger.contains("\"state\":\"restarted\"")); + } + + #[test] + fn fleet_scheduler_restart_exhaustion_records_terminal_failure_and_alert() { + let tmp = TempDir::new().unwrap(); + let mut scheduler = scheduler(&tmp, 1); + let mut failing = task("task-a", 1); + failing.alert_policy = Some(FleetAlertPolicy { + channels: vec![FleetAlertChannel::Slack { + webhook_url: "https://hooks.slack.invalid/secret".to_string(), + }], + after_attempts: Some(1), + after_minutes_stale: Some(1), + }); + create_run(&scheduler, "run-1", vec![failing], 1); + scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + scheduler.set_now(base_now() + chrono::Duration::seconds(10)); + + let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + + assert_eq!(report.marked_stale, 1); + assert_eq!(report.restarted, 0); + assert_eq!(report.failed, 1); + assert_eq!(report.alerts, 1); + let state = scheduler.ledger.rebuild_state().unwrap(); + assert_eq!( + state.tasks["run-1:task-a"].status, + FleetTaskLedgerStatus::Failed + ); + let ledger = ledger_text(&scheduler); + assert!(ledger.contains("\"state\":\"failed\"")); + assert!(ledger.contains("\"record\":\"alert_sent\"")); + assert!(!ledger.contains("hooks.slack.invalid/secret")); + } + + #[test] + fn fleet_scheduler_slow_provider_response_with_fresh_heartbeat_is_not_stale() { + let tmp = TempDir::new().unwrap(); + let mut scheduler = scheduler(&tmp, 1); + create_run(&scheduler, "run-1", vec![task("task-a", 2)], 1); + scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + scheduler.set_now(base_now() + chrono::Duration::seconds(4)); + scheduler + .append_worker_event( + &FleetRunId::from("run-1"), + "worker-1", + "task-a", + FleetWorkerEventPayload::ModelWait { + model: Some("deepseek-v4-pro".to_string()), + }, + ) + .unwrap(); + scheduler + .ledger + .heartbeat("worker-1", &scheduler.timestamp(), None, None) + .unwrap(); + + let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + + assert_eq!(report.marked_stale, 0); + assert_eq!(report.restarted, 0); + let state = scheduler.ledger.rebuild_state().unwrap(); + assert_eq!(state.tasks["run-1:task-a"].entry.attempts, 1); + assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy); + } + + #[test] + fn fleet_scheduler_cancel_run_interrupts_active_and_cancels_queued() { + let tmp = TempDir::new().unwrap(); + let scheduler = scheduler(&tmp, 1); + create_run( + &scheduler, + "run-1", + vec![task("task-a", 3), task("task-b", 3), task("task-c", 3)], + 2, + ); + scheduler.tick_run(&FleetRunId::from("run-1")).unwrap(); + + let report = scheduler + .cancel_run(&FleetRunId::from("run-1"), "operator") + .unwrap(); + + assert_eq!(report.cancelled, 3); + let state = scheduler.ledger.rebuild_state().unwrap(); + for task in state.tasks.values() { + assert_eq!(task.status, FleetTaskLedgerStatus::Cancelled); + } + let ledger = ledger_text(&scheduler); + assert!(ledger.contains("\"state\":\"interrupted\"")); + assert!(ledger.contains("\"state\":\"cancelled\"")); + } +}