merge #3159 fleet scheduler policy

This commit is contained in:
Hunter B
2026-06-12 18:43:44 -07:00
3 changed files with 807 additions and 3 deletions
+46 -3
View File
@@ -166,7 +166,9 @@ impl FleetLedger {
} }
pub fn create_run(&self, run: &FleetRun) -> Result<()> { pub fn create_run(&self, run: &FleetRun) -> Result<()> {
self.append_record(&FleetLedgerRecord::RunCreated { run: run.clone() }) self.append_record(&FleetLedgerRecord::RunCreated {
run: sanitize_run_for_ledger(run),
})
} }
pub fn update_run_status( pub fn update_run_status(
@@ -487,13 +489,15 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
task_id, task_id,
worker_id, worker_id,
leased_at, leased_at,
lease_expires_at: _, lease_expires_at,
} => { } => {
let key = task_key(&run_id.0, &task_id); let key = task_key(&run_id.0, &task_id);
if let Some(task) = state.tasks.get_mut(&key) { if let Some(task) = state.tasks.get_mut(&key) {
task.status = FleetTaskLedgerStatus::Leased; task.status = FleetTaskLedgerStatus::Leased;
task.leased_to = Some(worker_id); task.leased_to = Some(worker_id);
task.leased_at = Some(leased_at); task.leased_at = Some(leased_at);
task.entry.lease_deadline = lease_expires_at;
task.entry.attempts = task.entry.attempts.saturating_add(1);
} }
} }
FleetLedgerRecord::TaskCompletedOrFailed { FleetLedgerRecord::TaskCompletedOrFailed {
@@ -523,11 +527,27 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
} }
// Derive worker status from lifecycle events. // Derive worker status from lifecycle events.
match &event.payload { match &event.payload {
FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => { FleetWorkerEventPayload::Leased { .. }
| FleetWorkerEventPayload::Restarted { .. }
| FleetWorkerEventPayload::ModelWait { .. }
| FleetWorkerEventPayload::RunningTool { .. }
| FleetWorkerEventPayload::Heartbeat { .. }
| FleetWorkerEventPayload::Starting
| FleetWorkerEventPayload::Running => {
state state
.workers .workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Busy); .insert(event.worker_id.clone(), FleetWorkerStatus::Busy);
} }
FleetWorkerEventPayload::Interrupted { .. } => {
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Draining);
}
FleetWorkerEventPayload::Stale { .. } => {
state
.workers
.insert(event.worker_id.clone(), FleetWorkerStatus::Unhealthy);
}
FleetWorkerEventPayload::Completed { .. } => { FleetWorkerEventPayload::Completed { .. } => {
mark_task_terminal( mark_task_terminal(
state, state,
@@ -604,6 +624,29 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
} }
} }
fn sanitize_run_for_ledger(run: &FleetRun) -> FleetRun {
let mut run = run.clone();
for task in &mut run.task_specs {
if let Some(policy) = &mut task.alert_policy {
for channel in &mut policy.channels {
match channel {
FleetAlertChannel::Slack { webhook_url } => {
*webhook_url = "<redacted>".to_string();
}
FleetAlertChannel::Webhook { url, secret } => {
*url = "<redacted>".to_string();
*secret = secret.as_ref().map(|_| "<redacted>".to_string());
}
FleetAlertChannel::PagerDuty { routing_key, .. } => {
*routing_key = "<redacted>".to_string();
}
}
}
}
}
run
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
+1
View File
@@ -3,3 +3,4 @@
pub mod host; pub mod host;
pub mod ledger; pub mod ledger;
pub mod manager; pub mod manager;
pub mod scheduler;
+760
View File
@@ -0,0 +1,760 @@
//! Fleet scheduler policy: leases, heartbeats, backpressure, and recovery.
#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, SecondsFormat, Utc};
use codewhale_protocol::fleet::*;
use serde_json::Value;
use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState};
#[derive(Debug, Clone)]
pub struct FleetSchedulerPolicy {
pub max_workers_per_run: usize,
pub max_workers_per_host: usize,
pub max_workers_per_task_class: usize,
pub lease_seconds: u64,
pub heartbeat_timeout: Duration,
}
impl Default for FleetSchedulerPolicy {
fn default() -> Self {
Self {
max_workers_per_run: 4,
max_workers_per_host: 4,
max_workers_per_task_class: 4,
lease_seconds: 300,
heartbeat_timeout: Duration::from_secs(120),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FleetSchedulerReport {
pub launched: usize,
pub heartbeats: usize,
pub marked_stale: usize,
pub restarted: usize,
pub failed: usize,
pub cancelled: usize,
pub alerts: usize,
}
#[derive(Debug)]
pub struct FleetScheduler {
ledger: FleetLedger,
policy: FleetSchedulerPolicy,
now: DateTime<Utc>,
}
impl FleetScheduler {
pub fn open(workspace: impl AsRef<Path>, policy: FleetSchedulerPolicy) -> Result<Self> {
Ok(Self {
ledger: FleetLedger::open(workspace.as_ref())?,
policy,
now: Utc::now(),
})
}
pub fn set_now(&mut self, now: DateTime<Utc>) {
self.now = now;
}
pub fn tick_run(&self, run_id: &FleetRunId) -> Result<FleetSchedulerReport> {
let mut report = FleetSchedulerReport::default();
self.recover_unhealthy_work(run_id, &mut report)?;
self.launch_queued_work(run_id, &mut report)?;
self.refresh_run_status(run_id)?;
Ok(report)
}
pub fn cancel_run(&self, run_id: &FleetRunId, reason: &str) -> Result<FleetSchedulerReport> {
let state = self.ledger.rebuild_state()?;
let mut report = FleetSchedulerReport::default();
for task in state
.tasks
.values()
.filter(|task| task.entry.run_id == *run_id)
{
if !matches!(
task.status,
FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased
) {
continue;
}
if let Some(worker_id) = task.leased_to.as_deref() {
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Interrupted {
signal: Some(reason.to_string()),
},
)?;
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Cancelled {
cancelled_by: Some("scheduler".to_string()),
},
)?;
} else {
self.ledger.mark_task_terminal_status(
&task.entry.run_id,
&task.entry.task_id,
None,
&self.timestamp(),
FleetTaskLedgerStatus::Cancelled,
)?;
}
report.cancelled += 1;
}
self.ledger
.update_run_status(run_id, FleetRunStatus::Cancelled, &self.timestamp())?;
Ok(report)
}
fn recover_unhealthy_work(
&self,
run_id: &FleetRunId,
report: &mut FleetSchedulerReport,
) -> Result<()> {
let state = self.ledger.rebuild_state()?;
for task in state
.tasks
.values()
.filter(|task| task.entry.run_id == *run_id)
{
let Some(task_spec) = task_spec_for(&state, task) else {
continue;
};
match task.status {
FleetTaskLedgerStatus::Leased if self.task_is_stale(task, &state) => {
let worker_id = task
.leased_to
.clone()
.unwrap_or_else(|| "fleet-scheduler".to_string());
self.append_worker_event(
&task.entry.run_id,
&worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Stale {
last_heartbeat_at: state
.heartbeats
.get(&worker_id)
.map(|heartbeat| heartbeat.timestamp.clone()),
},
)?;
report.marked_stale += 1;
self.retry_or_fail(task, &task_spec, &worker_id, report)
.with_context(|| format!("recovering stale task {}", task.entry.task_id))?;
}
FleetTaskLedgerStatus::Failed => {
let worker_id = task
.leased_to
.clone()
.unwrap_or_else(|| "fleet-scheduler".to_string());
self.retry_or_fail(task, &task_spec, &worker_id, report)
.with_context(|| {
format!("recovering failed task {}", task.entry.task_id)
})?;
}
_ => {}
}
}
Ok(())
}
fn retry_or_fail(
&self,
task: &FleetTaskState,
task_spec: &FleetTaskSpec,
worker_id: &str,
report: &mut FleetSchedulerReport,
) -> Result<()> {
let retry_policy = task_spec
.retry_policy
.clone()
.unwrap_or_else(FleetRetryPolicy::default);
if task.entry.attempts < retry_policy.max_attempts {
let lease_expires_at = self.lease_expires_at();
self.ledger.lease_task(
&task.entry.run_id,
&task.entry.task_id,
worker_id,
&self.timestamp(),
Some(&lease_expires_at),
)?;
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Restarted {
restart_count: task.entry.attempts,
},
)?;
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Running,
)?;
self.ledger
.heartbeat(worker_id, &self.timestamp(), None, None)?;
report.restarted += 1;
return Ok(());
}
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Failed {
reason: format!(
"retry attempts exhausted after {} attempt(s)",
task.entry.attempts
),
recoverable: false,
},
)?;
report.failed += 1;
report.alerts += self.record_alerts(&task.entry.run_id, &task.entry.task_id, task_spec)?;
Ok(())
}
fn launch_queued_work(
&self,
run_id: &FleetRunId,
report: &mut FleetSchedulerReport,
) -> Result<()> {
loop {
let state = self.ledger.rebuild_state()?;
let run = state
.runs
.get(&run_id.0)
.ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?;
let active = active_tasks_for_run(&state, run_id);
if active.len() >= self.policy.max_workers_per_run {
return Ok(());
}
let counts = active_counts(&state, run);
let Some((worker_id, task)) = self.next_launch(run, &state, &counts) else {
return Ok(());
};
let lease_expires_at = self.lease_expires_at();
self.ledger.lease_task(
&task.entry.run_id,
&task.entry.task_id,
&worker_id,
&self.timestamp(),
Some(&lease_expires_at),
)?;
self.append_worker_event(
&task.entry.run_id,
&worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Leased {
lease_expires_at: Some(lease_expires_at),
},
)?;
self.append_worker_event(
&task.entry.run_id,
&worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Starting,
)?;
self.append_worker_event(
&task.entry.run_id,
&worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Running,
)?;
self.ledger
.heartbeat(&worker_id, &self.timestamp(), None, None)?;
report.launched += 1;
report.heartbeats += 1;
}
}
fn next_launch(
&self,
run: &FleetRun,
state: &FleetLedgerState,
counts: &ActiveCounts,
) -> Option<(String, FleetTaskState)> {
let active_workers: BTreeSet<_> = active_tasks_for_run(state, &run.id)
.into_iter()
.filter_map(|task| task.leased_to)
.collect();
let mut queued: Vec<_> = state
.tasks
.values()
.filter(|task| {
task.entry.run_id == run.id
&& matches!(task.status, FleetTaskLedgerStatus::Enqueued)
})
.cloned()
.collect();
queued.sort_by_key(|task| {
(
task.entry.priority,
task.entry.enqueued_at.clone(),
task.entry.task_id.clone(),
)
});
for task in queued {
let task_spec = run
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)?;
let task_class = task_class(task_spec);
if counts.by_task_class.get(&task_class).copied().unwrap_or(0)
>= self.policy.max_workers_per_task_class
{
continue;
}
for worker in &run.worker_specs {
if active_workers.contains(&worker.id) {
continue;
}
let host_key = host_key(worker);
if counts.by_host.get(&host_key).copied().unwrap_or(0)
>= self.policy.max_workers_per_host
{
continue;
}
return Some((worker.id.clone(), task));
}
}
None
}
fn task_is_stale(&self, task: &FleetTaskState, state: &FleetLedgerState) -> bool {
if let Some(worker_id) = task.leased_to.as_deref() {
if let Some(heartbeat) = state.heartbeats.get(worker_id)
&& let Ok(last) = DateTime::parse_from_rfc3339(&heartbeat.timestamp)
{
let age = self.now.signed_duration_since(last.with_timezone(&Utc));
return age
.to_std()
.map_or(true, |age| age > self.policy.heartbeat_timeout);
}
}
if let Some(deadline) = task.entry.lease_deadline.as_deref()
&& let Ok(deadline) = DateTime::parse_from_rfc3339(deadline)
{
return self.now > deadline.with_timezone(&Utc);
}
true
}
fn record_alerts(
&self,
run_id: &FleetRunId,
task_id: &str,
task_spec: &FleetTaskSpec,
) -> Result<usize> {
let Some(policy) = &task_spec.alert_policy else {
return Ok(0);
};
let mut count = 0;
for channel in &policy.channels {
self.ledger.record_alert(
run_id,
task_id,
alert_channel_label(channel),
&self.timestamp(),
)?;
count += 1;
}
Ok(count)
}
fn refresh_run_status(&self, run_id: &FleetRunId) -> Result<()> {
let state = self.ledger.rebuild_state()?;
let mut has_open = false;
let mut has_failed = false;
let mut has_cancelled = false;
for task in state
.tasks
.values()
.filter(|task| task.entry.run_id == *run_id)
{
match task.status {
FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased => has_open = true,
FleetTaskLedgerStatus::Failed => has_failed = true,
FleetTaskLedgerStatus::Cancelled => has_cancelled = true,
FleetTaskLedgerStatus::Completed => {}
}
}
let status = if has_open {
FleetRunStatus::Running
} else if has_failed {
FleetRunStatus::Failed
} else if has_cancelled {
FleetRunStatus::Cancelled
} else {
FleetRunStatus::Completed
};
self.ledger
.update_run_status(run_id, status, &self.timestamp())
}
fn append_worker_event(
&self,
run_id: &FleetRunId,
worker_id: &str,
task_id: &str,
payload: FleetWorkerEventPayload,
) -> Result<FleetWorkerEvent> {
let state = self.ledger.rebuild_state()?;
let key = event_key(worker_id, &run_id.0, task_id);
let seq = state.latest_seq.get(&key).copied().unwrap_or(0) + 1;
let event = FleetWorkerEvent {
seq,
run_id: run_id.clone(),
worker_id: worker_id.to_string(),
task_id: task_id.to_string(),
timestamp: self.timestamp(),
payload,
extra: BTreeMap::new(),
};
self.ledger.append_event(event.clone())?;
Ok(event)
}
fn timestamp(&self) -> String {
self.now.to_rfc3339_opts(SecondsFormat::Secs, true)
}
fn lease_expires_at(&self) -> String {
(self.now + chrono::Duration::seconds(self.policy.lease_seconds as i64))
.to_rfc3339_opts(SecondsFormat::Secs, true)
}
}
#[derive(Debug, Default)]
struct ActiveCounts {
by_host: BTreeMap<String, usize>,
by_task_class: BTreeMap<String, usize>,
}
fn active_counts(state: &FleetLedgerState, run: &FleetRun) -> ActiveCounts {
let mut counts = ActiveCounts::default();
for task in active_tasks_for_run(state, &run.id) {
if let Some(worker_id) = task.leased_to.as_deref()
&& let Some(worker) = run
.worker_specs
.iter()
.find(|worker| worker.id == worker_id)
{
*counts.by_host.entry(host_key(worker)).or_default() += 1;
}
if let Some(task_spec) = run
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)
{
*counts
.by_task_class
.entry(task_class(task_spec))
.or_default() += 1;
}
}
counts
}
fn active_tasks_for_run(state: &FleetLedgerState, run_id: &FleetRunId) -> Vec<FleetTaskState> {
state
.tasks
.values()
.filter(|task| {
task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Leased)
})
.cloned()
.collect()
}
fn task_spec_for(state: &FleetLedgerState, task: &FleetTaskState) -> Option<FleetTaskSpec> {
state
.runs
.get(&task.entry.run_id.0)?
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)
.cloned()
}
fn host_key(worker: &FleetWorkerSpec) -> String {
match &worker.host {
FleetHostSpec::Local => "local".to_string(),
FleetHostSpec::Ssh { host, .. } => format!("ssh:{host}"),
FleetHostSpec::Docker { image, .. } => format!("docker:{image}"),
}
}
fn task_class(task: &FleetTaskSpec) -> String {
task.metadata
.get("class")
.and_then(Value::as_str)
.filter(|value| !value.trim().is_empty())
.unwrap_or("default")
.to_string()
}
fn alert_channel_label(channel: &FleetAlertChannel) -> &'static str {
match channel {
FleetAlertChannel::Slack { .. } => "slack",
FleetAlertChannel::Webhook { .. } => "webhook",
FleetAlertChannel::PagerDuty { .. } => "pagerduty",
}
}
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
format!("{worker_id}:{run_id}:{task_id}")
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn base_now() -> DateTime<Utc> {
DateTime::parse_from_rfc3339("2026-06-13T01:00:00Z")
.unwrap()
.with_timezone(&Utc)
}
fn scheduler(tmp: &TempDir, max_workers: usize) -> FleetScheduler {
let mut scheduler = FleetScheduler::open(
tmp.path(),
FleetSchedulerPolicy {
max_workers_per_run: max_workers,
max_workers_per_host: max_workers,
max_workers_per_task_class: max_workers,
lease_seconds: 30,
heartbeat_timeout: Duration::from_secs(5),
},
)
.unwrap();
scheduler.set_now(base_now());
scheduler
}
fn worker(id: &str) -> FleetWorkerSpec {
FleetWorkerSpec {
id: id.to_string(),
name: id.to_string(),
host: FleetHostSpec::Local,
labels: BTreeMap::new(),
capabilities: vec!["local".to_string()],
max_concurrent_tasks: Some(1),
}
}
fn task(id: &str, max_attempts: u32) -> FleetTaskSpec {
FleetTaskSpec {
id: id.to_string(),
name: id.to_string(),
description: None,
instructions: format!("do {id}"),
expected_artifacts: vec![FleetArtifactKind::Log],
scorer: None,
retry_policy: Some(FleetRetryPolicy {
max_attempts,
..FleetRetryPolicy::default()
}),
alert_policy: None,
timeout_seconds: None,
metadata: BTreeMap::new(),
}
}
fn create_run(
scheduler: &FleetScheduler,
run_id: &str,
tasks: Vec<FleetTaskSpec>,
workers: usize,
) {
let run_id = FleetRunId::from(run_id);
scheduler
.ledger
.create_run(&FleetRun {
id: run_id.clone(),
name: "scheduler smoke".to_string(),
status: FleetRunStatus::Queued,
task_specs: tasks.clone(),
worker_specs: (1..=workers)
.map(|idx| worker(&format!("worker-{idx}")))
.collect(),
labels: BTreeMap::new(),
created_at: scheduler.timestamp(),
updated_at: None,
completed_at: None,
})
.unwrap();
for task in tasks {
scheduler
.ledger
.enqueue(FleetInboxEntry {
run_id: run_id.clone(),
task_id: task.id,
priority: 0,
enqueued_at: scheduler.timestamp(),
lease_deadline: None,
attempts: 0,
})
.unwrap();
}
}
fn ledger_text(scheduler: &FleetScheduler) -> String {
std::fs::read_to_string(scheduler.ledger.path()).unwrap()
}
#[test]
fn fleet_scheduler_backpressure_prevents_over_launch() {
let tmp = TempDir::new().unwrap();
let scheduler = scheduler(&tmp, 2);
create_run(
&scheduler,
"run-1",
vec![task("task-a", 3), task("task-b", 3), task("task-c", 3)],
3,
);
let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
assert_eq!(report.launched, 2);
let state = scheduler.ledger.rebuild_state().unwrap();
assert_eq!(
state.tasks["run-1:task-a"].status,
FleetTaskLedgerStatus::Leased
);
assert_eq!(
state.tasks["run-1:task-b"].status,
FleetTaskLedgerStatus::Leased
);
assert_eq!(
state.tasks["run-1:task-c"].status,
FleetTaskLedgerStatus::Enqueued
);
}
#[test]
fn fleet_scheduler_lost_heartbeat_restarts_within_retry_limit() {
let tmp = TempDir::new().unwrap();
let mut scheduler = scheduler(&tmp, 1);
create_run(&scheduler, "run-1", vec![task("task-a", 2)], 1);
scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
scheduler.set_now(base_now() + chrono::Duration::seconds(10));
let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
assert_eq!(report.marked_stale, 1);
assert_eq!(report.restarted, 1);
let state = scheduler.ledger.rebuild_state().unwrap();
let task = &state.tasks["run-1:task-a"];
assert_eq!(task.status, FleetTaskLedgerStatus::Leased);
assert_eq!(task.entry.attempts, 2);
let ledger = ledger_text(&scheduler);
assert!(ledger.contains("\"state\":\"stale\""));
assert!(ledger.contains("\"state\":\"restarted\""));
}
#[test]
fn fleet_scheduler_restart_exhaustion_records_terminal_failure_and_alert() {
let tmp = TempDir::new().unwrap();
let mut scheduler = scheduler(&tmp, 1);
let mut failing = task("task-a", 1);
failing.alert_policy = Some(FleetAlertPolicy {
channels: vec![FleetAlertChannel::Slack {
webhook_url: "https://hooks.slack.invalid/secret".to_string(),
}],
after_attempts: Some(1),
after_minutes_stale: Some(1),
});
create_run(&scheduler, "run-1", vec![failing], 1);
scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
scheduler.set_now(base_now() + chrono::Duration::seconds(10));
let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
assert_eq!(report.marked_stale, 1);
assert_eq!(report.restarted, 0);
assert_eq!(report.failed, 1);
assert_eq!(report.alerts, 1);
let state = scheduler.ledger.rebuild_state().unwrap();
assert_eq!(
state.tasks["run-1:task-a"].status,
FleetTaskLedgerStatus::Failed
);
let ledger = ledger_text(&scheduler);
assert!(ledger.contains("\"state\":\"failed\""));
assert!(ledger.contains("\"record\":\"alert_sent\""));
assert!(!ledger.contains("hooks.slack.invalid/secret"));
}
#[test]
fn fleet_scheduler_slow_provider_response_with_fresh_heartbeat_is_not_stale() {
let tmp = TempDir::new().unwrap();
let mut scheduler = scheduler(&tmp, 1);
create_run(&scheduler, "run-1", vec![task("task-a", 2)], 1);
scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
scheduler.set_now(base_now() + chrono::Duration::seconds(4));
scheduler
.append_worker_event(
&FleetRunId::from("run-1"),
"worker-1",
"task-a",
FleetWorkerEventPayload::ModelWait {
model: Some("deepseek-v4-pro".to_string()),
},
)
.unwrap();
scheduler
.ledger
.heartbeat("worker-1", &scheduler.timestamp(), None, None)
.unwrap();
let report = scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
assert_eq!(report.marked_stale, 0);
assert_eq!(report.restarted, 0);
let state = scheduler.ledger.rebuild_state().unwrap();
assert_eq!(state.tasks["run-1:task-a"].entry.attempts, 1);
assert_eq!(state.workers["worker-1"], FleetWorkerStatus::Busy);
}
#[test]
fn fleet_scheduler_cancel_run_interrupts_active_and_cancels_queued() {
let tmp = TempDir::new().unwrap();
let scheduler = scheduler(&tmp, 1);
create_run(
&scheduler,
"run-1",
vec![task("task-a", 3), task("task-b", 3), task("task-c", 3)],
2,
);
scheduler.tick_run(&FleetRunId::from("run-1")).unwrap();
let report = scheduler
.cancel_run(&FleetRunId::from("run-1"), "operator")
.unwrap();
assert_eq!(report.cancelled, 3);
let state = scheduler.ledger.rebuild_state().unwrap();
for task in state.tasks.values() {
assert_eq!(task.status, FleetTaskLedgerStatus::Cancelled);
}
let ledger = ledger_text(&scheduler);
assert!(ledger.contains("\"state\":\"interrupted\""));
assert!(ledger.contains("\"state\":\"cancelled\""));
}
}