diff --git a/crates/tui/src/fleet/ledger.rs b/crates/tui/src/fleet/ledger.rs index 051d1e32..37d2cfdd 100644 --- a/crates/tui/src/fleet/ledger.rs +++ b/crates/tui/src/fleet/ledger.rs @@ -89,6 +89,10 @@ pub struct FleetLedgerState { pub latest_events: BTreeMap, /// Artifact events keyed by worker_id:run_id:task_id:path. pub artifact_events: BTreeMap, + /// Restart events keyed by worker_id:run_id:task_id. + pub restarted_events: BTreeMap, + /// Escalation events keyed by worker_id:run_id:task_id. + pub escalated_events: BTreeMap, /// Completed receipts by run_id:task_id. pub receipts: BTreeMap, } @@ -391,12 +395,20 @@ impl FleetLedger { )?); } } + let mut compacted_events = BTreeMap::new(); for event in state.latest_events.values() { - lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended { - event: event.clone(), - })?); + compacted_events.insert(compact_event_key(event), event.clone()); } for event in state.artifact_events.values() { + compacted_events.insert(compact_event_key(event), event.clone()); + } + for event in state.restarted_events.values() { + compacted_events.insert(compact_event_key(event), event.clone()); + } + for event in state.escalated_events.values() { + compacted_events.insert(compact_event_key(event), event.clone()); + } + for event in compacted_events.values() { lines.push(serde_json::to_string(&FleetLedgerRecord::EventAppended { event: event.clone(), })?); @@ -434,6 +446,13 @@ fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { format!("{}:{}:{}", worker_id, run_id, task_id) } +fn compact_event_key(event: &FleetWorkerEvent) -> String { + format!( + "{}:{}:{}:{}", + event.worker_id, event.run_id.0, event.task_id, event.seq + ) +} + fn mark_task_terminal( state: &mut FleetLedgerState, run_id: &FleetRunId, @@ -510,21 +529,33 @@ fn apply_record(state: &mut FleetLedgerState, record: FleetLedgerRecord) { mark_task_terminal(state, &run_id, &task_id, &worker_id, ×tamp, status); } FleetLedgerRecord::EventAppended { event } => { - let event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id); + let latest_event_key = event_key(&event.worker_id, &event.run_id.0, &event.task_id); if state .latest_seq - .get(&event_key) + .get(&latest_event_key) .copied() .is_none_or(|seq| event.seq > seq) { - state.latest_seq.insert(event_key.clone(), event.seq); - state.latest_events.insert(event_key, event.clone()); + state.latest_seq.insert(latest_event_key.clone(), event.seq); + state.latest_events.insert(latest_event_key, event.clone()); } if let FleetWorkerEventPayload::Artifact(artifact) = &event.payload { state .artifact_events .insert(artifact_event_key(&event, artifact), event.clone()); } + if matches!(&event.payload, FleetWorkerEventPayload::Restarted { .. }) { + state.restarted_events.insert( + event_key(&event.worker_id, &event.run_id.0, &event.task_id), + event.clone(), + ); + } + if matches!(&event.payload, FleetWorkerEventPayload::Escalated { .. }) { + state.escalated_events.insert( + event_key(&event.worker_id, &event.run_id.0, &event.task_id), + event.clone(), + ); + } // Derive worker status from lifecycle events. match &event.payload { FleetWorkerEventPayload::Leased { .. } diff --git a/crates/tui/src/fleet/manager.rs b/crates/tui/src/fleet/manager.rs index 31cab863..cf2fdb2a 100644 --- a/crates/tui/src/fleet/manager.rs +++ b/crates/tui/src/fleet/manager.rs @@ -54,6 +54,8 @@ pub struct FleetStatusSnapshot { pub completed: usize, pub partial: usize, pub failed: usize, + pub restarted: usize, + pub escalated: usize, pub transport_failed: usize, pub task_failed: usize, pub verifier_failed: usize, @@ -68,10 +70,14 @@ pub struct FleetWorkerInspection { pub status: FleetWorkerStatus, pub current_run_id: Option, pub current_task_id: Option, + pub objective: Option, + pub role: Option, + pub host: Option, pub latest_heartbeat_at: Option, pub latest_event: Option, pub artifacts: Vec, pub last_error: Option, + pub alert_state: Option, } impl FleetManager { @@ -94,6 +100,10 @@ impl FleetManager { self.ledger.path() } + pub fn rebuild_state(&self) -> Result { + self.ledger.rebuild_state() + } + pub fn load_task_spec(path: &Path) -> Result { load_task_spec_document(path) } @@ -228,6 +238,21 @@ impl FleetManager { let latest_event = latest_event_for_worker(&state, worker_id).cloned(); let current = active_task_for_worker(&state, worker_id) .or_else(|| latest_task_for_worker(&state, worker_id)); + let current_run_id = current.as_ref().map(|task| task.entry.run_id.clone()); + let current_task_id = current.as_ref().map(|task| task.entry.task_id.clone()); + let (objective, role) = current + .as_ref() + .and_then(|task| task_spec_for_state(&state, task)) + .map(|task_spec| { + ( + task_spec.objective.or(task_spec.description), + task_spec.worker.and_then(|worker| worker.role), + ) + }) + .unwrap_or((None, None)); + let host = current_run_id + .as_ref() + .and_then(|run_id| worker_host_for_run(&state, run_id, worker_id)); let artifacts = state .artifact_events .values() @@ -254,15 +279,20 @@ impl FleetManager { .heartbeats .get(worker_id) .map(|heartbeat| heartbeat.timestamp.clone()); + let alert_state = latest_alert_for_worker(&state, worker_id); Ok(FleetWorkerInspection { worker_id: worker_id.to_string(), status, - current_run_id: current.as_ref().map(|task| task.entry.run_id.clone()), - current_task_id: current.map(|task| task.entry.task_id.clone()), + current_run_id, + current_task_id, + objective, + role, + host, latest_heartbeat_at, latest_event, artifacts, last_error, + alert_state, }) } @@ -366,6 +396,48 @@ impl FleetManager { Ok(stopped) } + pub fn stop_run(&self, run_id: &FleetRunId) -> Result { + let state = self.ledger.rebuild_state()?; + if !state.runs.contains_key(&run_id.0) { + bail!("fleet run {} does not exist", run_id.0); + } + let now = timestamp(); + let mut stopped = 0usize; + for task in state + .tasks + .values() + .filter(|task| task.entry.run_id == *run_id) + { + if !matches!( + task.status, + FleetTaskLedgerStatus::Enqueued | FleetTaskLedgerStatus::Leased + ) { + continue; + } + if let Some(worker_id) = task.leased_to.as_deref() { + self.append_worker_event( + &task.entry.run_id, + worker_id, + &task.entry.task_id, + FleetWorkerEventPayload::Interrupted { + signal: Some("stop_run".to_string()), + }, + )?; + } + self.ledger.mark_task_terminal_status( + &task.entry.run_id, + &task.entry.task_id, + task.leased_to.as_deref(), + &now, + FleetTaskLedgerStatus::Cancelled, + )?; + stopped += 1; + } + self.ledger + .update_run_status(run_id, FleetRunStatus::Cancelled, ×tamp())?; + Ok(stopped) + } + fn start_worker_task( &self, worker_id: &str, @@ -631,6 +703,16 @@ impl FleetManager { None => {} } } + snapshot.restarted = state + .restarted_events + .values() + .filter(|event| run_filter.is_none_or(|run_id| event.run_id == *run_id)) + .count(); + snapshot.escalated = state + .escalated_events + .values() + .filter(|event| run_filter.is_none_or(|run_id| event.run_id == *run_id)) + .count(); snapshot } @@ -741,6 +823,37 @@ fn next_enqueued_task_for_run( Some((task.entry.clone(), task_spec)) } +fn task_spec_for_state(state: &FleetLedgerState, task: &FleetTaskState) -> Option { + state + .runs + .get(&task.entry.run_id.0)? + .task_specs + .iter() + .find(|spec| spec.id == task.entry.task_id) + .cloned() +} + +fn worker_host_for_run( + state: &FleetLedgerState, + run_id: &FleetRunId, + worker_id: &str, +) -> Option { + let run = state.runs.get(&run_id.0)?; + let worker = run + .worker_specs + .iter() + .find(|worker| worker.id == worker_id)?; + Some(host_label(&worker.host)) +} + +fn host_label(host: &FleetHostSpec) -> String { + match host { + FleetHostSpec::Local => "local".to_string(), + FleetHostSpec::Ssh { host, .. } => format!("ssh:{host}"), + FleetHostSpec::Docker { image, .. } => format!("docker:{image}"), + } +} + fn latest_event_for_worker<'a>( state: &'a FleetLedgerState, worker_id: &str, @@ -752,6 +865,25 @@ fn latest_event_for_worker<'a>( .max_by_key(|event| event.seq) } +fn latest_alert_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option { + state + .escalated_events + .values() + .filter(|event| event.worker_id == worker_id) + .filter_map(|event| match &event.payload { + FleetWorkerEventPayload::Escalated { channel, alert_id } => Some(( + event.seq, + alert_id + .as_ref() + .map(|alert_id| format!("escalated via {channel} alert_id={alert_id}")) + .unwrap_or_else(|| format!("escalated via {channel}")), + )), + _ => None, + }) + .max_by_key(|(seq, _)| *seq) + .map(|(_, message)| message) +} + fn latest_error_for_worker(state: &FleetLedgerState, worker_id: &str) -> Option { state .latest_events @@ -1063,4 +1195,76 @@ mod tests { assert_eq!(status.verifier_failed, 1); assert_eq!(status.running, 0); } + + #[test] + fn fleet_status_counts_restarted_and_escalated_events() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let path = task_spec_file(&tmp, vec![task("task-a")]); + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + let worker_id = &report.worker_ids[0]; + + manager.restart_worker(worker_id).unwrap(); + manager + .append_worker_event( + &report.run_id, + worker_id, + "task-a", + FleetWorkerEventPayload::Escalated { + channel: "slack".to_string(), + alert_id: None, + }, + ) + .unwrap(); + + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.restarted, 1); + assert_eq!(status.escalated, 1); + + manager.ledger.compact().unwrap(); + let status = manager.run_status(&report.run_id).unwrap(); + assert_eq!(status.restarted, 1); + assert_eq!(status.escalated, 1); + } + + #[test] + fn fleet_status_inspect_exposes_task_context_host_and_alert() { + let tmp = TempDir::new().unwrap(); + let manager = FleetManager::open(tmp.path()).unwrap(); + let mut contextual = task("task-a"); + contextual.objective = Some("Review the release ledger".to_string()); + contextual.worker = Some(FleetTaskWorkerProfile { + role: Some("release-reviewer".to_string()), + tool_profile: Some("read-only".to_string()), + tools: vec!["git".to_string()], + capabilities: vec!["rust".to_string()], + }); + let path = task_spec_file(&tmp, vec![contextual]); + let report = manager.create_run_from_task_spec_path(&path, 1).unwrap(); + let worker_id = &report.worker_ids[0]; + manager + .append_worker_event( + &report.run_id, + worker_id, + "task-a", + FleetWorkerEventPayload::Escalated { + channel: "pagerduty".to_string(), + alert_id: Some("alert-1".to_string()), + }, + ) + .unwrap(); + + let inspection = manager.inspect_worker(worker_id).unwrap(); + + assert_eq!( + inspection.objective.as_deref(), + Some("Review the release ledger") + ); + assert_eq!(inspection.role.as_deref(), Some("release-reviewer")); + assert_eq!(inspection.host.as_deref(), Some("local")); + assert_eq!( + inspection.alert_state.as_deref(), + Some("escalated via pagerduty alert_id=alert-1") + ); + } } diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index db369f08..0724c397 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -394,6 +394,16 @@ enum FleetCommand { /// Worker id printed by `codewhale fleet run` worker_id: String, }, + /// Print bounded log artifacts for one worker + Logs { + /// Worker id printed by `codewhale fleet run` + worker_id: String, + }, + /// List artifact refs for one worker + Artifacts { + /// Worker id printed by `codewhale fleet run` + worker_id: String, + }, /// Interrupt a running worker task and record a terminal cancellation Interrupt { /// Worker id printed by `codewhale fleet run` @@ -1538,13 +1548,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { fn print_status(status: &FleetStatusSnapshot) { println!( - "fleet: runs={} queued={} running={} completed={} partial={} failed={} transport_failed={} task_failed={} verifier_failed={} cancelled={} stale={}", + "fleet: runs={} queued={} running={} completed={} partial={} failed={} restarted={} escalated={} transport_failed={} task_failed={} verifier_failed={} cancelled={} stale={}", status.runs, status.queued, status.running, status.completed, status.partial, status.failed, + status.restarted, + status.escalated, status.transport_failed, status.task_failed, status.verifier_failed, @@ -1568,6 +1580,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { if let Some(task_id) = &inspection.current_task_id { println!("task: {task_id}"); } + if let Some(objective) = &inspection.objective { + println!("objective: {objective}"); + } + if let Some(role) = &inspection.role { + println!("role: {role}"); + } + if let Some(host) = &inspection.host { + println!("host: {host}"); + } if let Some(heartbeat) = &inspection.latest_heartbeat_at { println!("heartbeat: {heartbeat}"); } @@ -1591,6 +1612,61 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { if let Some(error) = &inspection.last_error { println!("last_error: {error}"); } + if let Some(alert) = &inspection.alert_state { + println!("alert: {alert}"); + } + } + + fn print_artifacts(inspection: &FleetWorkerInspection) { + if inspection.artifacts.is_empty() { + println!("artifacts: none"); + return; + } + println!("artifacts:"); + for artifact in &inspection.artifacts { + let size = artifact + .size_bytes + .map(|size| format!(" size={size}")) + .unwrap_or_default(); + let mime = artifact + .mime_type + .as_ref() + .map(|mime| format!(" mime={mime}")) + .unwrap_or_default(); + println!( + " {} {}{}{}", + artifact_kind_label(&artifact.kind), + artifact.path.display(), + size, + mime + ); + } + } + + fn print_logs(workspace: &Path, inspection: &FleetWorkerInspection) -> Result<()> { + let mut printed = false; + for artifact in inspection + .artifacts + .iter() + .filter(|artifact| matches!(artifact.kind, FleetArtifactKind::Log)) + { + let path = workspace.join(&artifact.path); + println!("== {} ==", artifact.path.display()); + let contents = std::fs::read_to_string(&path) + .with_context(|| format!("reading fleet log {}", path.display()))?; + let preview: String = contents.chars().take(16 * 1024).collect(); + print!("{preview}"); + if contents.chars().count() > preview.chars().count() { + println!("\n[truncated]"); + } else if !preview.ends_with('\n') { + println!(); + } + printed = true; + } + if !printed { + println!("logs: none"); + } + Ok(()) } fn alert_event_class(arg: FleetAlertEventArg) -> FleetAlertEventClass { @@ -1680,6 +1756,15 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { print_inspection(&manager.inspect_worker(&worker_id)?); Ok(()) } + FleetCommand::Logs { worker_id } => { + let inspection = manager.inspect_worker(&worker_id)?; + print_logs(workspace, &inspection) + } + FleetCommand::Artifacts { worker_id } => { + let inspection = manager.inspect_worker(&worker_id)?; + print_artifacts(&inspection); + Ok(()) + } FleetCommand::Interrupt { worker_id } => { let inspection = manager.interrupt_worker(&worker_id)?; print_inspection(&inspection); diff --git a/crates/tui/src/runtime_api.rs b/crates/tui/src/runtime_api.rs index 4de55951..ec4587ed 100644 --- a/crates/tui/src/runtime_api.rs +++ b/crates/tui/src/runtime_api.rs @@ -37,6 +37,8 @@ use crate::automation_manager::{ CreateAutomationRequest, SharedAutomationManager, UpdateAutomationRequest, spawn_scheduler, }; use crate::config::{Config, DEFAULT_TEXT_MODEL}; +use crate::fleet::ledger::{FleetLedgerState, FleetTaskLedgerStatus}; +use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection}; use crate::mcp::McpPool; use crate::models::{ContentBlock, Message}; use crate::runtime_threads::{ @@ -53,6 +55,9 @@ use crate::skill_state::SkillStateStore; use crate::task_manager::{ NewTaskRequest, SharedTaskManager, TaskManager, TaskManagerConfig, TaskRecord, TaskSummary, }; +use codewhale_protocol::fleet::{ + FleetArtifactKind, FleetRun, FleetRunId, FleetWorkerEventPayload, FleetWorkerStatus, +}; #[derive(Clone)] pub struct RuntimeApiState { @@ -585,6 +590,22 @@ pub fn build_router(state: RuntimeApiState) -> Router { post(resume_session_thread), ) .route("/v1/workspace/status", get(workspace_status)) + .route("/v1/fleet/runs", get(list_fleet_runs)) + .route("/v1/fleet/runs/{run_id}", get(get_fleet_run)) + .route( + "/v1/fleet/runs/{run_id}/workers", + get(list_fleet_run_workers), + ) + .route("/v1/fleet/runs/{run_id}/stop", post(stop_fleet_run)) + .route("/v1/fleet/workers/{worker_id}", get(get_fleet_worker)) + .route( + "/v1/fleet/workers/{worker_id}/interrupt", + post(interrupt_fleet_worker), + ) + .route( + "/v1/fleet/workers/{worker_id}/restart", + post(restart_fleet_worker), + ) .route("/v1/stream", post(stream_turn)) .route("/v1/threads", get(list_threads).post(create_thread)) .route("/v1/threads/summary", get(list_threads_summary)) @@ -1323,6 +1344,342 @@ async fn workspace_status( Ok(Json(collect_workspace_status(&state.workspace))) } +async fn list_fleet_runs(State(state): State) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let ledger_state = manager + .rebuild_state() + .map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?; + let runs: Vec<_> = ledger_state + .runs + .values() + .map(|run| fleet_run_summary_json(&manager, run, &ledger_state)) + .collect::, _>>()?; + let status = manager + .status() + .map_err(|err| ApiError::internal(format!("Failed to read fleet status: {err}")))?; + Ok(Json(json!({ + "status": fleet_status_json(&status), + "runs": runs, + }))) +} + +async fn get_fleet_run( + State(state): State, + Path(run_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let ledger_state = manager + .rebuild_state() + .map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?; + let run = ledger_state + .runs + .get(&run_id) + .ok_or_else(|| ApiError::not_found(format!("fleet run '{run_id}' not found")))?; + Ok(Json(fleet_run_detail_json(&manager, run, &ledger_state)?)) +} + +async fn list_fleet_run_workers( + State(state): State, + Path(run_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let ledger_state = manager + .rebuild_state() + .map_err(|err| ApiError::internal(format!("Failed to rebuild fleet state: {err}")))?; + let run = ledger_state + .runs + .get(&run_id) + .ok_or_else(|| ApiError::not_found(format!("fleet run '{run_id}' not found")))?; + let workers = run + .worker_specs + .iter() + .map(|worker| { + manager + .inspect_worker(&worker.id) + .map(|inspection| fleet_worker_json(&inspection)) + .map_err(|err| { + ApiError::internal(format!( + "Failed to inspect fleet worker {}: {err}", + worker.id + )) + }) + }) + .collect::, _>>()?; + Ok(Json(json!({ + "run_id": run_id, + "workers": workers, + }))) +} + +async fn get_fleet_worker( + State(state): State, + Path(worker_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let inspection = manager.inspect_worker(&worker_id).map_err(|err| { + ApiError::not_found(format!("fleet worker '{worker_id}' not found: {err}")) + })?; + Ok(Json(fleet_worker_json(&inspection))) +} + +async fn interrupt_fleet_worker( + State(state): State, + Path(worker_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let inspection = manager.interrupt_worker(&worker_id).map_err(|err| { + ApiError::bad_request(format!( + "Failed to interrupt fleet worker '{worker_id}': {err}" + )) + })?; + Ok(Json(json!({ + "action": "interrupt", + "worker": fleet_worker_json(&inspection), + }))) +} + +async fn restart_fleet_worker( + State(state): State, + Path(worker_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let inspection = manager.restart_worker(&worker_id).map_err(|err| { + ApiError::bad_request(format!( + "Failed to restart fleet worker '{worker_id}': {err}" + )) + })?; + Ok(Json(json!({ + "action": "restart", + "worker": fleet_worker_json(&inspection), + }))) +} + +async fn stop_fleet_run( + State(state): State, + Path(run_id): Path, +) -> Result, ApiError> { + let manager = open_fleet_manager(&state)?; + let run_id = FleetRunId::from(run_id); + let stopped = manager.stop_run(&run_id).map_err(|err| { + ApiError::bad_request(format!("Failed to stop fleet run '{}': {err}", run_id.0)) + })?; + let status = manager + .run_status(&run_id) + .map_err(|err| ApiError::internal(format!("Failed to read fleet run status: {err}")))?; + Ok(Json(json!({ + "action": "stop", + "run_id": run_id.0, + "stopped": stopped, + "status": fleet_status_json(&status), + }))) +} + +fn open_fleet_manager(state: &RuntimeApiState) -> Result { + FleetManager::open(&state.workspace) + .map_err(|err| ApiError::internal(format!("Failed to open fleet manager: {err}"))) +} + +fn fleet_run_summary_json( + manager: &FleetManager, + run: &FleetRun, + ledger_state: &FleetLedgerState, +) -> Result { + let status = manager + .run_status(&run.id) + .map_err(|err| ApiError::internal(format!("Failed to read fleet run status: {err}")))?; + let task_statuses = ledger_state + .tasks + .values() + .filter(|task| task.entry.run_id == run.id) + .map(|task| { + json!({ + "task_id": task.entry.task_id.clone(), + "status": fleet_task_status_label(task.status), + "leased_to": task.leased_to.clone(), + "attempts": task.entry.attempts, + }) + }) + .collect::>(); + Ok(json!({ + "id": run.id.0.clone(), + "name": run.name.clone(), + "status": fleet_status_json(&status), + "task_count": run.task_specs.len(), + "worker_count": run.worker_specs.len(), + "tasks": task_statuses, + "labels": run.labels.clone(), + "created_at": run.created_at.clone(), + "updated_at": run.updated_at.clone(), + "completed_at": run.completed_at.clone(), + })) +} + +fn fleet_run_detail_json( + manager: &FleetManager, + run: &FleetRun, + ledger_state: &FleetLedgerState, +) -> Result { + let mut value = fleet_run_summary_json(manager, run, ledger_state)?; + if let Some(map) = value.as_object_mut() { + map.insert("task_specs".to_string(), json!(run.task_specs.clone())); + map.insert("worker_specs".to_string(), json!(run.worker_specs.clone())); + } + Ok(value) +} + +fn fleet_status_json(status: &FleetStatusSnapshot) -> Value { + json!({ + "runs": status.runs, + "queued": status.queued, + "running": status.running, + "completed": status.completed, + "partial": status.partial, + "failed": status.failed, + "restarted": status.restarted, + "escalated": status.escalated, + "transport_failed": status.transport_failed, + "task_failed": status.task_failed, + "verifier_failed": status.verifier_failed, + "cancelled": status.cancelled, + "stale": status.stale, + "workers": status + .workers + .iter() + .map(|(worker_id, status)| { + ( + worker_id.clone(), + Value::String(worker_status_label(status).to_string()), + ) + }) + .collect::>(), + }) +} + +fn fleet_worker_json(inspection: &FleetWorkerInspection) -> Value { + json!({ + "worker_id": inspection.worker_id.clone(), + "status": worker_status_label(&inspection.status), + "run_id": inspection.current_run_id.as_ref().map(|run_id| run_id.0.clone()), + "task_id": inspection.current_task_id.clone(), + "objective": inspection.objective.clone(), + "role": inspection.role.clone(), + "host": inspection.host.clone(), + "latest_heartbeat_at": inspection.latest_heartbeat_at.clone(), + "latest_event": inspection.latest_event.as_ref().map(fleet_event_json), + "artifacts": inspection.artifacts.iter().map(fleet_artifact_json).collect::>(), + "last_error": inspection.last_error.clone(), + "alert_state": inspection.alert_state.clone(), + }) +} + +fn fleet_artifact_json(artifact: &codewhale_protocol::fleet::FleetArtifactRef) -> Value { + json!({ + "kind": artifact_kind_label(&artifact.kind), + "path": artifact.path.clone(), + "checksum": artifact.checksum.clone(), + "mime_type": artifact.mime_type.clone(), + "size_bytes": artifact.size_bytes, + }) +} + +fn fleet_event_json(event: &codewhale_protocol::fleet::FleetWorkerEvent) -> Value { + json!({ + "seq": event.seq, + "run_id": event.run_id.0.clone(), + "worker_id": event.worker_id.clone(), + "task_id": event.task_id.clone(), + "timestamp": event.timestamp.clone(), + "label": fleet_event_label(&event.payload), + "payload": event.payload.clone(), + }) +} + +fn worker_status_label(status: &FleetWorkerStatus) -> &'static str { + match status { + FleetWorkerStatus::Unknown => "unknown", + FleetWorkerStatus::Online => "online", + FleetWorkerStatus::Busy => "busy", + FleetWorkerStatus::Offline => "offline", + FleetWorkerStatus::Unhealthy => "unhealthy", + FleetWorkerStatus::Draining => "draining", + FleetWorkerStatus::Retired => "retired", + } +} + +fn fleet_task_status_label(status: FleetTaskLedgerStatus) -> &'static str { + match status { + FleetTaskLedgerStatus::Enqueued => "enqueued", + FleetTaskLedgerStatus::Leased => "leased", + FleetTaskLedgerStatus::Completed => "completed", + FleetTaskLedgerStatus::Failed => "failed", + FleetTaskLedgerStatus::Cancelled => "cancelled", + } +} + +fn artifact_kind_label(kind: &FleetArtifactKind) -> String { + match kind { + FleetArtifactKind::Log => "log".to_string(), + FleetArtifactKind::Patch => "patch".to_string(), + FleetArtifactKind::TestResult => "test_result".to_string(), + FleetArtifactKind::Report => "report".to_string(), + FleetArtifactKind::Checkpoint => "checkpoint".to_string(), + FleetArtifactKind::Receipt => "receipt".to_string(), + FleetArtifactKind::Other(value) => value.clone(), + } +} + +fn fleet_event_label(payload: &FleetWorkerEventPayload) -> String { + match payload { + FleetWorkerEventPayload::Queued => "queued".to_string(), + FleetWorkerEventPayload::Leased { .. } => "leased".to_string(), + FleetWorkerEventPayload::Starting => "starting".to_string(), + FleetWorkerEventPayload::Running => "running".to_string(), + FleetWorkerEventPayload::ModelWait { model } => model + .as_ref() + .map(|model| format!("model_wait model={model}")) + .unwrap_or_else(|| "model_wait".to_string()), + FleetWorkerEventPayload::RunningTool { tool, call_id } => call_id + .as_ref() + .map(|call_id| format!("running_tool tool={tool} call_id={call_id}")) + .unwrap_or_else(|| format!("running_tool tool={tool}")), + FleetWorkerEventPayload::Heartbeat { .. } => "heartbeat".to_string(), + FleetWorkerEventPayload::Artifact(artifact) => { + format!("artifact kind={}", artifact_kind_label(&artifact.kind)) + } + FleetWorkerEventPayload::Completed { exit_code, summary } => match (exit_code, summary) { + (Some(code), Some(summary)) => format!("completed exit_code={code} {summary}"), + (Some(code), None) => format!("completed exit_code={code}"), + (None, Some(summary)) => format!("completed {summary}"), + (None, None) => "completed".to_string(), + }, + FleetWorkerEventPayload::Failed { + reason, + recoverable, + } => { + format!("failed recoverable={recoverable} reason={reason}") + } + FleetWorkerEventPayload::Cancelled { cancelled_by } => cancelled_by + .as_ref() + .map(|by| format!("cancelled by={by}")) + .unwrap_or_else(|| "cancelled".to_string()), + FleetWorkerEventPayload::Interrupted { signal } => signal + .as_ref() + .map(|signal| format!("interrupted signal={signal}")) + .unwrap_or_else(|| "interrupted".to_string()), + FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at + .as_ref() + .map(|ts| format!("stale last_heartbeat_at={ts}")) + .unwrap_or_else(|| "stale".to_string()), + FleetWorkerEventPayload::Restarted { restart_count } => { + format!("restarted count={restart_count}") + } + FleetWorkerEventPayload::Escalated { channel, alert_id } => alert_id + .as_ref() + .map(|alert_id| format!("escalated channel={channel} alert_id={alert_id}")) + .unwrap_or_else(|| format!("escalated channel={channel}")), + } +} + async fn list_skills( State(state): State, ) -> Result, ApiError> { @@ -2980,14 +3337,38 @@ mod tests { SharedRuntimeThreadManager, tokio::task::JoinHandle<()>, )>, + > { + spawn_test_server_with_root_token_mobile_workspace( + root, + sessions_dir, + runtime_token, + mobile_enabled, + PathBuf::from("."), + ) + .await + } + + async fn spawn_test_server_with_root_token_mobile_workspace( + root: PathBuf, + sessions_dir: PathBuf, + runtime_token: Option, + mobile_enabled: bool, + workspace: PathBuf, + ) -> Result< + Option<( + SocketAddr, + SharedRuntimeThreadManager, + tokio::task::JoinHandle<()>, + )>, > { let _ = rustls::crypto::ring::default_provider().install_default(); fs::create_dir_all(&sessions_dir)?; + fs::create_dir_all(&workspace)?; let manager = TaskManager::start_with_executor( TaskManagerConfig { data_dir: root.join("tasks"), worker_count: 1, - default_workspace: PathBuf::from("."), + default_workspace: workspace.clone(), default_model: DEFAULT_TEXT_MODEL.to_string(), default_mode: "agent".to_string(), allow_shell: false, @@ -3017,7 +3398,7 @@ mod tests { }); let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open( config, - PathBuf::from("."), + workspace.clone(), RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")), )?); runtime_threads.attach_task_manager(manager.clone()); @@ -3029,7 +3410,7 @@ mod tests { let auth_required = runtime_token.is_some(); let state = RuntimeApiState { config: Config::default(), - workspace: PathBuf::from("."), + workspace, task_manager: manager, runtime_threads: runtime_threads.clone(), cors_origins: Vec::new(), @@ -3527,6 +3908,128 @@ mod tests { Ok(()) } + #[tokio::test] + async fn fleet_status_runtime_api_exposes_state_and_actions() -> Result<()> { + let root = std::env::temp_dir().join(format!("codewhale-fleet-api-{}", Uuid::new_v4())); + let workspace = root.join("workspace"); + fs::create_dir_all(&workspace)?; + let manager = FleetManager::open(&workspace)?; + let task = codewhale_protocol::fleet::FleetTaskSpec { + id: "task-a".to_string(), + name: "Task A".to_string(), + description: None, + objective: Some("Inspect fleet status through Runtime API".to_string()), + instructions: "Stay running for inspection.".to_string(), + worker: Some(codewhale_protocol::fleet::FleetTaskWorkerProfile { + role: Some("status-reviewer".to_string()), + tool_profile: Some("read-only".to_string()), + tools: vec!["rg".to_string()], + capabilities: vec!["fleet".to_string()], + }), + workspace: None, + input_files: Vec::new(), + context: Vec::new(), + budget: None, + tags: Vec::new(), + expected_artifacts: vec![FleetArtifactKind::Log], + scorer: None, + retry_policy: None, + alert_policy: None, + timeout_seconds: None, + metadata: std::collections::BTreeMap::new(), + }; + let report = manager.create_run( + crate::fleet::task_spec::FleetTaskSpecDocument { + name: Some("api smoke".to_string()), + labels: std::collections::BTreeMap::new(), + workers: Vec::new(), + tasks: vec![task], + }, + 1, + )?; + let worker_id = report.worker_ids[0].clone(); + let sessions_dir = root.join("sessions"); + let Some((addr, _runtime_threads, handle)) = + spawn_test_server_with_root_token_mobile_workspace( + root.clone(), + sessions_dir, + None, + false, + workspace, + ) + .await? + else { + return Ok(()); + }; + let client = crate::tls::reqwest_client(); + + let runs: serde_json::Value = client + .get(format!("http://{addr}/v1/fleet/runs")) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!(runs["status"]["running"], 1); + assert_eq!(runs["runs"][0]["id"], report.run_id.0); + + let worker: serde_json::Value = client + .get(format!("http://{addr}/v1/fleet/workers/{worker_id}")) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!( + worker["objective"], + "Inspect fleet status through Runtime API" + ); + assert_eq!(worker["role"], "status-reviewer"); + assert_eq!(worker["host"], "local"); + assert_eq!(worker["artifacts"][0]["kind"], "log"); + + let interrupted: serde_json::Value = client + .post(format!( + "http://{addr}/v1/fleet/workers/{worker_id}/interrupt" + )) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!(interrupted["action"], "interrupt"); + assert_eq!(interrupted["worker"]["last_error"], "cancelled by operator"); + + let restarted: serde_json::Value = client + .post(format!( + "http://{addr}/v1/fleet/workers/{worker_id}/restart" + )) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!(restarted["action"], "restart"); + assert_eq!(restarted["worker"]["status"], "busy"); + + let stopped: serde_json::Value = client + .post(format!( + "http://{addr}/v1/fleet/runs/{}/stop", + report.run_id.0 + )) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!(stopped["action"], "stop"); + assert_eq!(stopped["stopped"], 1); + assert_eq!(stopped["status"]["cancelled"], 1); + + handle.abort(); + Ok(()) + } + #[tokio::test] async fn stream_requires_prompt() -> Result<()> { let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { diff --git a/docs/FLEET.md b/docs/FLEET.md index 6d39e747..98c089a6 100644 --- a/docs/FLEET.md +++ b/docs/FLEET.md @@ -8,6 +8,8 @@ codewhale fleet init codewhale fleet run tasks.json --max-workers 4 codewhale fleet status codewhale fleet inspect +codewhale fleet logs +codewhale fleet artifacts codewhale fleet interrupt codewhale fleet restart codewhale fleet stop --all @@ -242,6 +244,31 @@ safe inspection commands such as `codewhale fleet status` and `codewhale fleet inspect `. Endpoints, webhook secrets, and PagerDuty routing keys are shown as ``. +## Status Surfaces + +`codewhale fleet status` shows compact counts for queued, running, completed, +partial, failed, restarted, escalated, cancelled, stale, and verifier/transport +failure sources. `inspect` shows the worker state plus the current task +objective, role, host, heartbeat, latest event, artifact refs, latest error, and +alert state. `logs` prints bounded log artifact contents, and `artifacts` lists +artifact refs without embedding large payloads. + +The Runtime API exposes the same ledger-backed projection behind the existing +runtime auth middleware: + +```text +GET /v1/fleet/runs +GET /v1/fleet/runs/{run_id} +GET /v1/fleet/runs/{run_id}/workers +GET /v1/fleet/workers/{worker_id} +POST /v1/fleet/workers/{worker_id}/interrupt +POST /v1/fleet/workers/{worker_id}/restart +POST /v1/fleet/runs/{run_id}/stop +``` + +Action endpoints call the same manager controls as the CLI and record their +decisions in the fleet ledger. + ## Host Adapters The host adapter boundary supports local child processes and explicit SSH