merge #3157 fleet manager commands
This commit is contained in:
@@ -87,6 +87,8 @@ pub struct FleetLedgerState {
|
||||
pub latest_seq: BTreeMap<String, u64>,
|
||||
/// Latest event envelope per worker_id:run_id:task_id.
|
||||
pub latest_events: BTreeMap<String, FleetWorkerEvent>,
|
||||
/// Artifact events keyed by worker_id:run_id:task_id:path.
|
||||
pub artifact_events: BTreeMap<String, FleetWorkerEvent>,
|
||||
/// Completed receipts by run_id:task_id.
|
||||
pub receipts: BTreeMap<String, FleetReceipt>,
|
||||
}
|
||||
@@ -122,6 +124,7 @@ pub struct FleetHeartbeatState {
|
||||
}
|
||||
|
||||
/// Append-only JSONL ledger for fleet runs.
|
||||
#[derive(Debug)]
|
||||
pub struct FleetLedger {
|
||||
ledger_path: PathBuf,
|
||||
}
|
||||
@@ -218,6 +221,23 @@ impl FleetLedger {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn mark_task_terminal_status(
|
||||
&self,
|
||||
run_id: &FleetRunId,
|
||||
task_id: &str,
|
||||
worker_id: Option<&str>,
|
||||
timestamp: &str,
|
||||
status: FleetTaskLedgerStatus,
|
||||
) -> Result<()> {
|
||||
self.append_record(&FleetLedgerRecord::TaskCompletedOrFailed {
|
||||
run_id: run_id.clone(),
|
||||
task_id: task_id.to_string(),
|
||||
worker_id: worker_id.unwrap_or_default().to_string(),
|
||||
timestamp: timestamp.to_string(),
|
||||
status,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn append_event(&self, event: FleetWorkerEvent) -> Result<()> {
|
||||
self.append_record(&FleetLedgerRecord::EventAppended { event })
|
||||
}
|
||||
@@ -374,6 +394,11 @@ impl FleetLedger {
|
||||
event: event.clone(),
|
||||
})?);
|
||||
}
|
||||
for event in state.artifact_events.values() {
|
||||
lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended {
|
||||
event: event.clone(),
|
||||
})?);
|
||||
}
|
||||
for (worker_id, heartbeat) in &state.heartbeats {
|
||||
lines.push(serde_json::to_string(&FleetLedgerRecord::Heartbeat {
|
||||
worker_id: worker_id.clone(),
|
||||
@@ -418,11 +443,23 @@ fn mark_task_terminal(
|
||||
let key = task_key(&run_id.0, task_id);
|
||||
if let Some(task) = state.tasks.get_mut(&key) {
|
||||
task.status = status;
|
||||
task.leased_to = Some(worker_id.to_string());
|
||||
if !worker_id.is_empty() {
|
||||
task.leased_to = Some(worker_id.to_string());
|
||||
}
|
||||
task.completed_at = Some(timestamp.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
fn artifact_event_key(event: &FleetWorkerEvent, artifact: &FleetArtifactRef) -> String {
|
||||
format!(
|
||||
"{}:{}:{}:{}",
|
||||
event.worker_id,
|
||||
event.run_id.0,
|
||||
event.task_id,
|
||||
artifact.path.display()
|
||||
)
|
||||
}
|
||||
|
||||
fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
|
||||
match record {
|
||||
FleetLedgerRecord::RunCreated { run } => {
|
||||
@@ -479,6 +516,11 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
|
||||
state.latest_seq.insert(event_key.clone(), event.seq);
|
||||
state.latest_events.insert(event_key, event.clone());
|
||||
}
|
||||
if let FleetWorkerEventPayload::Artifact(artifact) = &event.payload {
|
||||
state
|
||||
.artifact_events
|
||||
.insert(artifact_event_key(&event, artifact), event.clone());
|
||||
}
|
||||
// Derive worker status from lifecycle events.
|
||||
match &event.payload {
|
||||
FleetWorkerEventPayload::Starting | FleetWorkerEventPayload::Running => {
|
||||
|
||||
@@ -0,0 +1,982 @@
|
||||
//! Local-first fleet manager loop and operator controls.
|
||||
//!
|
||||
//! This module is intentionally ledger-first: the first manager can run in the
|
||||
//! foreground and coordinate logical local workers while later host adapters
|
||||
//! add real process and SSH execution behind the same records.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use chrono::{DateTime, SecondsFormat, Utc};
|
||||
use codewhale_protocol::fleet::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::ledger::{FleetLedger, FleetLedgerState, FleetTaskLedgerStatus, FleetTaskState};
|
||||
|
||||
const DEFAULT_STALE_AFTER_SECONDS: u64 = 300;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FleetManager {
|
||||
workspace: PathBuf,
|
||||
ledger: FleetLedger,
|
||||
stale_after: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FleetRunReport {
|
||||
pub run_id: FleetRunId,
|
||||
pub task_count: usize,
|
||||
pub leased: usize,
|
||||
pub queued: usize,
|
||||
pub worker_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FleetTickReport {
|
||||
pub leased: usize,
|
||||
pub heartbeats: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FleetStatusSnapshot {
|
||||
pub runs: usize,
|
||||
pub queued: usize,
|
||||
pub running: usize,
|
||||
pub completed: usize,
|
||||
pub failed: usize,
|
||||
pub cancelled: usize,
|
||||
pub stale: usize,
|
||||
pub workers: BTreeMap<String, FleetWorkerStatus>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FleetWorkerInspection {
|
||||
pub worker_id: String,
|
||||
pub status: FleetWorkerStatus,
|
||||
pub current_run_id: Option<FleetRunId>,
|
||||
pub current_task_id: Option<String>,
|
||||
pub latest_heartbeat_at: Option<String>,
|
||||
pub latest_event: Option<FleetWorkerEvent>,
|
||||
pub artifacts: Vec<FleetArtifactRef>,
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct FleetTaskSpecDocument {
|
||||
#[serde(default)]
|
||||
pub name: Option<String>,
|
||||
#[serde(default)]
|
||||
pub labels: BTreeMap<String, String>,
|
||||
#[serde(default, alias = "worker_specs")]
|
||||
pub workers: Vec<FleetWorkerSpec>,
|
||||
#[serde(default)]
|
||||
pub tasks: Vec<FleetTaskSpec>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum FleetTaskSpecFile {
|
||||
Document(FleetTaskSpecDocument),
|
||||
Tasks(Vec<FleetTaskSpec>),
|
||||
Single(FleetTaskSpec),
|
||||
}
|
||||
|
||||
impl FleetTaskSpecFile {
|
||||
fn into_document(self, fallback_name: String) -> FleetTaskSpecDocument {
|
||||
match self {
|
||||
Self::Document(mut doc) => {
|
||||
if doc.name.as_deref().is_none_or(str::is_empty) {
|
||||
doc.name = Some(fallback_name);
|
||||
}
|
||||
doc
|
||||
}
|
||||
Self::Tasks(tasks) => FleetTaskSpecDocument {
|
||||
name: Some(fallback_name),
|
||||
labels: BTreeMap::new(),
|
||||
workers: Vec::new(),
|
||||
tasks,
|
||||
},
|
||||
Self::Single(task) => FleetTaskSpecDocument {
|
||||
name: Some(fallback_name),
|
||||
labels: BTreeMap::new(),
|
||||
workers: Vec::new(),
|
||||
tasks: vec![task],
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FleetManager {
|
||||
pub fn open(workspace: impl AsRef<Path>) -> Result<Self> {
|
||||
let workspace = workspace.as_ref().to_path_buf();
|
||||
let ledger = FleetLedger::open(&workspace)?;
|
||||
Ok(Self {
|
||||
workspace,
|
||||
ledger,
|
||||
stale_after: Duration::from_secs(DEFAULT_STALE_AFTER_SECONDS),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_stale_after(mut self, stale_after: Duration) -> Self {
|
||||
self.stale_after = stale_after;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn ledger_path(&self) -> &Path {
|
||||
self.ledger.path()
|
||||
}
|
||||
|
||||
pub fn load_task_spec(path: &Path) -> Result<FleetTaskSpecDocument> {
|
||||
let raw = std::fs::read_to_string(path)
|
||||
.with_context(|| format!("reading fleet task spec {}", path.display()))?;
|
||||
let fallback_name = path
|
||||
.file_stem()
|
||||
.and_then(|s| s.to_str())
|
||||
.filter(|s| !s.is_empty())
|
||||
.unwrap_or("fleet-run")
|
||||
.to_string();
|
||||
let parsed = match path.extension().and_then(|s| s.to_str()) {
|
||||
Some("toml") => toml::from_str::<FleetTaskSpecFile>(&raw)
|
||||
.with_context(|| format!("parsing TOML fleet task spec {}", path.display()))?,
|
||||
_ => serde_json::from_str::<FleetTaskSpecFile>(&raw)
|
||||
.with_context(|| format!("parsing JSON fleet task spec {}", path.display()))?,
|
||||
};
|
||||
let doc = parsed.into_document(fallback_name);
|
||||
validate_task_spec_document(&doc)?;
|
||||
Ok(doc)
|
||||
}
|
||||
|
||||
pub fn create_run_from_task_spec_path(
|
||||
&self,
|
||||
path: &Path,
|
||||
max_workers: usize,
|
||||
) -> Result<FleetRunReport> {
|
||||
let doc = Self::load_task_spec(path)?;
|
||||
self.create_run(doc, max_workers)
|
||||
}
|
||||
|
||||
pub fn create_run(
|
||||
&self,
|
||||
mut doc: FleetTaskSpecDocument,
|
||||
max_workers: usize,
|
||||
) -> Result<FleetRunReport> {
|
||||
validate_task_spec_document(&doc)?;
|
||||
let max_workers = max_workers.clamp(1, 128);
|
||||
let run_id = FleetRunId::from(format!(
|
||||
"fleet-{}",
|
||||
&Uuid::new_v4().simple().to_string()[..8]
|
||||
));
|
||||
let now = timestamp();
|
||||
if doc.workers.is_empty() {
|
||||
doc.workers = default_local_workers(&run_id, max_workers);
|
||||
}
|
||||
let run = FleetRun {
|
||||
id: run_id.clone(),
|
||||
name: doc.name.unwrap_or_else(|| run_id.0.clone()),
|
||||
status: FleetRunStatus::Queued,
|
||||
task_specs: doc.tasks.clone(),
|
||||
worker_specs: doc.workers.clone(),
|
||||
labels: doc.labels,
|
||||
created_at: now.clone(),
|
||||
updated_at: Some(now.clone()),
|
||||
completed_at: None,
|
||||
};
|
||||
self.ledger.create_run(&run)?;
|
||||
for task in &run.task_specs {
|
||||
self.ledger.enqueue(FleetInboxEntry {
|
||||
run_id: run.id.clone(),
|
||||
task_id: task.id.clone(),
|
||||
priority: task_priority(task),
|
||||
enqueued_at: now.clone(),
|
||||
lease_deadline: None,
|
||||
attempts: 0,
|
||||
})?;
|
||||
}
|
||||
let initial_status = if run.task_specs.is_empty() {
|
||||
FleetRunStatus::Completed
|
||||
} else {
|
||||
FleetRunStatus::Running
|
||||
};
|
||||
self.ledger
|
||||
.update_run_status(&run.id, initial_status, ×tamp())?;
|
||||
let tick = self.schedule_run(&run.id, max_workers)?;
|
||||
self.refresh_run_status(&run.id)?;
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let snapshot = self.status_from_state(Some(&run.id), &state);
|
||||
Ok(FleetRunReport {
|
||||
run_id: run.id,
|
||||
task_count: run.task_specs.len(),
|
||||
leased: tick.leased,
|
||||
queued: snapshot.queued,
|
||||
worker_ids: run.worker_specs.iter().map(|w| w.id.clone()).collect(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn schedule_run(&self, run_id: &FleetRunId, max_workers: usize) -> Result<FleetTickReport> {
|
||||
let max_workers = max_workers.clamp(1, 128);
|
||||
let mut report = FleetTickReport::default();
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let run = state
|
||||
.runs
|
||||
.get(&run_id.0)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow!("fleet run {} does not exist", run_id.0))?;
|
||||
let worker_ids = worker_ids_for_run(&run, max_workers);
|
||||
|
||||
for task in active_tasks_for_run(&state, run_id) {
|
||||
if let Some(worker_id) = task.leased_to.as_deref()
|
||||
&& worker_ids.iter().any(|id| id == worker_id)
|
||||
{
|
||||
self.ledger.heartbeat(worker_id, ×tamp(), None, None)?;
|
||||
report.heartbeats += 1;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let active_workers = active_workers_for_run(&state, run_id);
|
||||
if active_workers.len() >= max_workers {
|
||||
break;
|
||||
}
|
||||
let Some(worker_id) = worker_ids
|
||||
.iter()
|
||||
.find(|id| !active_workers.contains(*id))
|
||||
.cloned()
|
||||
else {
|
||||
break;
|
||||
};
|
||||
let Some((entry, task_spec)) = next_enqueued_task_for_run(&state, run_id) else {
|
||||
break;
|
||||
};
|
||||
self.start_worker_task(&worker_id, &entry, &task_spec)?;
|
||||
report.leased += 1;
|
||||
}
|
||||
|
||||
self.refresh_run_status(run_id)?;
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
pub fn status(&self) -> Result<FleetStatusSnapshot> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
Ok(self.status_from_state(None, &state))
|
||||
}
|
||||
|
||||
pub fn run_status(&self, run_id: &FleetRunId) -> Result<FleetStatusSnapshot> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
Ok(self.status_from_state(Some(run_id), &state))
|
||||
}
|
||||
|
||||
pub fn run_has_open_work(&self, run_id: &FleetRunId) -> Result<bool> {
|
||||
let status = self.run_status(run_id)?;
|
||||
Ok(status.queued + status.running + status.stale > 0)
|
||||
}
|
||||
|
||||
pub fn inspect_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let latest_event = latest_event_for_worker(&state, worker_id).cloned();
|
||||
let current = active_task_for_worker(&state, worker_id)
|
||||
.or_else(|| latest_task_for_worker(&state, worker_id));
|
||||
let artifacts = state
|
||||
.artifact_events
|
||||
.values()
|
||||
.filter(|event| event.worker_id == worker_id)
|
||||
.filter_map(|event| match &event.payload {
|
||||
FleetWorkerEventPayload::Artifact(artifact) => Some(artifact.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.chain(
|
||||
state
|
||||
.receipts
|
||||
.values()
|
||||
.filter(|receipt| receipt.worker_id == worker_id)
|
||||
.flat_map(|receipt| receipt.artifacts.clone()),
|
||||
)
|
||||
.collect();
|
||||
let last_error = latest_error_for_worker(&state, worker_id);
|
||||
let status = state
|
||||
.workers
|
||||
.get(worker_id)
|
||||
.cloned()
|
||||
.unwrap_or(FleetWorkerStatus::Unknown);
|
||||
let latest_heartbeat_at = state
|
||||
.heartbeats
|
||||
.get(worker_id)
|
||||
.map(|heartbeat| heartbeat.timestamp.clone());
|
||||
Ok(FleetWorkerInspection {
|
||||
worker_id: worker_id.to_string(),
|
||||
status,
|
||||
current_run_id: current.as_ref().map(|task| task.entry.run_id.clone()),
|
||||
current_task_id: current.map(|task| task.entry.task_id.clone()),
|
||||
latest_heartbeat_at,
|
||||
latest_event,
|
||||
artifacts,
|
||||
last_error,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn interrupt_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let Some(task) = active_task_for_worker(&state, worker_id) else {
|
||||
bail!("worker {worker_id} has no running fleet task");
|
||||
};
|
||||
self.append_worker_event(
|
||||
&task.entry.run_id,
|
||||
worker_id,
|
||||
&task.entry.task_id,
|
||||
FleetWorkerEventPayload::Interrupted {
|
||||
signal: Some("operator".to_string()),
|
||||
},
|
||||
)?;
|
||||
self.append_worker_event(
|
||||
&task.entry.run_id,
|
||||
worker_id,
|
||||
&task.entry.task_id,
|
||||
FleetWorkerEventPayload::Cancelled {
|
||||
cancelled_by: Some("operator".to_string()),
|
||||
},
|
||||
)?;
|
||||
self.refresh_run_status(&task.entry.run_id)?;
|
||||
self.inspect_worker(worker_id)
|
||||
}
|
||||
|
||||
pub fn restart_worker(&self, worker_id: &str) -> Result<FleetWorkerInspection> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let Some(task) = active_task_for_worker(&state, worker_id)
|
||||
.or_else(|| latest_task_for_worker(&state, worker_id))
|
||||
else {
|
||||
bail!("worker {worker_id} has no fleet task to restart");
|
||||
};
|
||||
let now = timestamp();
|
||||
self.ledger.lease_task(
|
||||
&task.entry.run_id,
|
||||
&task.entry.task_id,
|
||||
worker_id,
|
||||
&now,
|
||||
None,
|
||||
)?;
|
||||
self.append_worker_event(
|
||||
&task.entry.run_id,
|
||||
worker_id,
|
||||
&task.entry.task_id,
|
||||
FleetWorkerEventPayload::Restarted { restart_count: 1 },
|
||||
)?;
|
||||
self.append_worker_event(
|
||||
&task.entry.run_id,
|
||||
worker_id,
|
||||
&task.entry.task_id,
|
||||
FleetWorkerEventPayload::Running,
|
||||
)?;
|
||||
self.ledger.heartbeat(worker_id, ×tamp(), None, None)?;
|
||||
self.ledger
|
||||
.update_run_status(&task.entry.run_id, FleetRunStatus::Running, ×tamp())?;
|
||||
self.inspect_worker(worker_id)
|
||||
}
|
||||
|
||||
pub fn stop_all(&self) -> Result<usize> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let now = timestamp();
|
||||
let mut affected_runs = BTreeSet::new();
|
||||
let mut stopped = 0usize;
|
||||
for task in state.tasks.values() {
|
||||
if !matches!(
|
||||
task.status,
|
||||
FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if let Some(worker_id) = task.leased_to.as_deref() {
|
||||
self.append_worker_event(
|
||||
&task.entry.run_id,
|
||||
worker_id,
|
||||
&task.entry.task_id,
|
||||
FleetWorkerEventPayload::Interrupted {
|
||||
signal: Some("stop_all".to_string()),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
self.ledger.mark_task_terminal_status(
|
||||
&task.entry.run_id,
|
||||
&task.entry.task_id,
|
||||
task.leased_to.as_deref(),
|
||||
&now,
|
||||
FleetTaskLedgerStatus::Cancelled,
|
||||
)?;
|
||||
affected_runs.insert(task.entry.run_id.0.clone());
|
||||
stopped += 1;
|
||||
}
|
||||
for run_id in affected_runs {
|
||||
self.ledger.update_run_status(
|
||||
&FleetRunId::from(run_id),
|
||||
FleetRunStatus::Cancelled,
|
||||
×tamp(),
|
||||
)?;
|
||||
}
|
||||
Ok(stopped)
|
||||
}
|
||||
|
||||
fn start_worker_task(
|
||||
&self,
|
||||
worker_id: &str,
|
||||
entry: &FleetInboxEntry,
|
||||
task_spec: &FleetTaskSpec,
|
||||
) -> Result<()> {
|
||||
let now = timestamp();
|
||||
self.ledger
|
||||
.lease_task(&entry.run_id, &entry.task_id, worker_id, &now, None)?;
|
||||
self.append_worker_event(
|
||||
&entry.run_id,
|
||||
worker_id,
|
||||
&entry.task_id,
|
||||
FleetWorkerEventPayload::Leased {
|
||||
lease_expires_at: None,
|
||||
},
|
||||
)?;
|
||||
self.append_worker_event(
|
||||
&entry.run_id,
|
||||
worker_id,
|
||||
&entry.task_id,
|
||||
FleetWorkerEventPayload::Starting,
|
||||
)?;
|
||||
let log_artifact = self.write_log_artifact(&entry.run_id, worker_id, task_spec)?;
|
||||
self.append_worker_event(
|
||||
&entry.run_id,
|
||||
worker_id,
|
||||
&entry.task_id,
|
||||
FleetWorkerEventPayload::Artifact(log_artifact.clone()),
|
||||
)?;
|
||||
self.append_worker_event(
|
||||
&entry.run_id,
|
||||
worker_id,
|
||||
&entry.task_id,
|
||||
FleetWorkerEventPayload::Running,
|
||||
)?;
|
||||
self.ledger.heartbeat(worker_id, ×tamp(), None, None)?;
|
||||
self.maybe_complete_local_simulation(entry, worker_id, task_spec, log_artifact)
|
||||
}
|
||||
|
||||
fn maybe_complete_local_simulation(
|
||||
&self,
|
||||
entry: &FleetInboxEntry,
|
||||
worker_id: &str,
|
||||
task_spec: &FleetTaskSpec,
|
||||
log_artifact: FleetArtifactRef,
|
||||
) -> Result<()> {
|
||||
let Some(result) = local_simulation_result(task_spec) else {
|
||||
return Ok(());
|
||||
};
|
||||
let now = timestamp();
|
||||
let (payload, receipt_result) = match result {
|
||||
FleetLocalSimulationResult::Pass => (
|
||||
FleetWorkerEventPayload::Completed {
|
||||
exit_code: Some(0),
|
||||
summary: Some("local fleet smoke task completed".to_string()),
|
||||
},
|
||||
FleetTaskResult::Pass,
|
||||
),
|
||||
FleetLocalSimulationResult::Fail => (
|
||||
FleetWorkerEventPayload::Failed {
|
||||
reason: "local fleet smoke task failed".to_string(),
|
||||
recoverable: false,
|
||||
},
|
||||
FleetTaskResult::Fail,
|
||||
),
|
||||
FleetLocalSimulationResult::Skip => (
|
||||
FleetWorkerEventPayload::Completed {
|
||||
exit_code: Some(0),
|
||||
summary: Some("local fleet smoke task skipped".to_string()),
|
||||
},
|
||||
FleetTaskResult::Skip,
|
||||
),
|
||||
FleetLocalSimulationResult::Timeout => (
|
||||
FleetWorkerEventPayload::Failed {
|
||||
reason: "local fleet smoke task timed out".to_string(),
|
||||
recoverable: true,
|
||||
},
|
||||
FleetTaskResult::Timeout,
|
||||
),
|
||||
};
|
||||
self.append_worker_event(&entry.run_id, worker_id, &entry.task_id, payload)?;
|
||||
self.ledger.record_receipt(FleetReceipt {
|
||||
run_id: entry.run_id.clone(),
|
||||
task_id: entry.task_id.clone(),
|
||||
worker_id: worker_id.to_string(),
|
||||
completed_at: now,
|
||||
result: receipt_result,
|
||||
artifacts: vec![log_artifact],
|
||||
score: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn append_worker_event(
|
||||
&self,
|
||||
run_id: &FleetRunId,
|
||||
worker_id: &str,
|
||||
task_id: &str,
|
||||
payload: FleetWorkerEventPayload,
|
||||
) -> Result<FleetWorkerEvent> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let key = event_key(worker_id, &run_id.0, task_id);
|
||||
let seq = state.latest_seq.get(&key).copied().unwrap_or(0) + 1;
|
||||
let event = FleetWorkerEvent {
|
||||
seq,
|
||||
run_id: run_id.clone(),
|
||||
worker_id: worker_id.to_string(),
|
||||
task_id: task_id.to_string(),
|
||||
timestamp: timestamp(),
|
||||
payload,
|
||||
extra: BTreeMap::new(),
|
||||
};
|
||||
self.ledger.append_event(event.clone())?;
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
fn write_log_artifact(
|
||||
&self,
|
||||
run_id: &FleetRunId,
|
||||
worker_id: &str,
|
||||
task_spec: &FleetTaskSpec,
|
||||
) -> Result<FleetArtifactRef> {
|
||||
let rel_path = PathBuf::from(".codewhale")
|
||||
.join("fleet")
|
||||
.join(safe_path_segment(&run_id.0))
|
||||
.join(safe_path_segment(&task_spec.id))
|
||||
.join(format!("{}.log", safe_path_segment(worker_id)));
|
||||
let abs_path = self.workspace.join(&rel_path);
|
||||
if let Some(parent) = abs_path.parent() {
|
||||
std::fs::create_dir_all(parent)
|
||||
.with_context(|| format!("creating fleet artifact dir {}", parent.display()))?;
|
||||
}
|
||||
let contents = format!(
|
||||
"run_id={}\ntask_id={}\ntask_name={}\nworker_id={}\nstatus=started\n",
|
||||
run_id.0, task_spec.id, task_spec.name, worker_id
|
||||
);
|
||||
std::fs::write(&abs_path, contents)
|
||||
.with_context(|| format!("writing fleet worker log {}", abs_path.display()))?;
|
||||
let size_bytes = std::fs::metadata(&abs_path).ok().map(|m| m.len());
|
||||
Ok(FleetArtifactRef {
|
||||
kind: FleetArtifactKind::Log,
|
||||
path: rel_path,
|
||||
checksum: None,
|
||||
mime_type: Some("text/plain".to_string()),
|
||||
size_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
fn refresh_run_status(&self, run_id: &FleetRunId) -> Result<()> {
|
||||
let state = self.ledger.rebuild_state()?;
|
||||
let mut has_queued = false;
|
||||
let mut has_running = false;
|
||||
let mut has_failed = false;
|
||||
let mut has_cancelled = false;
|
||||
let mut has_tasks = false;
|
||||
for task in state
|
||||
.tasks
|
||||
.values()
|
||||
.filter(|task| task.entry.run_id == *run_id)
|
||||
{
|
||||
has_tasks = true;
|
||||
match task.status {
|
||||
FleetTaskLedgerStatus::Enqueued => has_queued = true,
|
||||
FleetTaskLedgerStatus::Leased => has_running = true,
|
||||
FleetTaskLedgerStatus::Failed => has_failed = true,
|
||||
FleetTaskLedgerStatus::Cancelled => has_cancelled = true,
|
||||
FleetTaskLedgerStatus::Completed => {}
|
||||
}
|
||||
}
|
||||
let status = if !has_tasks {
|
||||
FleetRunStatus::Completed
|
||||
} else if has_queued || has_running {
|
||||
FleetRunStatus::Running
|
||||
} else if has_failed {
|
||||
FleetRunStatus::Failed
|
||||
} else if has_cancelled {
|
||||
FleetRunStatus::Cancelled
|
||||
} else {
|
||||
FleetRunStatus::Completed
|
||||
};
|
||||
self.ledger
|
||||
.update_run_status(run_id, status, ×tamp())
|
||||
.context("updating fleet run status")
|
||||
}
|
||||
|
||||
fn status_from_state(
|
||||
&self,
|
||||
run_filter: Option<&FleetRunId>,
|
||||
state: &FleetLedgerState,
|
||||
) -> FleetStatusSnapshot {
|
||||
let mut snapshot = FleetStatusSnapshot {
|
||||
runs: state.runs.len(),
|
||||
workers: state.workers.clone(),
|
||||
..FleetStatusSnapshot::default()
|
||||
};
|
||||
for task in state.tasks.values() {
|
||||
if run_filter.is_some_and(|run_id| task.entry.run_id != *run_id) {
|
||||
continue;
|
||||
}
|
||||
match task.status {
|
||||
FleetTaskLedgerStatus::Enqueued => snapshot.queued += 1,
|
||||
FleetTaskLedgerStatus::Leased => {
|
||||
if self.task_is_stale(task, state) {
|
||||
snapshot.stale += 1;
|
||||
} else {
|
||||
snapshot.running += 1;
|
||||
}
|
||||
}
|
||||
FleetTaskLedgerStatus::Completed => snapshot.completed += 1,
|
||||
FleetTaskLedgerStatus::Failed => snapshot.failed += 1,
|
||||
FleetTaskLedgerStatus::Cancelled => snapshot.cancelled += 1,
|
||||
}
|
||||
}
|
||||
snapshot
|
||||
}
|
||||
|
||||
fn task_is_stale(&self, task: &FleetTaskState, state: &FleetLedgerState) -> bool {
|
||||
let Some(worker_id) = task.leased_to.as_deref() else {
|
||||
return true;
|
||||
};
|
||||
let Some(heartbeat) = state.heartbeats.get(worker_id) else {
|
||||
return true;
|
||||
};
|
||||
let Ok(last) = DateTime::parse_from_rfc3339(&heartbeat.timestamp) else {
|
||||
return true;
|
||||
};
|
||||
let age = Utc::now().signed_duration_since(last.with_timezone(&Utc));
|
||||
age.to_std()
|
||||
.is_ok_and(|duration| duration > self.stale_after)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum FleetLocalSimulationResult {
|
||||
Pass,
|
||||
Fail,
|
||||
Skip,
|
||||
Timeout,
|
||||
}
|
||||
|
||||
fn validate_task_spec_document(doc: &FleetTaskSpecDocument) -> Result<()> {
|
||||
if doc.tasks.is_empty() {
|
||||
bail!("fleet task spec must include at least one task");
|
||||
}
|
||||
let mut ids = BTreeSet::new();
|
||||
for task in &doc.tasks {
|
||||
if task.id.trim().is_empty() {
|
||||
bail!("fleet task id cannot be empty");
|
||||
}
|
||||
if !ids.insert(task.id.clone()) {
|
||||
bail!("duplicate fleet task id {}", task.id);
|
||||
}
|
||||
if task.name.trim().is_empty() {
|
||||
bail!("fleet task {} name cannot be empty", task.id);
|
||||
}
|
||||
if task.instructions.trim().is_empty() {
|
||||
bail!("fleet task {} instructions cannot be empty", task.id);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn default_local_workers(run_id: &FleetRunId, max_workers: usize) -> Vec<FleetWorkerSpec> {
|
||||
(1..=max_workers)
|
||||
.map(|index| FleetWorkerSpec {
|
||||
id: format!("{}-local-{}", run_id.0, index),
|
||||
name: format!("Local worker {index}"),
|
||||
host: FleetHostSpec::Local,
|
||||
labels: BTreeMap::new(),
|
||||
capabilities: vec!["local".to_string()],
|
||||
max_concurrent_tasks: Some(1),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn worker_ids_for_run(run: &FleetRun, max_workers: usize) -> Vec<String> {
|
||||
run.worker_specs
|
||||
.iter()
|
||||
.take(max_workers)
|
||||
.map(|worker| worker.id.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn active_workers_for_run(state: &FleetLedgerState, run_id: &FleetRunId) -> BTreeSet<String> {
|
||||
active_tasks_for_run(state, run_id)
|
||||
.filter_map(|task| task.leased_to.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn active_tasks_for_run<'a>(
|
||||
state: &'a FleetLedgerState,
|
||||
run_id: &'a FleetRunId,
|
||||
) -> impl Iterator<Item = &'a FleetTaskState> {
|
||||
state.tasks.values().filter(move |task| {
|
||||
task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Leased)
|
||||
})
|
||||
}
|
||||
|
||||
fn active_task_for_worker<'a>(
|
||||
state: &'a FleetLedgerState,
|
||||
worker_id: &str,
|
||||
) -> Option<&'a FleetTaskState> {
|
||||
state.tasks.values().find(|task| {
|
||||
task.leased_to.as_deref() == Some(worker_id)
|
||||
&& matches!(task.status, FleetTaskLedgerStatus::Leased)
|
||||
})
|
||||
}
|
||||
|
||||
fn latest_task_for_worker<'a>(
|
||||
state: &'a FleetLedgerState,
|
||||
worker_id: &str,
|
||||
) -> Option<&'a FleetTaskState> {
|
||||
state
|
||||
.tasks
|
||||
.values()
|
||||
.filter(|task| task.leased_to.as_deref() == Some(worker_id))
|
||||
.max_by_key(|task| task.completed_at.as_deref().or(task.leased_at.as_deref()))
|
||||
}
|
||||
|
||||
fn next_enqueued_task_for_run(
|
||||
state: &FleetLedgerState,
|
||||
run_id: &FleetRunId,
|
||||
) -> Option<(FleetInboxEntry, FleetTaskSpec)> {
|
||||
let run = state.runs.get(&run_id.0)?;
|
||||
let task = state
|
||||
.tasks
|
||||
.values()
|
||||
.filter(|task| {
|
||||
task.entry.run_id == *run_id && matches!(task.status, FleetTaskLedgerStatus::Enqueued)
|
||||
})
|
||||
.min_by_key(|task| {
|
||||
(
|
||||
task.entry.priority,
|
||||
task.entry.enqueued_at.clone(),
|
||||
task.entry.task_id.clone(),
|
||||
)
|
||||
})?;
|
||||
let task_spec = run
|
||||
.task_specs
|
||||
.iter()
|
||||
.find(|spec| spec.id == task.entry.task_id)
|
||||
.cloned()?;
|
||||
Some((task.entry.clone(), task_spec))
|
||||
}
|
||||
|
||||
fn latest_event_for_worker<'a>(
|
||||
state: &'a FleetLedgerState,
|
||||
worker_id: &str,
|
||||
) -> Option<&'a FleetWorkerEvent> {
|
||||
state
|
||||
.latest_events
|
||||
.values()
|
||||
.filter(|event| event.worker_id == worker_id)
|
||||
.max_by_key(|event| event.seq)
|
||||
}
|
||||
|
||||
fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option<String> {
|
||||
state
|
||||
.latest_events
|
||||
.values()
|
||||
.filter(|event| event.worker_id == worker_id)
|
||||
.filter_map(|event| match &event.payload {
|
||||
FleetWorkerEventPayload::Failed { reason, .. } => {
|
||||
Some((event.seq, format!("failed: {reason}")))
|
||||
}
|
||||
FleetWorkerEventPayload::Cancelled { cancelled_by } => Some((
|
||||
event.seq,
|
||||
cancelled_by
|
||||
.as_ref()
|
||||
.map(|by| format!("cancelled by {by}"))
|
||||
.unwrap_or_else(|| "cancelled".to_string()),
|
||||
)),
|
||||
FleetWorkerEventPayload::Interrupted { signal } => Some((
|
||||
event.seq,
|
||||
signal
|
||||
.as_ref()
|
||||
.map(|signal| format!("interrupted by {signal}"))
|
||||
.unwrap_or_else(|| "interrupted".to_string()),
|
||||
)),
|
||||
FleetWorkerEventPayload::Stale { last_heartbeat_at } => Some((
|
||||
event.seq,
|
||||
last_heartbeat_at
|
||||
.as_ref()
|
||||
.map(|ts| format!("stale since {ts}"))
|
||||
.unwrap_or_else(|| "stale".to_string()),
|
||||
)),
|
||||
_ => None,
|
||||
})
|
||||
.max_by_key(|(seq, _)| *seq)
|
||||
.map(|(_, message)| message)
|
||||
}
|
||||
|
||||
fn local_simulation_result(task: &FleetTaskSpec) -> Option<FleetLocalSimulationResult> {
|
||||
if task
|
||||
.metadata
|
||||
.get("local_complete")
|
||||
.and_then(Value::as_bool)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Some(FleetLocalSimulationResult::Pass);
|
||||
}
|
||||
match task
|
||||
.metadata
|
||||
.get("local_result")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_ascii_lowercase)
|
||||
.as_deref()
|
||||
{
|
||||
Some("pass" | "passed" | "ok" | "completed") => Some(FleetLocalSimulationResult::Pass),
|
||||
Some("fail" | "failed" | "error") => Some(FleetLocalSimulationResult::Fail),
|
||||
Some("skip" | "skipped") => Some(FleetLocalSimulationResult::Skip),
|
||||
Some("timeout" | "timed_out") => Some(FleetLocalSimulationResult::Timeout),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn task_priority(task: &FleetTaskSpec) -> i32 {
|
||||
task.metadata
|
||||
.get("priority")
|
||||
.and_then(Value::as_i64)
|
||||
.and_then(|value| i32::try_from(value).ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
|
||||
format!("{worker_id}:{run_id}:{task_id}")
|
||||
}
|
||||
|
||||
fn timestamp() -> String {
|
||||
Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)
|
||||
}
|
||||
|
||||
fn safe_path_segment(value: &str) -> String {
|
||||
value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn task(id: &str) -> FleetTaskSpec {
|
||||
FleetTaskSpec {
|
||||
id: id.to_string(),
|
||||
name: id.to_string(),
|
||||
description: None,
|
||||
instructions: format!("do {id}"),
|
||||
expected_artifacts: vec![FleetArtifactKind::Log],
|
||||
scorer: None,
|
||||
retry_policy: None,
|
||||
alert_policy: None,
|
||||
timeout_seconds: None,
|
||||
metadata: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn task_spec_file(dir: &TempDir, tasks: Vec<FleetTaskSpec>) -> PathBuf {
|
||||
let path = dir.path().join("fleet-tasks.json");
|
||||
let doc = json!({
|
||||
"name": "manager smoke",
|
||||
"tasks": tasks,
|
||||
});
|
||||
std::fs::write(&path, serde_json::to_string_pretty(&doc).unwrap()).unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fleet_manager_creates_run_and_starts_workers_up_to_cap() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let manager = FleetManager::open(tmp.path()).unwrap();
|
||||
let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b"), task("task-c")]);
|
||||
|
||||
let report = manager.create_run_from_task_spec_path(&path, 2).unwrap();
|
||||
|
||||
assert_eq!(report.task_count, 3);
|
||||
assert_eq!(report.leased, 2);
|
||||
assert_eq!(report.queued, 1);
|
||||
assert_eq!(report.worker_ids.len(), 2);
|
||||
let status = manager.run_status(&report.run_id).unwrap();
|
||||
assert_eq!(status.queued, 1);
|
||||
assert_eq!(status.running, 2);
|
||||
assert_eq!(status.completed, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fleet_manager_inspect_exposes_heartbeat_artifacts_and_errors() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let manager = FleetManager::open(tmp.path()).unwrap();
|
||||
let path = task_spec_file(&tmp, vec![task("task-a")]);
|
||||
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
|
||||
let worker_id = &report.worker_ids[0];
|
||||
|
||||
let inspection = manager.inspect_worker(worker_id).unwrap();
|
||||
assert_eq!(inspection.status, FleetWorkerStatus::Busy);
|
||||
assert_eq!(inspection.current_task_id.as_deref(), Some("task-a"));
|
||||
assert!(inspection.latest_heartbeat_at.is_some());
|
||||
assert_eq!(inspection.artifacts.len(), 1);
|
||||
assert!(inspection.last_error.is_none());
|
||||
|
||||
let inspection = manager.interrupt_worker(worker_id).unwrap();
|
||||
assert_eq!(inspection.status, FleetWorkerStatus::Online);
|
||||
assert_eq!(
|
||||
inspection.last_error.as_deref(),
|
||||
Some("cancelled by operator")
|
||||
);
|
||||
let status = manager.run_status(&report.run_id).unwrap();
|
||||
assert_eq!(status.cancelled, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fleet_manager_restart_and_stop_all_are_ledgered() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let manager = FleetManager::open(tmp.path()).unwrap();
|
||||
let path = task_spec_file(&tmp, vec![task("task-a"), task("task-b")]);
|
||||
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
|
||||
let worker_id = &report.worker_ids[0];
|
||||
|
||||
manager.interrupt_worker(worker_id).unwrap();
|
||||
let inspection = manager.restart_worker(worker_id).unwrap();
|
||||
assert_eq!(inspection.status, FleetWorkerStatus::Busy);
|
||||
let status = manager.run_status(&report.run_id).unwrap();
|
||||
assert_eq!(status.running, 1);
|
||||
assert_eq!(status.queued, 1);
|
||||
|
||||
let stopped = manager.stop_all().unwrap();
|
||||
assert_eq!(stopped, 2);
|
||||
let status = manager.run_status(&report.run_id).unwrap();
|
||||
assert_eq!(status.cancelled, 2);
|
||||
assert_eq!(status.running, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fleet_manager_can_record_completed_local_smoke_tasks() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let manager = FleetManager::open(tmp.path()).unwrap();
|
||||
let mut completed = task("task-a");
|
||||
completed
|
||||
.metadata
|
||||
.insert("local_result".to_string(), json!("pass"));
|
||||
let path = task_spec_file(&tmp, vec![completed]);
|
||||
|
||||
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
|
||||
|
||||
let status = manager.run_status(&report.run_id).unwrap();
|
||||
assert_eq!(status.completed, 1);
|
||||
assert_eq!(status.running, 0);
|
||||
let state = manager.ledger.rebuild_state().unwrap();
|
||||
assert_eq!(state.receipts.len(), 1);
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
//! Agent Fleet control plane — local-first manager, ledger, and workers.
|
||||
|
||||
pub mod ledger;
|
||||
pub mod manager;
|
||||
|
||||
@@ -243,6 +243,8 @@ enum Commands {
|
||||
Exec(ExecArgs),
|
||||
/// Generate SWE-bench prediction rows from CodeWhale runs
|
||||
Swebench(SwebenchArgs),
|
||||
/// Manage local Agent Fleet runs and workers
|
||||
Fleet(FleetArgs),
|
||||
/// Run a code review over a git diff
|
||||
Review(ReviewArgs),
|
||||
/// Open the TUI pre-seeded with a GitHub PR's title, body, and diff (#451)
|
||||
@@ -373,6 +375,59 @@ enum SwebenchCommand {
|
||||
Export(SwebenchExportArgs),
|
||||
}
|
||||
|
||||
#[derive(Args, Debug, Clone)]
|
||||
struct FleetArgs {
|
||||
#[command(subcommand)]
|
||||
command: FleetCommand,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug, Clone)]
|
||||
enum FleetCommand {
|
||||
/// Initialize the local fleet ledger for this workspace
|
||||
Init,
|
||||
/// Create a run from a task spec and start the foreground manager loop
|
||||
Run(FleetRunArgs),
|
||||
/// Show queued/running/completed/failed/stale fleet counts
|
||||
Status,
|
||||
/// Inspect one worker's status, heartbeat, latest event, and artifacts
|
||||
Inspect {
|
||||
/// Worker id printed by `codewhale fleet run`
|
||||
worker_id: String,
|
||||
},
|
||||
/// Interrupt a running worker task and record a terminal cancellation
|
||||
Interrupt {
|
||||
/// Worker id printed by `codewhale fleet run`
|
||||
worker_id: String,
|
||||
},
|
||||
/// Restart the latest task for a worker
|
||||
Restart {
|
||||
/// Worker id printed by `codewhale fleet run`
|
||||
worker_id: String,
|
||||
},
|
||||
/// Stop all queued and running fleet work
|
||||
Stop {
|
||||
/// Confirm stopping all queued and running fleet tasks
|
||||
#[arg(long, required = true)]
|
||||
all: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Args, Debug, Clone)]
|
||||
struct FleetRunArgs {
|
||||
/// JSON or TOML task spec to enqueue
|
||||
#[arg(value_name = "TASK_SPEC")]
|
||||
task_spec: PathBuf,
|
||||
/// Maximum local workers to lease concurrently
|
||||
#[arg(long, default_value_t = 4)]
|
||||
max_workers: usize,
|
||||
/// Seconds without heartbeat before a running task is counted stale
|
||||
#[arg(long, default_value_t = 300)]
|
||||
stale_after_seconds: u64,
|
||||
/// Schedule once and return instead of staying in the manager loop
|
||||
#[arg(long, hide = true, default_value_t = false)]
|
||||
once: bool,
|
||||
}
|
||||
|
||||
#[derive(Args, Debug, Clone)]
|
||||
struct SwebenchRunArgs {
|
||||
/// SWE-bench instance id, e.g. django__django-12345
|
||||
@@ -1068,6 +1123,10 @@ async fn main() -> Result<()> {
|
||||
);
|
||||
run_swebench_command(&config, &model, workspace, max_subagents, args).await
|
||||
}
|
||||
Commands::Fleet(args) => {
|
||||
let workspace = resolve_workspace(&cli);
|
||||
run_fleet_command(&workspace, args).await
|
||||
}
|
||||
Commands::Review(args) => {
|
||||
let config = load_config_from_cli(&cli)?;
|
||||
run_review(&config, args).await
|
||||
@@ -1330,6 +1389,207 @@ async fn run_swebench_command(
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
|
||||
use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
|
||||
use codewhale_protocol::fleet::{
|
||||
FleetArtifactKind, FleetWorkerEventPayload, FleetWorkerStatus,
|
||||
};
|
||||
|
||||
fn worker_status_label(status: &FleetWorkerStatus) -> &'static str {
|
||||
match status {
|
||||
FleetWorkerStatus::Unknown => "unknown",
|
||||
FleetWorkerStatus::Online => "online",
|
||||
FleetWorkerStatus::Busy => "busy",
|
||||
FleetWorkerStatus::Offline => "offline",
|
||||
FleetWorkerStatus::Unhealthy => "unhealthy",
|
||||
FleetWorkerStatus::Draining => "draining",
|
||||
FleetWorkerStatus::Retired => "retired",
|
||||
}
|
||||
}
|
||||
|
||||
fn artifact_kind_label(kind: &FleetArtifactKind) -> String {
|
||||
match kind {
|
||||
FleetArtifactKind::Log => "log".to_string(),
|
||||
FleetArtifactKind::Patch => "patch".to_string(),
|
||||
FleetArtifactKind::TestResult => "test_result".to_string(),
|
||||
FleetArtifactKind::Report => "report".to_string(),
|
||||
FleetArtifactKind::Checkpoint => "checkpoint".to_string(),
|
||||
FleetArtifactKind::Receipt => "receipt".to_string(),
|
||||
FleetArtifactKind::Other(value) => value.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn event_label(payload: &FleetWorkerEventPayload) -> String {
|
||||
match payload {
|
||||
FleetWorkerEventPayload::Queued => "queued".to_string(),
|
||||
FleetWorkerEventPayload::Leased { .. } => "leased".to_string(),
|
||||
FleetWorkerEventPayload::Starting => "starting".to_string(),
|
||||
FleetWorkerEventPayload::Running => "running".to_string(),
|
||||
FleetWorkerEventPayload::ModelWait { model } => model
|
||||
.as_ref()
|
||||
.map(|model| format!("model_wait model={model}"))
|
||||
.unwrap_or_else(|| "model_wait".to_string()),
|
||||
FleetWorkerEventPayload::RunningTool { tool, call_id } => call_id
|
||||
.as_ref()
|
||||
.map(|call_id| format!("running_tool tool={tool} call_id={call_id}"))
|
||||
.unwrap_or_else(|| format!("running_tool tool={tool}")),
|
||||
FleetWorkerEventPayload::Heartbeat { .. } => "heartbeat".to_string(),
|
||||
FleetWorkerEventPayload::Artifact(artifact) => {
|
||||
format!("artifact kind={}", artifact_kind_label(&artifact.kind))
|
||||
}
|
||||
FleetWorkerEventPayload::Completed { exit_code, summary } => match (exit_code, summary)
|
||||
{
|
||||
(Some(code), Some(summary)) => format!("completed exit_code={code} {summary}"),
|
||||
(Some(code), None) => format!("completed exit_code={code}"),
|
||||
(None, Some(summary)) => format!("completed {summary}"),
|
||||
(None, None) => "completed".to_string(),
|
||||
},
|
||||
FleetWorkerEventPayload::Failed {
|
||||
reason,
|
||||
recoverable,
|
||||
} => {
|
||||
format!("failed recoverable={recoverable} reason={reason}")
|
||||
}
|
||||
FleetWorkerEventPayload::Cancelled { cancelled_by } => cancelled_by
|
||||
.as_ref()
|
||||
.map(|by| format!("cancelled by={by}"))
|
||||
.unwrap_or_else(|| "cancelled".to_string()),
|
||||
FleetWorkerEventPayload::Interrupted { signal } => signal
|
||||
.as_ref()
|
||||
.map(|signal| format!("interrupted signal={signal}"))
|
||||
.unwrap_or_else(|| "interrupted".to_string()),
|
||||
FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at
|
||||
.as_ref()
|
||||
.map(|ts| format!("stale last_heartbeat_at={ts}"))
|
||||
.unwrap_or_else(|| "stale".to_string()),
|
||||
FleetWorkerEventPayload::Restarted { restart_count } => {
|
||||
format!("restarted count={restart_count}")
|
||||
}
|
||||
FleetWorkerEventPayload::Escalated { channel, alert_id } => alert_id
|
||||
.as_ref()
|
||||
.map(|alert_id| format!("escalated channel={channel} alert_id={alert_id}"))
|
||||
.unwrap_or_else(|| format!("escalated channel={channel}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn print_status(status: &FleetStatusSnapshot) {
|
||||
println!(
|
||||
"fleet: runs={} queued={} running={} completed={} failed={} cancelled={} stale={}",
|
||||
status.runs,
|
||||
status.queued,
|
||||
status.running,
|
||||
status.completed,
|
||||
status.failed,
|
||||
status.cancelled,
|
||||
status.stale
|
||||
);
|
||||
if !status.workers.is_empty() {
|
||||
println!("workers:");
|
||||
for (worker_id, worker_status) in &status.workers {
|
||||
println!(" {worker_id} {}", worker_status_label(worker_status));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn print_inspection(inspection: &FleetWorkerInspection) {
|
||||
println!("worker: {}", inspection.worker_id);
|
||||
println!("status: {}", worker_status_label(&inspection.status));
|
||||
if let Some(run_id) = &inspection.current_run_id {
|
||||
println!("run: {}", run_id.0);
|
||||
}
|
||||
if let Some(task_id) = &inspection.current_task_id {
|
||||
println!("task: {task_id}");
|
||||
}
|
||||
if let Some(heartbeat) = &inspection.latest_heartbeat_at {
|
||||
println!("heartbeat: {heartbeat}");
|
||||
}
|
||||
if let Some(event) = &inspection.latest_event {
|
||||
println!(
|
||||
"latest_event: seq={} {}",
|
||||
event.seq,
|
||||
event_label(&event.payload)
|
||||
);
|
||||
}
|
||||
if !inspection.artifacts.is_empty() {
|
||||
println!("artifacts:");
|
||||
for artifact in &inspection.artifacts {
|
||||
println!(
|
||||
" {} {}",
|
||||
artifact_kind_label(&artifact.kind),
|
||||
artifact.path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
if let Some(error) = &inspection.last_error {
|
||||
println!("last_error: {error}");
|
||||
}
|
||||
}
|
||||
|
||||
let manager = FleetManager::open(workspace)?;
|
||||
match args.command {
|
||||
FleetCommand::Init => {
|
||||
println!("fleet ledger: {}", manager.ledger_path().display());
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Run(args) => {
|
||||
let max_workers = args.max_workers.clamp(1, 128);
|
||||
let manager =
|
||||
manager.with_stale_after(Duration::from_secs(args.stale_after_seconds.max(1)));
|
||||
let report = manager.create_run_from_task_spec_path(&args.task_spec, max_workers)?;
|
||||
println!(
|
||||
"fleet run: {} tasks={} leased={} queued={}",
|
||||
report.run_id.0, report.task_count, report.leased, report.queued
|
||||
);
|
||||
println!("workers:");
|
||||
for worker_id in &report.worker_ids {
|
||||
println!(" {worker_id}");
|
||||
}
|
||||
if args.once {
|
||||
print_status(&manager.run_status(&report.run_id)?);
|
||||
return Ok(());
|
||||
}
|
||||
println!(
|
||||
"manager loop running; use `codewhale fleet status`, `inspect`, `interrupt`, or `stop --all` from another terminal."
|
||||
);
|
||||
loop {
|
||||
manager.schedule_run(&report.run_id, max_workers)?;
|
||||
if !manager.run_has_open_work(&report.run_id)? {
|
||||
print_status(&manager.run_status(&report.run_id)?);
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Status => {
|
||||
print_status(&manager.status()?);
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Inspect { worker_id } => {
|
||||
print_inspection(&manager.inspect_worker(&worker_id)?);
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Interrupt { worker_id } => {
|
||||
let inspection = manager.interrupt_worker(&worker_id)?;
|
||||
print_inspection(&inspection);
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Restart { worker_id } => {
|
||||
let inspection = manager.restart_worker(&worker_id)?;
|
||||
print_inspection(&inspection);
|
||||
Ok(())
|
||||
}
|
||||
FleetCommand::Stop { all } => {
|
||||
if !all {
|
||||
bail!("pass --all to stop all fleet work");
|
||||
}
|
||||
let stopped = manager.stop_all()?;
|
||||
println!("stopped: {stopped}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn swebench_prompt(
|
||||
instance_id: &str,
|
||||
workspace: &Path,
|
||||
|
||||
Reference in New Issue
Block a user