merge #3162 fleet status surfaces

This commit is contained in:
Hunter B
2026-06-12 19:35:31 -07:00
5 changed files with 863 additions and 13 deletions
+38 -7
View File
@@ -89,6 +89,10 @@ pub struct FleetLedgerState {
pub latest_events: BTreeMap<String, FleetWorkerEvent>,
/// Artifact events keyed by worker_id:run_id:task_id:path.
pub artifact_events: BTreeMap<String, FleetWorkerEvent>,
/// Restart events keyed by worker_id:run_id:task_id.
pub restarted_events: BTreeMap<String, FleetWorkerEvent>,
/// Escalation events keyed by worker_id:run_id:task_id.
pub escalated_events: BTreeMap<String, FleetWorkerEvent>,
/// Completed receipts by run_id:task_id.
pub receipts: BTreeMap<String, FleetReceipt>,
}
@@ -391,12 +395,20 @@ impl FleetLedger {
)?);
}
}
let mut compacted_events = BTreeMap::new();
for event in state.latest_events.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended {
event: event.clone(),
})?);
compacted_events.insert(compact_event_key(event), event.clone());
}
for event in state.artifact_events.values() {
compacted_events.insert(compact_event_key(event), event.clone());
}
for event in state.restarted_events.values() {
compacted_events.insert(compact_event_key(event), event.clone());
}
for event in state.escalated_events.values() {
compacted_events.insert(compact_event_key(event), event.clone());
}
for event in compacted_events.values() {
lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended {
event: event.clone(),
})?);
@@ -434,6 +446,13 @@ fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
format!("{}:{}:{}", worker_id, run_id, task_id)
}
fn compact_event_key(event: &FleetWorkerEvent) -> String {
format!(
"{}:{}:{}:{}",
event.worker_id, event.run_id.0, event.task_id, event.seq
)
}
fn mark_task_terminal(
state: &mut FleetLedgerState,
run_id: &FleetRunId,
@@ -510,21 +529,33 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) {
mark_task_terminal(state, &run_id, &task_id, &worker_id, &timestamp, status);
}
FleetLedgerRecord::EventAppended { event } => {
let event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id);
let latest_event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id);
if state
.latest_seq
.get(&event_key)
.get(&latest_event_key)
.copied()
.is_none_or(|seq| event.seq > seq)
{
state.latest_seq.insert(event_key.clone(), event.seq);
state.latest_events.insert(event_key, event.clone());
state.latest_seq.insert(latest_event_key.clone(), event.seq);
state.latest_events.insert(latest_event_key, event.clone());
}
if let FleetWorkerEventPayload::Artifact(artifact) = &event.payload {
state
.artifact_events
.insert(artifact_event_key(&event, artifact), event.clone());
}
if matches!(&event.payload, FleetWorkerEventPayload::Restarted { .. }) {
state.restarted_events.insert(
event_key(&event.worker_id, &event.run_id.0, &event.task_id),
event.clone(),
);
}
if matches!(&event.payload, FleetWorkerEventPayload::Escalated { .. }) {
state.escalated_events.insert(
event_key(&event.worker_id, &event.run_id.0, &event.task_id),
event.clone(),
);
}
// Derive worker status from lifecycle events.
match &event.payload {
FleetWorkerEventPayload::Leased { .. }
+206 -2
View File
@@ -54,6 +54,8 @@ pub struct FleetStatusSnapshot {
pub completed: usize,
pub partial: usize,
pub failed: usize,
pub restarted: usize,
pub escalated: usize,
pub transport_failed: usize,
pub task_failed: usize,
pub verifier_failed: usize,
@@ -68,10 +70,14 @@ pub struct FleetWorkerInspection {
pub status: FleetWorkerStatus,
pub current_run_id: Option<FleetRunId>,
pub current_task_id: Option<String>,
pub objective: Option<String>,
pub role: Option<String>,
pub host: Option<String>,
pub latest_heartbeat_at: Option<String>,
pub latest_event: Option<FleetWorkerEvent>,
pub artifacts: Vec<FleetArtifactRef>,
pub last_error: Option<String>,
pub alert_state: Option<String>,
}
impl FleetManager {
@@ -94,6 +100,10 @@ impl FleetManager {
self.ledger.path()
}
pub fn rebuild_state(&self) -> Result<FleetLedgerState> {
self.ledger.rebuild_state()
}
pub fn load_task_spec(path: &Path) -> Result<FleetTaskSpecDocument> {
load_task_spec_document(path)
}
@@ -228,6 +238,21 @@ impl FleetManager {
let latest_event = latest_event_for_worker(&state, worker_id).cloned();
let current = active_task_for_worker(&state, worker_id)
.or_else(|| latest_task_for_worker(&state, worker_id));
let current_run_id = current.as_ref().map(|task| task.entry.run_id.clone());
let current_task_id = current.as_ref().map(|task| task.entry.task_id.clone());
let (objective, role) = current
.as_ref()
.and_then(|task| task_spec_for_state(&state, task))
.map(|task_spec| {
(
task_spec.objective.or(task_spec.description),
task_spec.worker.and_then(|worker| worker.role),
)
})
.unwrap_or((None, None));
let host = current_run_id
.as_ref()
.and_then(|run_id| worker_host_for_run(&state, run_id, worker_id));
let artifacts = state
.artifact_events
.values()
@@ -254,15 +279,20 @@ impl FleetManager {
.heartbeats
.get(worker_id)
.map(|heartbeat| heartbeat.timestamp.clone());
let alert_state = latest_alert_for_worker(&state, worker_id);
Ok(FleetWorkerInspection {
worker_id: worker_id.to_string(),
status,
current_run_id: current.as_ref().map(|task| task.entry.run_id.clone()),
current_task_id: current.map(|task| task.entry.task_id.clone()),
current_run_id,
current_task_id,
objective,
role,
host,
latest_heartbeat_at,
latest_event,
artifacts,
last_error,
alert_state,
})
}
@@ -366,6 +396,48 @@ impl FleetManager {
Ok(stopped)
}
pub fn stop_run(&self, run_id: &FleetRunId) -> Result<usize> {
let state = self.ledger.rebuild_state()?;
if !state.runs.contains_key(&run_id.0) {
bail!("fleet run {} does not exist", run_id.0);
}
let now = timestamp();
let mut stopped = 0usize;
for task in state
.tasks
.values()
.filter(|task| task.entry.run_id == *run_id)
{
if !matches!(
task.status,
FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased
) {
continue;
}
if let Some(worker_id) = task.leased_to.as_deref() {
self.append_worker_event(
&task.entry.run_id,
worker_id,
&task.entry.task_id,
FleetWorkerEventPayload::Interrupted {
signal: Some("stop_run".to_string()),
},
)?;
}
self.ledger.mark_task_terminal_status(
&task.entry.run_id,
&task.entry.task_id,
task.leased_to.as_deref(),
&now,
FleetTaskLedgerStatus::Cancelled,
)?;
stopped += 1;
}
self.ledger
.update_run_status(run_id, FleetRunStatus::Cancelled, &timestamp())?;
Ok(stopped)
}
fn start_worker_task(
&self,
worker_id: &str,
@@ -631,6 +703,16 @@ impl FleetManager {
None => {}
}
}
snapshot.restarted = state
.restarted_events
.values()
.filter(|event| run_filter.is_none_or(|run_id| event.run_id == *run_id))
.count();
snapshot.escalated = state
.escalated_events
.values()
.filter(|event| run_filter.is_none_or(|run_id| event.run_id == *run_id))
.count();
snapshot
}
@@ -741,6 +823,37 @@ fn next_enqueued_task_for_run(
Some((task.entry.clone(), task_spec))
}
fn task_spec_for_state(state: &FleetLedgerState, task: &FleetTaskState) -> Option<FleetTaskSpec> {
state
.runs
.get(&task.entry.run_id.0)?
.task_specs
.iter()
.find(|spec| spec.id == task.entry.task_id)
.cloned()
}
fn worker_host_for_run(
state: &FleetLedgerState,
run_id: &FleetRunId,
worker_id: &str,
) -> Option<String> {
let run = state.runs.get(&run_id.0)?;
let worker = run
.worker_specs
.iter()
.find(|worker| worker.id == worker_id)?;
Some(host_label(&worker.host))
}
fn host_label(host: &FleetHostSpec) -> String {
match host {
FleetHostSpec::Local => "local".to_string(),
FleetHostSpec::Ssh { host, .. } => format!("ssh:{host}"),
FleetHostSpec::Docker { image, .. } => format!("docker:{image}"),
}
}
fn latest_event_for_worker<'a>(
state: &'a FleetLedgerState,
worker_id: &str,
@@ -752,6 +865,25 @@ fn latest_event_for_worker<'a>(
.max_by_key(|event| event.seq)
}
fn latest_alert_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option<String> {
state
.escalated_events
.values()
.filter(|event| event.worker_id == worker_id)
.filter_map(|event| match &event.payload {
FleetWorkerEventPayload::Escalated { channel, alert_id } => Some((
event.seq,
alert_id
.as_ref()
.map(|alert_id| format!("escalated via {channel} alert_id={alert_id}"))
.unwrap_or_else(|| format!("escalated via {channel}")),
)),
_ => None,
})
.max_by_key(|(seq, _)| *seq)
.map(|(_, message)| message)
}
fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option<String> {
state
.latest_events
@@ -1063,4 +1195,76 @@ mod tests {
assert_eq!(status.verifier_failed, 1);
assert_eq!(status.running, 0);
}
#[test]
fn fleet_status_counts_restarted_and_escalated_events() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let path = task_spec_file(&tmp, vec![task("task-a")]);
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
let worker_id = &report.worker_ids[0];
manager.restart_worker(worker_id).unwrap();
manager
.append_worker_event(
&report.run_id,
worker_id,
"task-a",
FleetWorkerEventPayload::Escalated {
channel: "slack".to_string(),
alert_id: None,
},
)
.unwrap();
let status = manager.run_status(&report.run_id).unwrap();
assert_eq!(status.restarted, 1);
assert_eq!(status.escalated, 1);
manager.ledger.compact().unwrap();
let status = manager.run_status(&report.run_id).unwrap();
assert_eq!(status.restarted, 1);
assert_eq!(status.escalated, 1);
}
#[test]
fn fleet_status_inspect_exposes_task_context_host_and_alert() {
let tmp = TempDir::new().unwrap();
let manager = FleetManager::open(tmp.path()).unwrap();
let mut contextual = task("task-a");
contextual.objective = Some("Review the release ledger".to_string());
contextual.worker = Some(FleetTaskWorkerProfile {
role: Some("release-reviewer".to_string()),
tool_profile: Some("read-only".to_string()),
tools: vec!["git".to_string()],
capabilities: vec!["rust".to_string()],
});
let path = task_spec_file(&tmp, vec![contextual]);
let report = manager.create_run_from_task_spec_path(&path, 1).unwrap();
let worker_id = &report.worker_ids[0];
manager
.append_worker_event(
&report.run_id,
worker_id,
"task-a",
FleetWorkerEventPayload::Escalated {
channel: "pagerduty".to_string(),
alert_id: Some("alert-1".to_string()),
},
)
.unwrap();
let inspection = manager.inspect_worker(worker_id).unwrap();
assert_eq!(
inspection.objective.as_deref(),
Some("Review the release ledger")
);
assert_eq!(inspection.role.as_deref(), Some("release-reviewer"));
assert_eq!(inspection.host.as_deref(), Some("local"));
assert_eq!(
inspection.alert_state.as_deref(),
Some("escalated via pagerduty alert_id=alert-1")
);
}
}
+86 -1
View File
@@ -394,6 +394,16 @@ enum FleetCommand {
/// Worker id printed by `codewhale fleet run`
worker_id: String,
},
/// Print bounded log artifacts for one worker
Logs {
/// Worker id printed by `codewhale fleet run`
worker_id: String,
},
/// List artifact refs for one worker
Artifacts {
/// Worker id printed by `codewhale fleet run`
worker_id: String,
},
/// Interrupt a running worker task and record a terminal cancellation
Interrupt {
/// Worker id printed by `codewhale fleet run`
@@ -1538,13 +1548,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
fn print_status(status: &FleetStatusSnapshot) {
println!(
"fleet: runs={} queued={} running={} completed={} partial={} failed={} transport_failed={} task_failed={} verifier_failed={} cancelled={} stale={}",
"fleet: runs={} queued={} running={} completed={} partial={} failed={} restarted={} escalated={} transport_failed={} task_failed={} verifier_failed={} cancelled={} stale={}",
status.runs,
status.queued,
status.running,
status.completed,
status.partial,
status.failed,
status.restarted,
status.escalated,
status.transport_failed,
status.task_failed,
status.verifier_failed,
@@ -1568,6 +1580,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
if let Some(task_id) = &inspection.current_task_id {
println!("task: {task_id}");
}
if let Some(objective) = &inspection.objective {
println!("objective: {objective}");
}
if let Some(role) = &inspection.role {
println!("role: {role}");
}
if let Some(host) = &inspection.host {
println!("host: {host}");
}
if let Some(heartbeat) = &inspection.latest_heartbeat_at {
println!("heartbeat: {heartbeat}");
}
@@ -1591,6 +1612,61 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
if let Some(error) = &inspection.last_error {
println!("last_error: {error}");
}
if let Some(alert) = &inspection.alert_state {
println!("alert: {alert}");
}
}
fn print_artifacts(inspection: &FleetWorkerInspection) {
if inspection.artifacts.is_empty() {
println!("artifacts: none");
return;
}
println!("artifacts:");
for artifact in &inspection.artifacts {
let size = artifact
.size_bytes
.map(|size| format!(" size={size}"))
.unwrap_or_default();
let mime = artifact
.mime_type
.as_ref()
.map(|mime| format!(" mime={mime}"))
.unwrap_or_default();
println!(
" {} {}{}{}",
artifact_kind_label(&artifact.kind),
artifact.path.display(),
size,
mime
);
}
}
fn print_logs(workspace: &Path, inspection: &FleetWorkerInspection) -> Result<()> {
let mut printed = false;
for artifact in inspection
.artifacts
.iter()
.filter(|artifact| matches!(artifact.kind, FleetArtifactKind::Log))
{
let path = workspace.join(&artifact.path);
println!("== {} ==", artifact.path.display());
let contents = std::fs::read_to_string(&path)
.with_context(|| format!("reading fleet log {}", path.display()))?;
let preview: String = contents.chars().take(16 * 1024).collect();
print!("{preview}");
if contents.chars().count() > preview.chars().count() {
println!("\n[truncated]");
} else if !preview.ends_with('\n') {
println!();
}
printed = true;
}
if !printed {
println!("logs: none");
}
Ok(())
}
fn alert_event_class(arg: FleetAlertEventArg) -> FleetAlertEventClass {
@@ -1680,6 +1756,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
print_inspection(&manager.inspect_worker(&worker_id)?);
Ok(())
}
FleetCommand::Logs { worker_id } => {
let inspection = manager.inspect_worker(&worker_id)?;
print_logs(workspace, &inspection)
}
FleetCommand::Artifacts { worker_id } => {
let inspection = manager.inspect_worker(&worker_id)?;
print_artifacts(&inspection);
Ok(())
}
FleetCommand::Interrupt { worker_id } => {
let inspection = manager.interrupt_worker(&worker_id)?;
print_inspection(&inspection);
+506 -3
View File
@@ -37,6 +37,8 @@ use crate::automation_manager::{
CreateAutomationRequest, SharedAutomationManager, UpdateAutomationRequest, spawn_scheduler,
};
use crate::config::{Config, DEFAULT_TEXT_MODEL};
use crate::fleet::ledger::{FleetLedgerState, FleetTaskLedgerStatus};
use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
use crate::mcp::McpPool;
use crate::models::{ContentBlock, Message};
use crate::runtime_threads::{
@@ -53,6 +55,9 @@ use crate::skill_state::SkillStateStore;
use crate::task_manager::{
NewTaskRequest, SharedTaskManager, TaskManager, TaskManagerConfig, TaskRecord, TaskSummary,
};
use codewhale_protocol::fleet::{
FleetArtifactKind, FleetRun, FleetRunId, FleetWorkerEventPayload, FleetWorkerStatus,
};
#[derive(Clone)]
pub struct RuntimeApiState {
@@ -585,6 +590,22 @@ pub fn build_router(state: RuntimeApiState) -> Router {
post(resume_session_thread),
)
.route("/v1/workspace/status", get(workspace_status))
.route("/v1/fleet/runs", get(list_fleet_runs))
.route("/v1/fleet/runs/{run_id}", get(get_fleet_run))
.route(
"/v1/fleet/runs/{run_id}/workers",
get(list_fleet_run_workers),
)
.route("/v1/fleet/runs/{run_id}/stop", post(stop_fleet_run))
.route("/v1/fleet/workers/{worker_id}", get(get_fleet_worker))
.route(
"/v1/fleet/workers/{worker_id}/interrupt",
post(interrupt_fleet_worker),
)
.route(
"/v1/fleet/workers/{worker_id}/restart",
post(restart_fleet_worker),
)
.route("/v1/stream", post(stream_turn))
.route("/v1/threads", get(list_threads).post(create_thread))
.route("/v1/threads/summary", get(list_threads_summary))
@@ -1323,6 +1344,342 @@ async fn workspace_status(
Ok(Json(collect_workspace_status(&state.workspace)))
}
async fn list_fleet_runs(State(state): State<RuntimeApiState>) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let ledger_state = manager
.rebuild_state()
.map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?;
let runs: Vec<_> = ledger_state
.runs
.values()
.map(|run| fleet_run_summary_json(&manager, run, &ledger_state))
.collect::<Result<Vec<_>, _>>()?;
let status = manager
.status()
.map_err(|err| ApiError::internal(format!("Failed to read fleet status: {err}")))?;
Ok(Json(json!({
"status": fleet_status_json(&status),
"runs": runs,
})))
}
async fn get_fleet_run(
State(state): State<RuntimeApiState>,
Path(run_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let ledger_state = manager
.rebuild_state()
.map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?;
let run = ledger_state
.runs
.get(&run_id)
.ok_or_else(|| ApiError::not_found(format!("fleet run '{run_id}' not found")))?;
Ok(Json(fleet_run_detail_json(&manager, run, &ledger_state)?))
}
async fn list_fleet_run_workers(
State(state): State<RuntimeApiState>,
Path(run_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let ledger_state = manager
.rebuild_state()
.map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?;
let run = ledger_state
.runs
.get(&run_id)
.ok_or_else(|| ApiError::not_found(format!("fleet run '{run_id}' not found")))?;
let workers = run
.worker_specs
.iter()
.map(|worker| {
manager
.inspect_worker(&worker.id)
.map(|inspection| fleet_worker_json(&inspection))
.map_err(|err| {
ApiError::internal(format!(
"Failed to inspect fleet worker {}: {err}",
worker.id
))
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Json(json!({
"run_id": run_id,
"workers": workers,
})))
}
async fn get_fleet_worker(
State(state): State<RuntimeApiState>,
Path(worker_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let inspection = manager.inspect_worker(&worker_id).map_err(|err| {
ApiError::not_found(format!("fleet worker '{worker_id}' not found: {err}"))
})?;
Ok(Json(fleet_worker_json(&inspection)))
}
async fn interrupt_fleet_worker(
State(state): State<RuntimeApiState>,
Path(worker_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let inspection = manager.interrupt_worker(&worker_id).map_err(|err| {
ApiError::bad_request(format!(
"Failed to interrupt fleet worker '{worker_id}': {err}"
))
})?;
Ok(Json(json!({
"action": "interrupt",
"worker": fleet_worker_json(&inspection),
})))
}
async fn restart_fleet_worker(
State(state): State<RuntimeApiState>,
Path(worker_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let inspection = manager.restart_worker(&worker_id).map_err(|err| {
ApiError::bad_request(format!(
"Failed to restart fleet worker '{worker_id}': {err}"
))
})?;
Ok(Json(json!({
"action": "restart",
"worker": fleet_worker_json(&inspection),
})))
}
async fn stop_fleet_run(
State(state): State<RuntimeApiState>,
Path(run_id): Path<String>,
) -> Result<Json<Value>, ApiError> {
let manager = open_fleet_manager(&state)?;
let run_id = FleetRunId::from(run_id);
let stopped = manager.stop_run(&run_id).map_err(|err| {
ApiError::bad_request(format!("Failed to stop fleet run '{}': {err}", run_id.0))
})?;
let status = manager
.run_status(&run_id)
.map_err(|err| ApiError::internal(format!("Failed to read fleet run status: {err}")))?;
Ok(Json(json!({
"action": "stop",
"run_id": run_id.0,
"stopped": stopped,
"status": fleet_status_json(&status),
})))
}
fn open_fleet_manager(state: &RuntimeApiState) -> Result<FleetManager, ApiError> {
FleetManager::open(&state.workspace)
.map_err(|err| ApiError::internal(format!("Failed to open fleet manager: {err}")))
}
fn fleet_run_summary_json(
manager: &FleetManager,
run: &FleetRun,
ledger_state: &FleetLedgerState,
) -> Result<Value, ApiError> {
let status = manager
.run_status(&run.id)
.map_err(|err| ApiError::internal(format!("Failed to read fleet run status: {err}")))?;
let task_statuses = ledger_state
.tasks
.values()
.filter(|task| task.entry.run_id == run.id)
.map(|task| {
json!({
"task_id": task.entry.task_id.clone(),
"status": fleet_task_status_label(task.status),
"leased_to": task.leased_to.clone(),
"attempts": task.entry.attempts,
})
})
.collect::<Vec<_>>();
Ok(json!({
"id": run.id.0.clone(),
"name": run.name.clone(),
"status": fleet_status_json(&status),
"task_count": run.task_specs.len(),
"worker_count": run.worker_specs.len(),
"tasks": task_statuses,
"labels": run.labels.clone(),
"created_at": run.created_at.clone(),
"updated_at": run.updated_at.clone(),
"completed_at": run.completed_at.clone(),
}))
}
fn fleet_run_detail_json(
manager: &FleetManager,
run: &FleetRun,
ledger_state: &FleetLedgerState,
) -> Result<Value, ApiError> {
let mut value = fleet_run_summary_json(manager, run, ledger_state)?;
if let Some(map) = value.as_object_mut() {
map.insert("task_specs".to_string(), json!(run.task_specs.clone()));
map.insert("worker_specs".to_string(), json!(run.worker_specs.clone()));
}
Ok(value)
}
fn fleet_status_json(status: &FleetStatusSnapshot) -> Value {
json!({
"runs": status.runs,
"queued": status.queued,
"running": status.running,
"completed": status.completed,
"partial": status.partial,
"failed": status.failed,
"restarted": status.restarted,
"escalated": status.escalated,
"transport_failed": status.transport_failed,
"task_failed": status.task_failed,
"verifier_failed": status.verifier_failed,
"cancelled": status.cancelled,
"stale": status.stale,
"workers": status
.workers
.iter()
.map(|(worker_id, status)| {
(
worker_id.clone(),
Value::String(worker_status_label(status).to_string()),
)
})
.collect::<serde_json::Map<String, Value>>(),
})
}
fn fleet_worker_json(inspection: &FleetWorkerInspection) -> Value {
json!({
"worker_id": inspection.worker_id.clone(),
"status": worker_status_label(&inspection.status),
"run_id": inspection.current_run_id.as_ref().map(|run_id| run_id.0.clone()),
"task_id": inspection.current_task_id.clone(),
"objective": inspection.objective.clone(),
"role": inspection.role.clone(),
"host": inspection.host.clone(),
"latest_heartbeat_at": inspection.latest_heartbeat_at.clone(),
"latest_event": inspection.latest_event.as_ref().map(fleet_event_json),
"artifacts": inspection.artifacts.iter().map(fleet_artifact_json).collect::<Vec<_>>(),
"last_error": inspection.last_error.clone(),
"alert_state": inspection.alert_state.clone(),
})
}
fn fleet_artifact_json(artifact: &codewhale_protocol::fleet::FleetArtifactRef) -> Value {
json!({
"kind": artifact_kind_label(&artifact.kind),
"path": artifact.path.clone(),
"checksum": artifact.checksum.clone(),
"mime_type": artifact.mime_type.clone(),
"size_bytes": artifact.size_bytes,
})
}
fn fleet_event_json(event: &codewhale_protocol::fleet::FleetWorkerEvent) -> Value {
json!({
"seq": event.seq,
"run_id": event.run_id.0.clone(),
"worker_id": event.worker_id.clone(),
"task_id": event.task_id.clone(),
"timestamp": event.timestamp.clone(),
"label": fleet_event_label(&event.payload),
"payload": event.payload.clone(),
})
}
fn worker_status_label(status: &FleetWorkerStatus) -> &'static str {
match status {
FleetWorkerStatus::Unknown => "unknown",
FleetWorkerStatus::Online => "online",
FleetWorkerStatus::Busy => "busy",
FleetWorkerStatus::Offline => "offline",
FleetWorkerStatus::Unhealthy => "unhealthy",
FleetWorkerStatus::Draining => "draining",
FleetWorkerStatus::Retired => "retired",
}
}
fn fleet_task_status_label(status: FleetTaskLedgerStatus) -> &'static str {
match status {
FleetTaskLedgerStatus::Enqueued => "enqueued",
FleetTaskLedgerStatus::Leased => "leased",
FleetTaskLedgerStatus::Completed => "completed",
FleetTaskLedgerStatus::Failed => "failed",
FleetTaskLedgerStatus::Cancelled => "cancelled",
}
}
fn artifact_kind_label(kind: &FleetArtifactKind) -> String {
match kind {
FleetArtifactKind::Log => "log".to_string(),
FleetArtifactKind::Patch => "patch".to_string(),
FleetArtifactKind::TestResult => "test_result".to_string(),
FleetArtifactKind::Report => "report".to_string(),
FleetArtifactKind::Checkpoint => "checkpoint".to_string(),
FleetArtifactKind::Receipt => "receipt".to_string(),
FleetArtifactKind::Other(value) => value.clone(),
}
}
fn fleet_event_label(payload: &FleetWorkerEventPayload) -> String {
match payload {
FleetWorkerEventPayload::Queued => "queued".to_string(),
FleetWorkerEventPayload::Leased { .. } => "leased".to_string(),
FleetWorkerEventPayload::Starting => "starting".to_string(),
FleetWorkerEventPayload::Running => "running".to_string(),
FleetWorkerEventPayload::ModelWait { model } => model
.as_ref()
.map(|model| format!("model_wait model={model}"))
.unwrap_or_else(|| "model_wait".to_string()),
FleetWorkerEventPayload::RunningTool { tool, call_id } => call_id
.as_ref()
.map(|call_id| format!("running_tool tool={tool} call_id={call_id}"))
.unwrap_or_else(|| format!("running_tool tool={tool}")),
FleetWorkerEventPayload::Heartbeat { .. } => "heartbeat".to_string(),
FleetWorkerEventPayload::Artifact(artifact) => {
format!("artifact kind={}", artifact_kind_label(&artifact.kind))
}
FleetWorkerEventPayload::Completed { exit_code, summary } => match (exit_code, summary) {
(Some(code), Some(summary)) => format!("completed exit_code={code} {summary}"),
(Some(code), None) => format!("completed exit_code={code}"),
(None, Some(summary)) => format!("completed {summary}"),
(None, None) => "completed".to_string(),
},
FleetWorkerEventPayload::Failed {
reason,
recoverable,
} => {
format!("failed recoverable={recoverable} reason={reason}")
}
FleetWorkerEventPayload::Cancelled { cancelled_by } => cancelled_by
.as_ref()
.map(|by| format!("cancelled by={by}"))
.unwrap_or_else(|| "cancelled".to_string()),
FleetWorkerEventPayload::Interrupted { signal } => signal
.as_ref()
.map(|signal| format!("interrupted signal={signal}"))
.unwrap_or_else(|| "interrupted".to_string()),
FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at
.as_ref()
.map(|ts| format!("stale last_heartbeat_at={ts}"))
.unwrap_or_else(|| "stale".to_string()),
FleetWorkerEventPayload::Restarted { restart_count } => {
format!("restarted count={restart_count}")
}
FleetWorkerEventPayload::Escalated { channel, alert_id } => alert_id
.as_ref()
.map(|alert_id| format!("escalated channel={channel} alert_id={alert_id}"))
.unwrap_or_else(|| format!("escalated channel={channel}")),
}
}
async fn list_skills(
State(state): State<RuntimeApiState>,
) -> Result<Json<SkillsResponse>, ApiError> {
@@ -2980,14 +3337,38 @@ mod tests {
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
spawn_test_server_with_root_token_mobile_workspace(
root,
sessions_dir,
runtime_token,
mobile_enabled,
PathBuf::from("."),
)
.await
}
async fn spawn_test_server_with_root_token_mobile_workspace(
root: PathBuf,
sessions_dir: PathBuf,
runtime_token: Option<String>,
mobile_enabled: bool,
workspace: PathBuf,
) -> Result<
Option<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
let _ = rustls::crypto::ring::default_provider().install_default();
fs::create_dir_all(&sessions_dir)?;
fs::create_dir_all(&workspace)?;
let manager = TaskManager::start_with_executor(
TaskManagerConfig {
data_dir: root.join("tasks"),
worker_count: 1,
default_workspace: PathBuf::from("."),
default_workspace: workspace.clone(),
default_model: DEFAULT_TEXT_MODEL.to_string(),
default_mode: "agent".to_string(),
allow_shell: false,
@@ -3017,7 +3398,7 @@ mod tests {
});
let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open(
config,
PathBuf::from("."),
workspace.clone(),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")),
)?);
runtime_threads.attach_task_manager(manager.clone());
@@ -3029,7 +3410,7 @@ mod tests {
let auth_required = runtime_token.is_some();
let state = RuntimeApiState {
config: Config::default(),
workspace: PathBuf::from("."),
workspace,
task_manager: manager,
runtime_threads: runtime_threads.clone(),
cors_origins: Vec::new(),
@@ -3527,6 +3908,128 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn fleet_status_runtime_api_exposes_state_and_actions() -> Result<()> {
let root = std::env::temp_dir().join(format!("codewhale-fleet-api-{}", Uuid::new_v4()));
let workspace = root.join("workspace");
fs::create_dir_all(&workspace)?;
let manager = FleetManager::open(&workspace)?;
let task = codewhale_protocol::fleet::FleetTaskSpec {
id: "task-a".to_string(),
name: "Task A".to_string(),
description: None,
objective: Some("Inspect fleet status through Runtime API".to_string()),
instructions: "Stay running for inspection.".to_string(),
worker: Some(codewhale_protocol::fleet::FleetTaskWorkerProfile {
role: Some("status-reviewer".to_string()),
tool_profile: Some("read-only".to_string()),
tools: vec!["rg".to_string()],
capabilities: vec!["fleet".to_string()],
}),
workspace: None,
input_files: Vec::new(),
context: Vec::new(),
budget: None,
tags: Vec::new(),
expected_artifacts: vec![FleetArtifactKind::Log],
scorer: None,
retry_policy: None,
alert_policy: None,
timeout_seconds: None,
metadata: std::collections::BTreeMap::new(),
};
let report = manager.create_run(
crate::fleet::task_spec::FleetTaskSpecDocument {
name: Some("api smoke".to_string()),
labels: std::collections::BTreeMap::new(),
workers: Vec::new(),
tasks: vec![task],
},
1,
)?;
let worker_id = report.worker_ids[0].clone();
let sessions_dir = root.join("sessions");
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root_token_mobile_workspace(
root.clone(),
sessions_dir,
None,
false,
workspace,
)
.await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let runs: serde_json::Value = client
.get(format!("http://{addr}/v1/fleet/runs"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(runs["status"]["running"], 1);
assert_eq!(runs["runs"][0]["id"], report.run_id.0);
let worker: serde_json::Value = client
.get(format!("http://{addr}/v1/fleet/workers/{worker_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(
worker["objective"],
"Inspect fleet status through Runtime API"
);
assert_eq!(worker["role"], "status-reviewer");
assert_eq!(worker["host"], "local");
assert_eq!(worker["artifacts"][0]["kind"], "log");
let interrupted: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/workers/{worker_id}/interrupt"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(interrupted["action"], "interrupt");
assert_eq!(interrupted["worker"]["last_error"], "cancelled by operator");
let restarted: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/workers/{worker_id}/restart"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(restarted["action"], "restart");
assert_eq!(restarted["worker"]["status"], "busy");
let stopped: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/runs/{}/stop",
report.run_id.0
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(stopped["action"], "stop");
assert_eq!(stopped["stopped"], 1);
assert_eq!(stopped["status"]["cancelled"], 1);
handle.abort();
Ok(())
}
#[tokio::test]
async fn stream_requires_prompt() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
+27
View File
@@ -8,6 +8,8 @@ codewhale fleet init
codewhale fleet run tasks.json --max-workers 4
codewhale fleet status
codewhale fleet inspect <worker-id>
codewhale fleet logs <worker-id>
codewhale fleet artifacts <worker-id>
codewhale fleet interrupt <worker-id>
codewhale fleet restart <worker-id>
codewhale fleet stop --all
@@ -242,6 +244,31 @@ safe inspection commands such as `codewhale fleet status` and
`codewhale fleet inspect <worker-id>`. Endpoints, webhook secrets, and
PagerDuty routing keys are shown as `<redacted:env:...>`.
## Status Surfaces
`codewhale fleet status` shows compact counts for queued, running, completed,
partial, failed, restarted, escalated, cancelled, stale, and verifier/transport
failure sources. `inspect` shows the worker state plus the current task
objective, role, host, heartbeat, latest event, artifact refs, latest error, and
alert state. `logs` prints bounded log artifact contents, and `artifacts` lists
artifact refs without embedding large payloads.
The Runtime API exposes the same ledger-backed projection behind the existing
runtime auth middleware:
```text
GET /v1/fleet/runs
GET /v1/fleet/runs/{run_id}
GET /v1/fleet/runs/{run_id}/workers
GET /v1/fleet/workers/{worker_id}
POST /v1/fleet/workers/{worker_id}/interrupt
POST /v1/fleet/workers/{worker_id}/restart
POST /v1/fleet/runs/{run_id}/stop
```
Action endpoints call the same manager controls as the CLI and record their
decisions in the fleet ledger.
## Host Adapters
The host adapter boundary supports local child processes and explicit SSH