diff --git a/crates/protocol/src/fleet.rs b/crates/protocol/src/fleet.rs index e268ad65..40deb957 100644 --- a/crates/protocol/src/fleet.rs +++ b/crates/protocol/src/fleet.rs @@ -443,6 +443,9 @@ fn default_retry_backoff_multiplier() -> u32 { /// Alert/escalation policy attached to a task or run. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FleetAlertPolicy { + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub events: Vec, #[serde(default)] pub channels: Vec, #[serde(default)] @@ -451,6 +454,17 @@ pub struct FleetAlertPolicy { pub after_minutes_stale: Option, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[serde(rename_all = "snake_case")] +pub enum FleetAlertEventClass { + Stale, + RestartExhausted, + NeedsHuman, + BudgetExceeded, + VerifierFailed, + RunCompleted, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum FleetAlertChannel { @@ -632,6 +646,7 @@ mod tests { #[test] fn alert_policy_round_trip() { let policy = FleetAlertPolicy { + events: vec![FleetAlertEventClass::Stale], channels: vec![FleetAlertChannel::Slack { webhook_url: "https://hooks.slack.com/test".to_string(), }], @@ -639,8 +654,10 @@ mod tests { after_minutes_stale: Some(10), }; let json = serde_json::to_string(&policy).unwrap(); + assert!(json.contains("\"events\":[\"stale\"]")); assert!(json.contains("\"kind\":\"slack\"")); let back: FleetAlertPolicy = serde_json::from_str(&json).unwrap(); + assert_eq!(back.events, vec![FleetAlertEventClass::Stale]); assert_eq!(back.after_attempts, Some(2)); } diff --git a/crates/tui/src/fleet/alerts.rs b/crates/tui/src/fleet/alerts.rs new file mode 100644 index 00000000..f15f9286 --- /dev/null +++ b/crates/tui/src/fleet/alerts.rs @@ -0,0 +1,684 @@ +//! Opt-in fleet alert routing and adapter payloads. + +#![allow(dead_code)] + +use std::collections::BTreeMap; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow}; +use codewhale_protocol::fleet::{ + FleetAlertEventClass, FleetReceipt, FleetRunId, FleetTaskFailureKind, FleetWorkerEvent, + FleetWorkerEventPayload, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; + +const DEFAULT_ALERT_TIMEOUT_SECONDS: u64 = 10; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct FleetAlertConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default)] + pub dry_run: bool, + #[serde(default)] + pub routes: Vec, + #[serde(default)] + pub adapters: BTreeMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FleetAlertRoute { + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub events: Vec, + pub adapter: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum FleetAlertAdapterConfig { + Slack { + webhook_env: String, + #[serde(skip_serializing_if = "Option::is_none")] + channel: Option, + }, + Webhook { + url_env: String, + #[serde(skip_serializing_if = "Option::is_none")] + secret_env: Option, + }, + PagerDuty { + routing_key_env: String, + #[serde(default = "default_pagerduty_severity")] + severity: String, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FleetAlertEvent { + pub class: FleetAlertEventClass, + pub run_id: FleetRunId, + #[serde(skip_serializing_if = "Option::is_none")] + pub worker_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option, + pub status: String, + pub reason: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FleetAlertDelivery { + pub adapter: String, + pub event_class: FleetAlertEventClass, + pub dry_run: bool, + pub sent: bool, + pub redacted_payload: Value, +} + +pub trait FleetAlertSecretResolver { + fn resolve(&self, name: &str) -> Option; +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct FleetEnvSecretResolver; + +impl FleetAlertSecretResolver for FleetEnvSecretResolver { + fn resolve(&self, name: &str) -> Option { + std::env::var(name).ok().filter(|value| !value.is_empty()) + } +} + +pub struct FleetAlertDispatcher { + config: FleetAlertConfig, + resolver: R, +} + +impl FleetAlertConfig { + pub fn disabled() -> Self { + Self::default() + } + + pub fn dry_run_for_adapter(adapter: FleetAlertAdapterConfig) -> Self { + let mut adapters = BTreeMap::new(); + adapters.insert("dry-run".to_string(), adapter); + Self { + enabled: true, + dry_run: true, + routes: vec![FleetAlertRoute { + events: Vec::new(), + adapter: "dry-run".to_string(), + }], + adapters, + } + } +} + +impl FleetAlertDispatcher +where + R: FleetAlertSecretResolver, +{ + pub fn new(config: FleetAlertConfig, resolver: R) -> Self { + Self { config, resolver } + } + + pub fn dispatch(&self, event: &FleetAlertEvent) -> Result> { + if !self.config.enabled { + return Ok(Vec::new()); + } + let mut deliveries = Vec::new(); + for route in self + .config + .routes + .iter() + .filter(|route| route_matches(route, event.class)) + { + let adapter = self.config.adapters.get(&route.adapter).ok_or_else(|| { + anyhow!("fleet alert adapter {} is not configured", route.adapter) + })?; + let prepared = prepare_alert(&route.adapter, adapter, event, self.config.dry_run)?; + let sent = if self.config.dry_run { + false + } else { + send_alert(adapter, &prepared.body, &self.resolver)? + }; + deliveries.push(FleetAlertDelivery { + adapter: route.adapter.clone(), + event_class: event.class, + dry_run: self.config.dry_run, + sent, + redacted_payload: prepared.redacted_payload, + }); + } + Ok(deliveries) + } +} + +impl FleetAlertEvent { + pub fn stale_from_worker_event(event: &FleetWorkerEvent) -> Option { + let FleetWorkerEventPayload::Stale { last_heartbeat_at } = &event.payload else { + return None; + }; + Some(Self { + class: FleetAlertEventClass::Stale, + run_id: event.run_id.clone(), + worker_id: Some(event.worker_id.clone()), + task_id: Some(event.task_id.clone()), + status: "stale".to_string(), + reason: last_heartbeat_at + .as_ref() + .map(|ts| format!("worker heartbeat stale since {ts}")) + .unwrap_or_else(|| "worker heartbeat is stale".to_string()), + }) + } + + pub fn restart_exhausted( + run_id: FleetRunId, + worker_id: impl Into, + task_id: impl Into, + reason: impl Into, + ) -> Self { + Self { + class: FleetAlertEventClass::RestartExhausted, + run_id, + worker_id: Some(worker_id.into()), + task_id: Some(task_id.into()), + status: "failed".to_string(), + reason: reason.into(), + } + } + + pub fn needs_human( + run_id: FleetRunId, + worker_id: Option, + task_id: Option, + reason: impl Into, + ) -> Self { + Self { + class: FleetAlertEventClass::NeedsHuman, + run_id, + worker_id, + task_id, + status: "needs_human".to_string(), + reason: reason.into(), + } + } + + pub fn budget_exceeded( + run_id: FleetRunId, + worker_id: Option, + task_id: Option, + reason: impl Into, + ) -> Self { + Self { + class: FleetAlertEventClass::BudgetExceeded, + run_id, + worker_id, + task_id, + status: "budget_exceeded".to_string(), + reason: reason.into(), + } + } + + pub fn verifier_failed(receipt: &FleetReceipt) -> Option { + if receipt.failure_kind != Some(FleetTaskFailureKind::Verifier) { + return None; + } + Some(Self { + class: FleetAlertEventClass::VerifierFailed, + run_id: receipt.run_id.clone(), + worker_id: Some(receipt.worker_id.clone()), + task_id: Some(receipt.task_id.clone()), + status: "verifier_failed".to_string(), + reason: receipt + .score + .as_ref() + .and_then(|score| score.notes.clone()) + .unwrap_or_else(|| "verifier failed".to_string()), + }) + } + + pub fn run_completed(run_id: FleetRunId, reason: impl Into) -> Self { + Self { + class: FleetAlertEventClass::RunCompleted, + run_id, + worker_id: None, + task_id: None, + status: "completed".to_string(), + reason: reason.into(), + } + } + + pub fn inspection_commands(&self) -> Vec { + let mut commands = vec!["codewhale fleet status".to_string()]; + if let Some(worker_id) = &self.worker_id { + commands.push(format!("codewhale fleet inspect {worker_id}")); + } + commands + } +} + +struct PreparedAlert { + body: Value, + redacted_payload: Value, +} + +fn prepare_alert( + adapter_name: &str, + adapter: &FleetAlertAdapterConfig, + event: &FleetAlertEvent, + dry_run: bool, +) -> Result { + let safe_event = safe_event_payload(event); + let prepared = match adapter { + FleetAlertAdapterConfig::Slack { + webhook_env, + channel, + } => { + let body = slack_body(event, channel.as_deref()); + let redacted_payload = json!({ + "adapter": adapter_name, + "kind": "slack", + "dry_run": dry_run, + "target": redacted_env(webhook_env), + "event": safe_event, + "body": body, + }); + PreparedAlert { + body, + redacted_payload, + } + } + FleetAlertAdapterConfig::Webhook { + url_env, + secret_env, + } => { + let body = json!({ + "source": "codewhale", + "event": safe_event, + }); + let redacted_payload = json!({ + "adapter": adapter_name, + "kind": "webhook", + "dry_run": dry_run, + "target": redacted_env(url_env), + "headers": redacted_secret_header(secret_env.as_deref()), + "body": body, + }); + PreparedAlert { + body, + redacted_payload, + } + } + FleetAlertAdapterConfig::PagerDuty { + routing_key_env, + severity, + } => { + let body = pagerduty_body(event, severity, redacted_env(routing_key_env)); + let redacted_payload = json!({ + "adapter": adapter_name, + "kind": "pagerduty", + "dry_run": dry_run, + "target": "https://events.pagerduty.com/v2/enqueue", + "body": body, + }); + PreparedAlert { + body, + redacted_payload, + } + } + }; + Ok(prepared) +} + +fn send_alert( + adapter: &FleetAlertAdapterConfig, + redacted_body: &Value, + resolver: &R, +) -> Result +where + R: FleetAlertSecretResolver, +{ + let client = crate::tls::reqwest_blocking_client_builder() + .timeout(Duration::from_secs(DEFAULT_ALERT_TIMEOUT_SECONDS)) + .build() + .context("building fleet alert HTTP client")?; + match adapter { + FleetAlertAdapterConfig::Slack { webhook_env, .. } => { + let url = required_secret(resolver, webhook_env)?; + client + .post(url) + .json(redacted_body) + .send() + .context("sending fleet Slack alert")? + .error_for_status() + .context("Slack alert rejected")?; + } + FleetAlertAdapterConfig::Webhook { + url_env, + secret_env, + } => { + let url = required_secret(resolver, url_env)?; + let mut request = client.post(url).json(redacted_body); + if let Some(secret_env) = secret_env { + request = request.header( + "X-CodeWhale-Webhook-Secret", + required_secret(resolver, secret_env)?, + ); + } + request + .send() + .context("sending fleet webhook alert")? + .error_for_status() + .context("webhook alert rejected")?; + } + FleetAlertAdapterConfig::PagerDuty { + routing_key_env, + severity, + } => { + let routing_key = required_secret(resolver, routing_key_env)?; + let mut body = redacted_body.clone(); + if let Some(map) = body.as_object_mut() { + map.insert("routing_key".to_string(), Value::String(routing_key)); + } + if let Some(payload) = body.get_mut("payload").and_then(Value::as_object_mut) { + payload.insert("severity".to_string(), Value::String(severity.clone())); + } + client + .post("https://events.pagerduty.com/v2/enqueue") + .json(&body) + .send() + .context("sending fleet PagerDuty alert")? + .error_for_status() + .context("PagerDuty alert rejected")?; + } + } + Ok(true) +} + +fn route_matches(route: &FleetAlertRoute, class: FleetAlertEventClass) -> bool { + route.events.is_empty() || route.events.contains(&class) +} + +fn safe_event_payload(event: &FleetAlertEvent) -> Value { + json!({ + "class": event.class, + "run_id": event.run_id.0.clone(), + "worker_id": event.worker_id.clone(), + "task_id": event.task_id.clone(), + "status": event.status.clone(), + "reason": short_reason(&event.reason), + "commands": event.inspection_commands(), + }) +} + +fn slack_body(event: &FleetAlertEvent, channel: Option<&str>) -> Value { + let text = format!( + "CodeWhale fleet {}: run={} task={} reason={}", + alert_class_label(event.class), + event.run_id.0, + event.task_id.as_deref().unwrap_or("-"), + short_reason(&event.reason) + ); + let mut body = json!({ + "text": text, + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": text + } + }, + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": event.inspection_commands().join(" | ") + } + ] + } + ] + }); + if let Some(channel) = channel + && let Some(map) = body.as_object_mut() + { + map.insert("channel".to_string(), Value::String(channel.to_string())); + } + body +} + +fn pagerduty_body(event: &FleetAlertEvent, severity: &str, routing_key: String) -> Value { + json!({ + "routing_key": routing_key, + "event_action": "trigger", + "payload": { + "summary": format!("CodeWhale fleet {}: {}", alert_class_label(event.class), short_reason(&event.reason)), + "severity": severity, + "source": "codewhale", + "custom_details": safe_event_payload(event), + } + }) +} + +fn redacted_env(name: &str) -> String { + format!("") +} + +fn alert_class_label(class: FleetAlertEventClass) -> &'static str { + match class { + FleetAlertEventClass::Stale => "stale", + FleetAlertEventClass::RestartExhausted => "restart_exhausted", + FleetAlertEventClass::NeedsHuman => "needs_human", + FleetAlertEventClass::BudgetExceeded => "budget_exceeded", + FleetAlertEventClass::VerifierFailed => "verifier_failed", + FleetAlertEventClass::RunCompleted => "run_completed", + } +} + +fn redacted_secret_header(secret_env: Option<&str>) -> Value { + match secret_env { + Some(name) => json!({ "X-CodeWhale-Webhook-Secret": redacted_env(name) }), + None => json!({}), + } +} + +fn required_secret(resolver: &R, name: &str) -> Result +where + R: FleetAlertSecretResolver, +{ + resolver + .resolve(name) + .ok_or_else(|| anyhow!("fleet alert secret {name} is not configured")) +} + +fn short_reason(reason: &str) -> String { + let trimmed = reason.trim(); + if trimmed.len() <= 240 { + return trimmed.to_string(); + } + let prefix: String = trimmed.chars().take(237).collect(); + format!("{prefix}...") +} + +fn default_pagerduty_severity() -> String { + "error".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use codewhale_protocol::fleet::{FleetScore, FleetTaskResult}; + + #[derive(Default)] + struct MapResolver { + values: BTreeMap, + } + + impl FleetAlertSecretResolver for MapResolver { + fn resolve(&self, name: &str) -> Option { + self.values.get(name).cloned() + } + } + + fn event(class: FleetAlertEventClass) -> FleetAlertEvent { + FleetAlertEvent { + class, + run_id: FleetRunId::from("run-1"), + worker_id: Some("worker-1".to_string()), + task_id: Some("task-a".to_string()), + status: "stale".to_string(), + reason: "worker heartbeat stale".to_string(), + } + } + + #[test] + fn fleet_alert_disabled_by_default() { + let dispatcher = + FleetAlertDispatcher::new(FleetAlertConfig::default(), MapResolver::default()); + + let deliveries = dispatcher + .dispatch(&event(FleetAlertEventClass::Stale)) + .unwrap(); + + assert!(deliveries.is_empty()); + } + + #[test] + fn fleet_alert_policy_routes_event_classes_to_adapters() { + let mut adapters = BTreeMap::new(); + adapters.insert( + "ops-slack".to_string(), + FleetAlertAdapterConfig::Slack { + webhook_env: "FLEET_SLACK_WEBHOOK".to_string(), + channel: Some("#fleet".to_string()), + }, + ); + adapters.insert( + "release-webhook".to_string(), + FleetAlertAdapterConfig::Webhook { + url_env: "FLEET_WEBHOOK_URL".to_string(), + secret_env: None, + }, + ); + let dispatcher = FleetAlertDispatcher::new( + FleetAlertConfig { + enabled: true, + dry_run: true, + routes: vec![ + FleetAlertRoute { + events: vec![FleetAlertEventClass::Stale], + adapter: "ops-slack".to_string(), + }, + FleetAlertRoute { + events: vec![FleetAlertEventClass::RunCompleted], + adapter: "release-webhook".to_string(), + }, + ], + adapters, + }, + MapResolver::default(), + ); + + let deliveries = dispatcher + .dispatch(&event(FleetAlertEventClass::Stale)) + .unwrap(); + + assert_eq!(deliveries.len(), 1); + assert_eq!(deliveries[0].adapter, "ops-slack"); + assert_eq!(deliveries[0].event_class, FleetAlertEventClass::Stale); + assert!(!deliveries[0].sent); + assert_eq!(deliveries[0].redacted_payload["kind"], "slack"); + } + + #[test] + fn fleet_alert_dry_run_redacts_secrets() { + let mut adapters = BTreeMap::new(); + adapters.insert( + "pager".to_string(), + FleetAlertAdapterConfig::PagerDuty { + routing_key_env: "FLEET_PD_ROUTING_KEY".to_string(), + severity: "critical".to_string(), + }, + ); + let mut resolver = MapResolver::default(); + resolver.values.insert( + "FLEET_PD_ROUTING_KEY".to_string(), + "real-routing-key-secret".to_string(), + ); + let dispatcher = FleetAlertDispatcher::new( + FleetAlertConfig { + enabled: true, + dry_run: true, + routes: vec![FleetAlertRoute { + events: vec![FleetAlertEventClass::RestartExhausted], + adapter: "pager".to_string(), + }], + adapters, + }, + resolver, + ); + + let deliveries = dispatcher + .dispatch(&event(FleetAlertEventClass::RestartExhausted)) + .unwrap(); + let payload = serde_json::to_string(&deliveries[0].redacted_payload).unwrap(); + + assert!(payload.contains("")); + assert!(!payload.contains("real-routing-key-secret")); + assert!(payload.contains("codewhale fleet inspect worker-1")); + } + + #[test] + fn fleet_alert_event_is_derived_from_ledgered_stale_worker_event() { + let worker_event = FleetWorkerEvent { + seq: 4, + run_id: FleetRunId::from("run-1"), + worker_id: "worker-1".to_string(), + task_id: "task-a".to_string(), + timestamp: "2026-06-13T02:00:00Z".to_string(), + payload: FleetWorkerEventPayload::Stale { + last_heartbeat_at: Some("2026-06-13T01:57:00Z".to_string()), + }, + extra: BTreeMap::new(), + }; + + let alert = FleetAlertEvent::stale_from_worker_event(&worker_event).unwrap(); + + assert_eq!(alert.class, FleetAlertEventClass::Stale); + assert_eq!(alert.worker_id.as_deref(), Some("worker-1")); + assert!(alert.reason.contains("2026-06-13T01:57:00Z")); + assert_eq!( + alert.inspection_commands(), + vec![ + "codewhale fleet status".to_string(), + "codewhale fleet inspect worker-1".to_string() + ] + ); + } + + #[test] + fn fleet_alert_verifier_failed_event_is_derived_from_receipt() { + let receipt = FleetReceipt { + run_id: FleetRunId::from("run-1"), + task_id: "task-a".to_string(), + worker_id: "worker-1".to_string(), + completed_at: "2026-06-13T02:00:00Z".to_string(), + result: FleetTaskResult::Fail, + failure_kind: Some(FleetTaskFailureKind::Verifier), + artifacts: vec![], + score: Some(FleetScore { + value: 0.0, + max: Some(1.0), + notes: Some("regex scorer could not be compiled".to_string()), + }), + }; + + let alert = FleetAlertEvent::verifier_failed(&receipt).unwrap(); + + assert_eq!(alert.class, FleetAlertEventClass::VerifierFailed); + assert_eq!(alert.status, "verifier_failed"); + assert!(alert.reason.contains("regex scorer")); + } +} diff --git a/crates/tui/src/fleet/mod.rs b/crates/tui/src/fleet/mod.rs index 5fbf0c2b..6a57779c 100644 --- a/crates/tui/src/fleet/mod.rs +++ b/crates/tui/src/fleet/mod.rs @@ -1,5 +1,6 @@ //! Agent Fleet control plane — local-first manager, ledger, and workers. +pub mod alerts; pub mod host; pub mod ledger; pub mod manager; diff --git a/crates/tui/src/fleet/scheduler.rs b/crates/tui/src/fleet/scheduler.rs index 874594b6..dc15a770 100644 --- a/crates/tui/src/fleet/scheduler.rs +++ b/crates/tui/src/fleet/scheduler.rs @@ -224,7 +224,13 @@ impl FleetScheduler { }, )?; report.failed += 1; - report.alerts += self.record_alerts(&task.entry.run_id, &task.entry.task_id, task_spec)?; + report.alerts += self.record_alerts( + &task.entry.run_id, + &task.entry.task_id, + worker_id, + task_spec, + FleetAlertEventClass::RestartExhausted, + )?; Ok(()) } @@ -358,18 +364,29 @@ impl FleetScheduler { &self, run_id: &FleetRunId, task_id: &str, + worker_id: &str, task_spec: &FleetTaskSpec, + event_class: FleetAlertEventClass, ) -> Result { let Some(policy) = &task_spec.alert_policy else { return Ok(0); }; + if !alert_policy_matches(policy, event_class) { + return Ok(0); + } let mut count = 0; for channel in &policy.channels { - self.ledger.record_alert( + let label = alert_channel_label(channel); + self.ledger + .record_alert(run_id, task_id, label, &self.timestamp())?; + self.append_worker_event( run_id, + worker_id, task_id, - alert_channel_label(channel), - &self.timestamp(), + FleetWorkerEventPayload::Escalated { + channel: label.to_string(), + alert_id: None, + }, )?; count += 1; } @@ -516,6 +533,10 @@ fn alert_channel_label(channel: &FleetAlertChannel) -> &'static str { } } +fn alert_policy_matches(policy: &FleetAlertPolicy, class: FleetAlertEventClass) -> bool { + policy.events.is_empty() || policy.events.contains(&class) +} + fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { format!("{worker_id}:{run_id}:{task_id}") } @@ -681,6 +702,7 @@ mod tests { let mut scheduler = scheduler(&tmp, 1); let mut failing = task("task-a", 1); failing.alert_policy = Some(FleetAlertPolicy { + events: vec![FleetAlertEventClass::RestartExhausted], channels: vec![FleetAlertChannel::Slack { webhook_url: "https://hooks.slack.invalid/secret".to_string(), }], @@ -704,6 +726,7 @@ mod tests { ); let ledger = ledger_text(&scheduler); assert!(ledger.contains("\"state\":\"failed\"")); + assert!(ledger.contains("\"state\":\"escalated\"")); assert!(ledger.contains("\"record\":\"alert_sent\"")); assert!(!ledger.contains("hooks.slack.invalid/secret")); } diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 7c24368f..db369f08 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -410,6 +410,8 @@ enum FleetCommand { #[arg(long, required = true)] all: bool, }, + /// Render a redacted fleet alert payload without sending it + AlertDryRun(FleetAlertDryRunArgs), } #[derive(Args, Debug, Clone)] @@ -428,6 +430,63 @@ struct FleetRunArgs { once: bool, } +#[derive(Args, Debug, Clone)] +struct FleetAlertDryRunArgs { + /// Alert event class to render + #[arg(long, value_enum)] + event: FleetAlertEventArg, + /// Fleet run id + #[arg(long)] + run_id: String, + /// Worker id, when the event belongs to one worker + #[arg(long)] + worker_id: Option, + /// Task id, when the event belongs to one task + #[arg(long)] + task_id: Option, + /// Short human-readable reason for the alert + #[arg(long, default_value = "manual fleet alert dry-run")] + reason: String, + /// Status label to include in the payload + #[arg(long)] + status: Option, + /// Adapter payload shape to render + #[arg(long, value_enum, default_value_t = FleetAlertAdapterArg::Slack)] + adapter: FleetAlertAdapterArg, + /// Environment variable containing the Slack webhook URL + #[arg(long, default_value = "CODEWHALE_FLEET_SLACK_WEBHOOK")] + slack_webhook_env: String, + /// Environment variable containing the generic webhook URL + #[arg(long, default_value = "CODEWHALE_FLEET_WEBHOOK_URL")] + webhook_url_env: String, + /// Optional environment variable containing the generic webhook secret + #[arg(long)] + webhook_secret_env: Option, + /// Environment variable containing the PagerDuty routing key + #[arg(long, default_value = "CODEWHALE_FLEET_PAGERDUTY_ROUTING_KEY")] + pagerduty_routing_key_env: String, + /// PagerDuty severity to render + #[arg(long, default_value = "error")] + pagerduty_severity: String, +} + +#[derive(ValueEnum, Debug, Clone, Copy)] +enum FleetAlertEventArg { + Stale, + RestartExhausted, + NeedsHuman, + BudgetExceeded, + VerifierFailed, + RunCompleted, +} + +#[derive(ValueEnum, Debug, Clone, Copy)] +enum FleetAlertAdapterArg { + Slack, + Webhook, + PagerDuty, +} + #[derive(Args, Debug, Clone)] struct SwebenchRunArgs { /// SWE-bench instance id, e.g. django__django-12345 @@ -1390,9 +1449,14 @@ async fn run_swebench_command( } async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { + use crate::fleet::alerts::{ + FleetAlertAdapterConfig, FleetAlertConfig, FleetAlertDispatcher, FleetAlertEvent, + FleetEnvSecretResolver, + }; use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection}; use codewhale_protocol::fleet::{ - FleetArtifactKind, FleetWorkerEventPayload, FleetWorkerStatus, + FleetAlertEventClass, FleetArtifactKind, FleetRunId, FleetWorkerEventPayload, + FleetWorkerStatus, }; fn worker_status_label(status: &FleetWorkerStatus) -> &'static str { @@ -1529,6 +1593,49 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { } } + fn alert_event_class(arg: FleetAlertEventArg) -> FleetAlertEventClass { + match arg { + FleetAlertEventArg::Stale => FleetAlertEventClass::Stale, + FleetAlertEventArg::RestartExhausted => FleetAlertEventClass::RestartExhausted, + FleetAlertEventArg::NeedsHuman => FleetAlertEventClass::NeedsHuman, + FleetAlertEventArg::BudgetExceeded => FleetAlertEventClass::BudgetExceeded, + FleetAlertEventArg::VerifierFailed => FleetAlertEventClass::VerifierFailed, + FleetAlertEventArg::RunCompleted => FleetAlertEventClass::RunCompleted, + } + } + + fn alert_status(class: FleetAlertEventClass, override_status: Option) -> String { + if let Some(status) = override_status { + return status; + } + match class { + FleetAlertEventClass::Stale => "stale", + FleetAlertEventClass::RestartExhausted => "failed", + FleetAlertEventClass::NeedsHuman => "needs_human", + FleetAlertEventClass::BudgetExceeded => "budget_exceeded", + FleetAlertEventClass::VerifierFailed => "verifier_failed", + FleetAlertEventClass::RunCompleted => "completed", + } + .to_string() + } + + fn alert_adapter(args: &FleetAlertDryRunArgs) -> FleetAlertAdapterConfig { + match args.adapter { + FleetAlertAdapterArg::Slack => FleetAlertAdapterConfig::Slack { + webhook_env: args.slack_webhook_env.clone(), + channel: None, + }, + FleetAlertAdapterArg::Webhook => FleetAlertAdapterConfig::Webhook { + url_env: args.webhook_url_env.clone(), + secret_env: args.webhook_secret_env.clone(), + }, + FleetAlertAdapterArg::PagerDuty => FleetAlertAdapterConfig::PagerDuty { + routing_key_env: args.pagerduty_routing_key_env.clone(), + severity: args.pagerduty_severity.clone(), + }, + } + } + let manager = FleetManager::open(workspace)?; match args.command { FleetCommand::Init => { @@ -1591,6 +1698,30 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { println!("stopped: {stopped}"); Ok(()) } + FleetCommand::AlertDryRun(args) => { + let class = alert_event_class(args.event); + let adapter = alert_adapter(&args); + let event = FleetAlertEvent { + class, + run_id: FleetRunId::from(args.run_id.clone()), + worker_id: args.worker_id.clone(), + task_id: args.task_id.clone(), + status: alert_status(class, args.status.clone()), + reason: args.reason.clone(), + }; + let dispatcher = FleetAlertDispatcher::new( + FleetAlertConfig::dry_run_for_adapter(adapter), + FleetEnvSecretResolver, + ); + let deliveries = dispatcher.dispatch(&event)?; + for delivery in deliveries { + println!( + "{}", + serde_json::to_string_pretty(&delivery.redacted_payload)? + ); + } + Ok(()) + } } } diff --git a/docs/FLEET.md b/docs/FLEET.md index 1c227155..6d39e747 100644 --- a/docs/FLEET.md +++ b/docs/FLEET.md @@ -175,6 +175,73 @@ an explicit verifier pass completes. } ``` +## Alerts + +Fleet alerting is disabled by default. A caller must supply an enabled alert +config before anything is sent. Routes match typed fleet event classes, not log +strings: + +- `stale` +- `restart_exhausted` +- `needs_human` +- `budget_exceeded` +- `verifier_failed` +- `run_completed` + +Adapter config stores environment variable names, not secret values. Send-time +code resolves those names from the environment or a future secrets provider. +Ledger records store only audit labels such as `slack`, `webhook`, or +`pagerduty`; task specs persisted in the ledger redact webhook URLs and routing +keys. + +Example alert config shape: + +```json +{ + "enabled": true, + "dry_run": true, + "routes": [ + { + "events": ["stale", "restart_exhausted", "verifier_failed"], + "adapter": "ops-slack" + }, + { + "events": ["restart_exhausted"], + "adapter": "pager" + } + ], + "adapters": { + "ops-slack": { + "kind": "slack", + "webhook_env": "CODEWHALE_FLEET_SLACK_WEBHOOK", + "channel": "#codewhale-fleet" + }, + "pager": { + "kind": "pager_duty", + "routing_key_env": "CODEWHALE_FLEET_PAGERDUTY_ROUTING_KEY", + "severity": "critical" + } + } +} +``` + +Use dry-run to inspect a redacted adapter payload without sending: + +```sh +codewhale fleet alert-dry-run \ + --event stale \ + --run-id fleet-demo \ + --worker-id fleet-demo-local-1 \ + --task-id release-triage \ + --reason "worker heartbeat stale since 2026-06-13T02:00:00Z" \ + --adapter slack +``` + +The payload includes the run id, worker id, task id, status, short reason, and +safe inspection commands such as `codewhale fleet status` and +`codewhale fleet inspect `. Endpoints, webhook secrets, and +PagerDuty routing keys are shown as ``. + ## Host Adapters The host adapter boundary supports local child processes and explicit SSH