merge #3161 fleet alerts

This commit is contained in:
Hunter B
2026-06-12 19:18:01 -07:00
6 changed files with 928 additions and 5 deletions
+17
View File
@@ -443,6 +443,9 @@ fn default_retry_backoff_multiplier() -> u32 {
/// Alert/escalation policy attached to a task or run. /// Alert/escalation policy attached to a task or run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FleetAlertPolicy { pub struct FleetAlertPolicy {
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
pub events: Vec<FleetAlertEventClass>,
#[serde(default)] #[serde(default)]
pub channels: Vec<FleetAlertChannel>, pub channels: Vec<FleetAlertChannel>,
#[serde(default)] #[serde(default)]
@@ -451,6 +454,17 @@ pub struct FleetAlertPolicy {
pub after_minutes_stale: Option<u64>, pub after_minutes_stale: Option<u64>,
} }
/// Typed class of fleet event that an alert policy or route can subscribe to.
///
/// Values serialize in snake_case, e.g. `stale` or `restart_exhausted`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "snake_case")]
pub enum FleetAlertEventClass {
/// A worker was reported stale.
Stale,
/// A task failed with no restarts left (presumably; confirm against the scheduler).
RestartExhausted,
/// Human intervention is requested.
NeedsHuman,
/// A budget was exceeded.
BudgetExceeded,
/// A verifier reported a failure.
VerifierFailed,
/// A run completed.
RunCompleted,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")] #[serde(tag = "kind", rename_all = "snake_case")]
pub enum FleetAlertChannel { pub enum FleetAlertChannel {
@@ -632,6 +646,7 @@ mod tests {
#[test] #[test]
fn alert_policy_round_trip() { fn alert_policy_round_trip() {
let policy = FleetAlertPolicy { let policy = FleetAlertPolicy {
events: vec![FleetAlertEventClass::Stale],
channels: vec![FleetAlertChannel::Slack { channels: vec![FleetAlertChannel::Slack {
webhook_url: "https://hooks.slack.com/test".to_string(), webhook_url: "https://hooks.slack.com/test".to_string(),
}], }],
@@ -639,8 +654,10 @@ mod tests {
after_minutes_stale: Some(10), after_minutes_stale: Some(10),
}; };
let json = serde_json::to_string(&policy).unwrap(); let json = serde_json::to_string(&policy).unwrap();
assert!(json.contains("\"events\":[\"stale\"]"));
assert!(json.contains("\"kind\":\"slack\"")); assert!(json.contains("\"kind\":\"slack\""));
let back: FleetAlertPolicy = serde_json::from_str(&json).unwrap(); let back: FleetAlertPolicy = serde_json::from_str(&json).unwrap();
assert_eq!(back.events, vec![FleetAlertEventClass::Stale]);
assert_eq!(back.after_attempts, Some(2)); assert_eq!(back.after_attempts, Some(2));
} }
+684
View File
@@ -0,0 +1,684 @@
//! Opt-in fleet alert routing and adapter payloads.
#![allow(dead_code)]
use std::collections::BTreeMap;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use codewhale_protocol::fleet::{
FleetAlertEventClass, FleetReceipt, FleetRunId, FleetTaskFailureKind, FleetWorkerEvent,
FleetWorkerEventPayload,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
/// Timeout, in seconds, applied to the HTTP client that sends alerts.
const DEFAULT_ALERT_TIMEOUT_SECONDS: u64 = 10;
/// Top-level fleet alerting configuration.
///
/// Every field falls back to its default when absent, so an empty config
/// deserializes to a disabled configuration with no routes or adapters.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FleetAlertConfig {
/// Master switch; while `false`, dispatching produces no deliveries.
#[serde(default)]
pub enabled: bool,
/// When `true`, payloads are prepared and reported but never sent.
#[serde(default)]
pub dry_run: bool,
/// Routes mapping event classes to adapter names.
#[serde(default)]
pub routes: Vec<FleetAlertRoute>,
/// Adapter definitions keyed by the name routes refer to.
#[serde(default)]
pub adapters: BTreeMap<String, FleetAlertAdapterConfig>,
}
/// Sends a set of event classes to one named adapter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FleetAlertRoute {
/// Event classes this route accepts; an empty list accepts every class.
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
pub events: Vec<FleetAlertEventClass>,
/// Key of the target adapter in `FleetAlertConfig::adapters`.
pub adapter: String,
}
/// Delivery adapter settings, tagged by `kind` in snake_case.
///
/// The `*_env` fields hold names that are resolved to secret values at send
/// time; the secret values themselves are never stored here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum FleetAlertAdapterConfig {
/// Posts a message body to a Slack webhook.
Slack {
/// Name under which the webhook URL is resolved.
webhook_env: String,
/// Optional `channel` value added to the message body.
#[serde(skip_serializing_if = "Option::is_none")]
channel: Option<String>,
},
/// Posts a JSON event to a generic webhook.
Webhook {
/// Name under which the webhook URL is resolved.
url_env: String,
/// Optional name of a secret sent in the `X-CodeWhale-Webhook-Secret` header.
#[serde(skip_serializing_if = "Option::is_none")]
secret_env: Option<String>,
},
/// Posts a trigger event to the PagerDuty enqueue endpoint.
PagerDuty {
/// Name under which the routing key is resolved.
routing_key_env: String,
/// Severity placed in the event payload; defaults to `error`.
#[serde(default = "default_pagerduty_severity")]
severity: String,
},
}
/// A single alertable fleet event, independent of any delivery adapter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FleetAlertEvent {
/// Event class used for route matching.
pub class: FleetAlertEventClass,
/// Run the event belongs to.
pub run_id: FleetRunId,
/// Worker the event concerns, when there is one.
#[serde(skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
/// Task the event concerns, when there is one.
#[serde(skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// Status label included in payloads, e.g. `stale` or `failed`.
pub status: String,
/// Human-readable reason; shortened before it is placed in payloads.
pub reason: String,
}
/// Outcome of routing one event to one adapter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FleetAlertDelivery {
/// Name of the adapter the route pointed at.
pub adapter: String,
/// Class of the dispatched event.
pub event_class: FleetAlertEventClass,
/// Whether the dispatcher was in dry-run mode.
pub dry_run: bool,
/// Whether a request was actually sent; always `false` in dry-run mode.
pub sent: bool,
/// Description of the prepared request with secrets replaced by placeholders.
pub redacted_payload: Value,
}
/// Source of secret values, looked up by the name stored in adapter config.
pub trait FleetAlertSecretResolver {
    /// Returns the secret registered under `name`, or `None` when unavailable.
    fn resolve(&self, name: &str) -> Option<String>;
}

/// Secret resolver backed by the process environment.
#[derive(Debug, Clone, Copy, Default)]
pub struct FleetEnvSecretResolver;

impl FleetAlertSecretResolver for FleetEnvSecretResolver {
    /// Reads the environment variable `name`.
    ///
    /// Unset, unreadable, and empty variables all count as "not configured".
    fn resolve(&self, name: &str) -> Option<String> {
        match std::env::var(name) {
            Ok(value) if !value.is_empty() => Some(value),
            _ => None,
        }
    }
}
/// Routes alert events to configured adapters.
///
/// `R` supplies secret values at send time and defaults to the
/// environment-backed resolver.
pub struct FleetAlertDispatcher<R = FleetEnvSecretResolver> {
// Routing and adapter configuration consulted on every dispatch.
config: FleetAlertConfig,
// Secret source used only when a request is actually sent.
resolver: R,
}
impl FleetAlertConfig {
    /// Returns a configuration with alerting switched off (the `Default` value).
    pub fn disabled() -> Self {
        Default::default()
    }

    /// Builds an enabled, dry-run configuration around a single adapter.
    ///
    /// The adapter is registered under the name `dry-run` and reached through
    /// one route with an empty event list, so every event class matches it.
    pub fn dry_run_for_adapter(adapter: FleetAlertAdapterConfig) -> Self {
        let catch_all = FleetAlertRoute {
            events: vec![],
            adapter: String::from("dry-run"),
        };
        Self {
            enabled: true,
            dry_run: true,
            routes: vec![catch_all],
            adapters: BTreeMap::from([(String::from("dry-run"), adapter)]),
        }
    }
}
impl<R> FleetAlertDispatcher<R>
where
R: FleetAlertSecretResolver,
{
pub fn new(config: FleetAlertConfig, resolver: R) -> Self {
Self { config, resolver }
}
pub fn dispatch(&self, event: &FleetAlertEvent) -> Result<Vec<FleetAlertDelivery>> {
if !self.config.enabled {
return Ok(Vec::new());
}
let mut deliveries = Vec::new();
for route in self
.config
.routes
.iter()
.filter(|route| route_matches(route, event.class))
{
let adapter = self.config.adapters.get(&route.adapter).ok_or_else(|| {
anyhow!("fleet alert adapter {} is not configured", route.adapter)
})?;
let prepared = prepare_alert(&route.adapter, adapter, event, self.config.dry_run)?;
let sent = if self.config.dry_run {
false
} else {
send_alert(adapter, &prepared.body, &self.resolver)?
};
deliveries.push(FleetAlertDelivery {
adapter: route.adapter.clone(),
event_class: event.class,
dry_run: self.config.dry_run,
sent,
redacted_payload: prepared.redacted_payload,
});
}
Ok(deliveries)
}
}
impl FleetAlertEvent {
    /// Derives a `Stale` alert from a worker event.
    ///
    /// Returns `None` unless the event carries a `Stale` payload. The reason
    /// includes the last heartbeat timestamp when the payload has one.
    pub fn stale_from_worker_event(event: &FleetWorkerEvent) -> Option<Self> {
        let last_heartbeat_at = match &event.payload {
            FleetWorkerEventPayload::Stale { last_heartbeat_at } => last_heartbeat_at,
            _ => return None,
        };
        let reason = match last_heartbeat_at {
            Some(ts) => format!("worker heartbeat stale since {ts}"),
            None => String::from("worker heartbeat is stale"),
        };
        Some(Self {
            class: FleetAlertEventClass::Stale,
            run_id: event.run_id.clone(),
            worker_id: Some(event.worker_id.clone()),
            task_id: Some(event.task_id.clone()),
            status: String::from("stale"),
            reason,
        })
    }

    /// Builds a `RestartExhausted` alert with status `failed`.
    pub fn restart_exhausted(
        run_id: FleetRunId,
        worker_id: impl Into<String>,
        task_id: impl Into<String>,
        reason: impl Into<String>,
    ) -> Self {
        let worker_id = Some(worker_id.into());
        let task_id = Some(task_id.into());
        Self {
            class: FleetAlertEventClass::RestartExhausted,
            run_id,
            worker_id,
            task_id,
            status: String::from("failed"),
            reason: reason.into(),
        }
    }

    /// Builds a `NeedsHuman` alert with status `needs_human`.
    pub fn needs_human(
        run_id: FleetRunId,
        worker_id: Option<String>,
        task_id: Option<String>,
        reason: impl Into<String>,
    ) -> Self {
        Self {
            class: FleetAlertEventClass::NeedsHuman,
            run_id,
            worker_id,
            task_id,
            status: String::from("needs_human"),
            reason: reason.into(),
        }
    }

    /// Builds a `BudgetExceeded` alert with status `budget_exceeded`.
    pub fn budget_exceeded(
        run_id: FleetRunId,
        worker_id: Option<String>,
        task_id: Option<String>,
        reason: impl Into<String>,
    ) -> Self {
        Self {
            class: FleetAlertEventClass::BudgetExceeded,
            run_id,
            worker_id,
            task_id,
            status: String::from("budget_exceeded"),
            reason: reason.into(),
        }
    }

    /// Derives a `VerifierFailed` alert from a receipt.
    ///
    /// Returns `None` unless the receipt's failure kind is `Verifier`. The
    /// reason is the score notes when present, otherwise a fixed message.
    pub fn verifier_failed(receipt: &FleetReceipt) -> Option<Self> {
        let is_verifier_failure = receipt.failure_kind == Some(FleetTaskFailureKind::Verifier);
        is_verifier_failure.then(|| {
            let notes = receipt
                .score
                .as_ref()
                .and_then(|score| score.notes.as_ref());
            let reason = match notes {
                Some(notes) => notes.clone(),
                None => String::from("verifier failed"),
            };
            Self {
                class: FleetAlertEventClass::VerifierFailed,
                run_id: receipt.run_id.clone(),
                worker_id: Some(receipt.worker_id.clone()),
                task_id: Some(receipt.task_id.clone()),
                status: String::from("verifier_failed"),
                reason,
            }
        })
    }

    /// Builds a run-level `RunCompleted` alert with no worker or task.
    pub fn run_completed(run_id: FleetRunId, reason: impl Into<String>) -> Self {
        Self {
            class: FleetAlertEventClass::RunCompleted,
            run_id,
            worker_id: None,
            task_id: None,
            status: String::from("completed"),
            reason: reason.into(),
        }
    }

    /// Lists CLI commands a recipient can run to look into the event.
    ///
    /// Always starts with the fleet status command; an inspect command follows
    /// when the event names a worker.
    pub fn inspection_commands(&self) -> Vec<String> {
        std::iter::once(String::from("codewhale fleet status"))
            .chain(
                self.worker_id
                    .iter()
                    .map(|worker_id| format!("codewhale fleet inspect {worker_id}")),
            )
            .collect()
    }
}
/// An adapter request body paired with its loggable description.
struct PreparedAlert {
// Body handed to the sender; the PagerDuty body still carries a routing-key
// placeholder that the sender replaces.
body: Value,
// Description of the request with targets and secrets shown as placeholders.
redacted_payload: Value,
}
/// Builds the request body and the redacted description for one adapter.
///
/// No secret is resolved here: targets and keys appear only as
/// `<redacted:env:NAME>` placeholders, except the PagerDuty target, which is
/// the fixed enqueue URL.
fn prepare_alert(
    adapter_name: &str,
    adapter: &FleetAlertAdapterConfig,
    event: &FleetAlertEvent,
    dry_run: bool,
) -> Result<PreparedAlert> {
    let safe_event = safe_event_payload(event);
    let (body, redacted_payload) = match adapter {
        FleetAlertAdapterConfig::Slack {
            webhook_env,
            channel,
        } => {
            let body = slack_body(event, channel.as_deref());
            let description = json!({
                "adapter": adapter_name,
                "kind": "slack",
                "dry_run": dry_run,
                "target": redacted_env(webhook_env),
                "event": safe_event,
                "body": body,
            });
            (body, description)
        }
        FleetAlertAdapterConfig::Webhook {
            url_env,
            secret_env,
        } => {
            let body = json!({
                "source": "codewhale",
                "event": safe_event,
            });
            let description = json!({
                "adapter": adapter_name,
                "kind": "webhook",
                "dry_run": dry_run,
                "target": redacted_env(url_env),
                "headers": redacted_secret_header(secret_env.as_deref()),
                "body": body,
            });
            (body, description)
        }
        FleetAlertAdapterConfig::PagerDuty {
            routing_key_env,
            severity,
        } => {
            // The routing key stays a placeholder until send time.
            let body = pagerduty_body(event, severity, redacted_env(routing_key_env));
            let description = json!({
                "adapter": adapter_name,
                "kind": "pagerduty",
                "dry_run": dry_run,
                "target": "https://events.pagerduty.com/v2/enqueue",
                "body": body,
            });
            (body, description)
        }
    };
    Ok(PreparedAlert {
        body,
        redacted_payload,
    })
}
/// Sends a prepared body through `adapter`, resolving secrets via `resolver`.
///
/// For PagerDuty the routing-key placeholder in the body is replaced with the
/// resolved key and the configured severity is written into the payload.
/// Returns `Ok(true)` once the endpoint answers with a success status.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built, a required secret is missing,
/// the request cannot be sent, or the endpoint returns an error status.
fn send_alert<R>(
    adapter: &FleetAlertAdapterConfig,
    redacted_body: &Value,
    resolver: &R,
) -> Result<bool>
where
    R: FleetAlertSecretResolver,
{
    let http = crate::tls::reqwest_blocking_client_builder()
        .timeout(Duration::from_secs(DEFAULT_ALERT_TIMEOUT_SECONDS))
        .build()
        .context("building fleet alert HTTP client")?;

    // Each arm yields the ready request plus its two error contexts.
    let (request, send_failure, rejection) = match adapter {
        FleetAlertAdapterConfig::Slack { webhook_env, .. } => {
            let endpoint = required_secret(resolver, webhook_env)?;
            (
                http.post(endpoint).json(redacted_body),
                "sending fleet Slack alert",
                "Slack alert rejected",
            )
        }
        FleetAlertAdapterConfig::Webhook {
            url_env,
            secret_env,
        } => {
            let endpoint = required_secret(resolver, url_env)?;
            let unsigned = http.post(endpoint).json(redacted_body);
            let request = match secret_env {
                Some(secret_env) => unsigned.header(
                    "X-CodeWhale-Webhook-Secret",
                    required_secret(resolver, secret_env)?,
                ),
                None => unsigned,
            };
            (
                request,
                "sending fleet webhook alert",
                "webhook alert rejected",
            )
        }
        FleetAlertAdapterConfig::PagerDuty {
            routing_key_env,
            severity,
        } => {
            let routing_key = required_secret(resolver, routing_key_env)?;
            let mut body = redacted_body.clone();
            if let Some(fields) = body.as_object_mut() {
                fields.insert("routing_key".to_string(), Value::String(routing_key));
            }
            if let Some(payload) = body.get_mut("payload").and_then(Value::as_object_mut) {
                payload.insert("severity".to_string(), Value::String(severity.clone()));
            }
            (
                http.post("https://events.pagerduty.com/v2/enqueue")
                    .json(&body),
                "sending fleet PagerDuty alert",
                "PagerDuty alert rejected",
            )
        }
    };

    request
        .send()
        .context(send_failure)?
        .error_for_status()
        .context(rejection)?;
    Ok(true)
}
/// Reports whether `route` accepts `class`; an empty event list accepts all.
fn route_matches(route: &FleetAlertRoute, class: FleetAlertEventClass) -> bool {
    match route.events.as_slice() {
        [] => true,
        subscribed => subscribed.contains(&class),
    }
}
fn safe_event_payload(event: &FleetAlertEvent) -> Value {
json!({
"class": event.class,
"run_id": event.run_id.0.clone(),
"worker_id": event.worker_id.clone(),
"task_id": event.task_id.clone(),
"status": event.status.clone(),
"reason": short_reason(&event.reason),
"commands": event.inspection_commands(),
})
}
/// Builds the Slack message body for `event`.
///
/// The same headline is used as the plain `text` and as the first block; a
/// context block lists the inspection commands. `channel`, when given, is
/// added as a top-level field.
fn slack_body(event: &FleetAlertEvent, channel: Option<&str>) -> Value {
    let headline = format!(
        "CodeWhale fleet {}: run={} task={} reason={}",
        alert_class_label(event.class),
        event.run_id.0,
        event.task_id.as_deref().unwrap_or("-"),
        short_reason(&event.reason)
    );
    let commands = event.inspection_commands().join(" | ");

    let section = json!({
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": headline
        }
    });
    let context = json!({
        "type": "context",
        "elements": [
            {
                "type": "mrkdwn",
                "text": commands
            }
        ]
    });
    let mut body = json!({
        "text": headline,
        "blocks": [section, context]
    });

    if let (Some(channel), Some(fields)) = (channel, body.as_object_mut()) {
        fields.insert("channel".to_string(), Value::String(channel.to_string()));
    }
    body
}
fn pagerduty_body(event: &FleetAlertEvent, severity: &str, routing_key: String) -> Value {
json!({
"routing_key": routing_key,
"event_action": "trigger",
"payload": {
"summary": format!("CodeWhale fleet {}: {}", alert_class_label(event.class), short_reason(&event.reason)),
"severity": severity,
"source": "codewhale",
"custom_details": safe_event_payload(event),
}
})
}
/// Returns the placeholder shown in place of the secret stored under `name`.
fn redacted_env(name: &str) -> String {
    ["<redacted:env:", name, ">"].concat()
}
fn alert_class_label(class: FleetAlertEventClass) -> &'static str {
match class {
FleetAlertEventClass::Stale => "stale",
FleetAlertEventClass::RestartExhausted => "restart_exhausted",
FleetAlertEventClass::NeedsHuman => "needs_human",
FleetAlertEventClass::BudgetExceeded => "budget_exceeded",
FleetAlertEventClass::VerifierFailed => "verifier_failed",
FleetAlertEventClass::RunCompleted => "run_completed",
}
}
/// Describes the webhook secret header with its value redacted.
///
/// Yields an empty object when no secret is configured.
fn redacted_secret_header(secret_env: Option<&str>) -> Value {
    secret_env.map_or_else(
        || json!({}),
        |name| json!({ "X-CodeWhale-Webhook-Secret": redacted_env(name) }),
    )
}
/// Resolves `name` through `resolver`, failing when no value is available.
fn required_secret<R>(resolver: &R, name: &str) -> Result<String>
where
    R: FleetAlertSecretResolver,
{
    match resolver.resolve(name) {
        Some(secret) => Ok(secret),
        None => Err(anyhow!("fleet alert secret {name} is not configured")),
    }
}
/// Trims `reason` and caps it at 240 characters for use in alert payloads.
///
/// Reasons of up to 240 characters are returned unchanged after trimming.
/// Longer ones are cut to their first 237 characters followed by `...`, so
/// the result is exactly 240 characters long.
///
/// The limit is measured in characters on both sides of the decision.
/// Comparing the byte length against the limit would mark short multi-byte
/// text as "too long" and append an ellipsis without removing anything.
fn short_reason(reason: &str) -> String {
    const MAX_CHARS: usize = 240;
    const ELLIPSIS: &str = "...";

    let trimmed = reason.trim();
    // No character at index MAX_CHARS means the text fits; this stops
    // scanning as soon as the limit is passed.
    if trimmed.chars().nth(MAX_CHARS).is_none() {
        return trimmed.to_string();
    }
    let mut shortened: String = trimmed.chars().take(MAX_CHARS - ELLIPSIS.len()).collect();
    shortened.push_str(ELLIPSIS);
    shortened
}
/// Severity used for PagerDuty adapters whose config omits one.
fn default_pagerduty_severity() -> String {
    String::from("error")
}
#[cfg(test)]
mod tests {
use super::*;
use codewhale_protocol::fleet::{FleetScore, FleetTaskResult};
// In-memory secret resolver so tests never read the process environment.
#[derive(Default)]
struct MapResolver {
// Secret values keyed by the name adapters refer to.
values: BTreeMap<String, String>,
}
impl FleetAlertSecretResolver for MapResolver {
// Returns a copy of the stored value, or `None` for unknown names.
fn resolve(&self, name: &str) -> Option<String> {
self.values.get(name).cloned()
}
}
// Builds a fixture event of the given class for run-1 / worker-1 / task-a.
// The status and reason are fixed stale-worker text regardless of `class`.
fn event(class: FleetAlertEventClass) -> FleetAlertEvent {
FleetAlertEvent {
class,
run_id: FleetRunId::from("run-1"),
worker_id: Some("worker-1".to_string()),
task_id: Some("task-a".to_string()),
status: "stale".to_string(),
reason: "worker heartbeat stale".to_string(),
}
}
// A default config is disabled, so dispatching produces no deliveries.
#[test]
fn fleet_alert_disabled_by_default() {
let dispatcher =
FleetAlertDispatcher::new(FleetAlertConfig::default(), MapResolver::default());
let deliveries = dispatcher
.dispatch(&event(FleetAlertEventClass::Stale))
.unwrap();
assert!(deliveries.is_empty());
}
// With two class-specific routes, a stale event reaches only the Slack
// adapter; dry-run keeps it from being sent.
#[test]
fn fleet_alert_policy_routes_event_classes_to_adapters() {
let mut adapters = BTreeMap::new();
adapters.insert(
"ops-slack".to_string(),
FleetAlertAdapterConfig::Slack {
webhook_env: "FLEET_SLACK_WEBHOOK".to_string(),
channel: Some("#fleet".to_string()),
},
);
adapters.insert(
"release-webhook".to_string(),
FleetAlertAdapterConfig::Webhook {
url_env: "FLEET_WEBHOOK_URL".to_string(),
secret_env: None,
},
);
let dispatcher = FleetAlertDispatcher::new(
FleetAlertConfig {
enabled: true,
dry_run: true,
routes: vec![
FleetAlertRoute {
events: vec![FleetAlertEventClass::Stale],
adapter: "ops-slack".to_string(),
},
FleetAlertRoute {
events: vec![FleetAlertEventClass::RunCompleted],
adapter: "release-webhook".to_string(),
},
],
adapters,
},
MapResolver::default(),
);
let deliveries = dispatcher
.dispatch(&event(FleetAlertEventClass::Stale))
.unwrap();
// Only the route listing `Stale` may fire.
assert_eq!(deliveries.len(), 1);
assert_eq!(deliveries[0].adapter, "ops-slack");
assert_eq!(deliveries[0].event_class, FleetAlertEventClass::Stale);
assert!(!deliveries[0].sent);
assert_eq!(deliveries[0].redacted_payload["kind"], "slack");
}
// Even when the resolver holds a real routing key, the dry-run payload shows
// only the placeholder and still carries the inspection command.
#[test]
fn fleet_alert_dry_run_redacts_secrets() {
let mut adapters = BTreeMap::new();
adapters.insert(
"pager".to_string(),
FleetAlertAdapterConfig::PagerDuty {
routing_key_env: "FLEET_PD_ROUTING_KEY".to_string(),
severity: "critical".to_string(),
},
);
let mut resolver = MapResolver::default();
resolver.values.insert(
"FLEET_PD_ROUTING_KEY".to_string(),
"real-routing-key-secret".to_string(),
);
let dispatcher = FleetAlertDispatcher::new(
FleetAlertConfig {
enabled: true,
dry_run: true,
routes: vec![FleetAlertRoute {
events: vec![FleetAlertEventClass::RestartExhausted],
adapter: "pager".to_string(),
}],
adapters,
},
resolver,
);
let deliveries = dispatcher
.dispatch(&event(FleetAlertEventClass::RestartExhausted))
.unwrap();
// Search the serialized payload so nested fields are covered too.
let payload = serde_json::to_string(&deliveries[0].redacted_payload).unwrap();
assert!(payload.contains("<redacted:env:FLEET_PD_ROUTING_KEY>"));
assert!(!payload.contains("real-routing-key-secret"));
assert!(payload.contains("codewhale fleet inspect worker-1"));
}
// A stale worker event converts into a stale alert that keeps the worker id,
// mentions the last heartbeat timestamp, and offers both inspection commands.
#[test]
fn fleet_alert_event_is_derived_from_ledgered_stale_worker_event() {
let worker_event = FleetWorkerEvent {
seq: 4,
run_id: FleetRunId::from("run-1"),
worker_id: "worker-1".to_string(),
task_id: "task-a".to_string(),
timestamp: "2026-06-13T02:00:00Z".to_string(),
payload: FleetWorkerEventPayload::Stale {
last_heartbeat_at: Some("2026-06-13T01:57:00Z".to_string()),
},
extra: BTreeMap::new(),
};
let alert = FleetAlertEvent::stale_from_worker_event(&worker_event).unwrap();
assert_eq!(alert.class, FleetAlertEventClass::Stale);
assert_eq!(alert.worker_id.as_deref(), Some("worker-1"));
assert!(alert.reason.contains("2026-06-13T01:57:00Z"));
assert_eq!(
alert.inspection_commands(),
vec![
"codewhale fleet status".to_string(),
"codewhale fleet inspect worker-1".to_string()
]
);
}
// A receipt whose failure kind is `Verifier` converts into a verifier-failed
// alert whose reason is taken from the score notes.
#[test]
fn fleet_alert_verifier_failed_event_is_derived_from_receipt() {
let receipt = FleetReceipt {
run_id: FleetRunId::from("run-1"),
task_id: "task-a".to_string(),
worker_id: "worker-1".to_string(),
completed_at: "2026-06-13T02:00:00Z".to_string(),
result: FleetTaskResult::Fail,
failure_kind: Some(FleetTaskFailureKind::Verifier),
artifacts: vec![],
score: Some(FleetScore {
value: 0.0,
max: Some(1.0),
notes: Some("regex scorer could not be compiled".to_string()),
}),
};
let alert = FleetAlertEvent::verifier_failed(&receipt).unwrap();
assert_eq!(alert.class, FleetAlertEventClass::VerifierFailed);
assert_eq!(alert.status, "verifier_failed");
assert!(alert.reason.contains("regex scorer"));
}
}
+1
View File
@@ -1,5 +1,6 @@
//! Agent Fleet control plane — local-first manager, ledger, and workers. //! Agent Fleet control plane — local-first manager, ledger, and workers.
pub mod alerts;
pub mod host; pub mod host;
pub mod ledger; pub mod ledger;
pub mod manager; pub mod manager;
+27 -4
View File
@@ -224,7 +224,13 @@ impl FleetScheduler {
}, },
)?; )?;
report.failed += 1; report.failed += 1;
report.alerts += self.record_alerts(&task.entry.run_id, &task.entry.task_id, task_spec)?; report.alerts += self.record_alerts(
&task.entry.run_id,
&task.entry.task_id,
worker_id,
task_spec,
FleetAlertEventClass::RestartExhausted,
)?;
Ok(()) Ok(())
} }
@@ -358,18 +364,29 @@ impl FleetScheduler {
&self, &self,
run_id: &FleetRunId, run_id: &FleetRunId,
task_id: &str, task_id: &str,
worker_id: &str,
task_spec: &FleetTaskSpec, task_spec: &FleetTaskSpec,
event_class: FleetAlertEventClass,
) -> Result<usize> { ) -> Result<usize> {
let Some(policy) = &task_spec.alert_policy else { let Some(policy) = &task_spec.alert_policy else {
return Ok(0); return Ok(0);
}; };
if !alert_policy_matches(policy, event_class) {
return Ok(0);
}
let mut count = 0; let mut count = 0;
for channel in &policy.channels { for channel in &policy.channels {
self.ledger.record_alert( let label = alert_channel_label(channel);
self.ledger
.record_alert(run_id, task_id, label, &self.timestamp())?;
self.append_worker_event(
run_id, run_id,
worker_id,
task_id, task_id,
alert_channel_label(channel), FleetWorkerEventPayload::Escalated {
&self.timestamp(), channel: label.to_string(),
alert_id: None,
},
)?; )?;
count += 1; count += 1;
} }
@@ -516,6 +533,10 @@ fn alert_channel_label(channel: &FleetAlertChannel) -> &'static str {
} }
} }
/// Reports whether `policy` subscribes to `class`; an empty event list
/// subscribes to every class.
fn alert_policy_matches(policy: &FleetAlertPolicy, class: FleetAlertEventClass) -> bool {
    match policy.events.as_slice() {
        [] => true,
        subscribed => subscribed.contains(&class),
    }
}
fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String { fn event_key(worker_id: &str, run_id: &str, task_id: &str) -> String {
format!("{worker_id}:{run_id}:{task_id}") format!("{worker_id}:{run_id}:{task_id}")
} }
@@ -681,6 +702,7 @@ mod tests {
let mut scheduler = scheduler(&tmp, 1); let mut scheduler = scheduler(&tmp, 1);
let mut failing = task("task-a", 1); let mut failing = task("task-a", 1);
failing.alert_policy = Some(FleetAlertPolicy { failing.alert_policy = Some(FleetAlertPolicy {
events: vec![FleetAlertEventClass::RestartExhausted],
channels: vec![FleetAlertChannel::Slack { channels: vec![FleetAlertChannel::Slack {
webhook_url: "https://hooks.slack.invalid/secret".to_string(), webhook_url: "https://hooks.slack.invalid/secret".to_string(),
}], }],
@@ -704,6 +726,7 @@ mod tests {
); );
let ledger = ledger_text(&scheduler); let ledger = ledger_text(&scheduler);
assert!(ledger.contains("\"state\":\"failed\"")); assert!(ledger.contains("\"state\":\"failed\""));
assert!(ledger.contains("\"state\":\"escalated\""));
assert!(ledger.contains("\"record\":\"alert_sent\"")); assert!(ledger.contains("\"record\":\"alert_sent\""));
assert!(!ledger.contains("hooks.slack.invalid/secret")); assert!(!ledger.contains("hooks.slack.invalid/secret"));
} }
+132 -1
View File
@@ -410,6 +410,8 @@ enum FleetCommand {
#[arg(long, required = true)] #[arg(long, required = true)]
all: bool, all: bool,
}, },
/// Render a redacted fleet alert payload without sending it
AlertDryRun(FleetAlertDryRunArgs),
} }
#[derive(Args, Debug, Clone)] #[derive(Args, Debug, Clone)]
@@ -428,6 +430,63 @@ struct FleetRunArgs {
once: bool, once: bool,
} }
// Arguments of the `alert-dry-run` fleet subcommand. Only the options for the
// adapter selected with `--adapter` are read when the payload is rendered.
// (Plain comments on purpose: doc comments here would feed the CLI help text.)
#[derive(Args, Debug, Clone)]
struct FleetAlertDryRunArgs {
/// Alert event class to render
#[arg(long, value_enum)]
event: FleetAlertEventArg,
/// Fleet run id
#[arg(long)]
run_id: String,
/// Worker id, when the event belongs to one worker
#[arg(long)]
worker_id: Option<String>,
/// Task id, when the event belongs to one task
#[arg(long)]
task_id: Option<String>,
/// Short human-readable reason for the alert
#[arg(long, default_value = "manual fleet alert dry-run")]
reason: String,
/// Status label to include in the payload
#[arg(long)]
status: Option<String>,
/// Adapter payload shape to render
#[arg(long, value_enum, default_value_t = FleetAlertAdapterArg::Slack)]
adapter: FleetAlertAdapterArg,
/// Environment variable containing the Slack webhook URL
#[arg(long, default_value = "CODEWHALE_FLEET_SLACK_WEBHOOK")]
slack_webhook_env: String,
/// Environment variable containing the generic webhook URL
#[arg(long, default_value = "CODEWHALE_FLEET_WEBHOOK_URL")]
webhook_url_env: String,
/// Optional environment variable containing the generic webhook secret
#[arg(long)]
webhook_secret_env: Option<String>,
/// Environment variable containing the PagerDuty routing key
#[arg(long, default_value = "CODEWHALE_FLEET_PAGERDUTY_ROUTING_KEY")]
pagerduty_routing_key_env: String,
/// PagerDuty severity to render
#[arg(long, default_value = "error")]
pagerduty_severity: String,
}
// CLI-facing mirror of the protocol's alert event classes; converted
// one-to-one before the alert event is built.
#[derive(ValueEnum, Debug, Clone, Copy)]
enum FleetAlertEventArg {
Stale,
RestartExhausted,
NeedsHuman,
BudgetExceeded,
VerifierFailed,
RunCompleted,
}
// CLI selector for which adapter payload shape the dry run renders.
#[derive(ValueEnum, Debug, Clone, Copy)]
enum FleetAlertAdapterArg {
Slack,
Webhook,
PagerDuty,
}
#[derive(Args, Debug, Clone)] #[derive(Args, Debug, Clone)]
struct SwebenchRunArgs { struct SwebenchRunArgs {
/// SWE-bench instance id, e.g. django__django-12345 /// SWE-bench instance id, e.g. django__django-12345
@@ -1390,9 +1449,14 @@ async fn run_swebench_command(
} }
async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> { async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
use crate::fleet::alerts::{
FleetAlertAdapterConfig, FleetAlertConfig, FleetAlertDispatcher, FleetAlertEvent,
FleetEnvSecretResolver,
};
use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection}; use crate::fleet::manager::{FleetManager, FleetStatusSnapshot, FleetWorkerInspection};
use codewhale_protocol::fleet::{ use codewhale_protocol::fleet::{
FleetArtifactKind, FleetWorkerEventPayload, FleetWorkerStatus, FleetAlertEventClass, FleetArtifactKind, FleetRunId, FleetWorkerEventPayload,
FleetWorkerStatus,
}; };
fn worker_status_label(status: &FleetWorkerStatus) -> &'static str { fn worker_status_label(status: &FleetWorkerStatus) -> &'static str {
@@ -1529,6 +1593,49 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
} }
} }
// Converts the CLI event selector into the matching protocol event class.
// The match is exhaustive, so a new selector variant must be mapped here.
fn alert_event_class(arg: FleetAlertEventArg) -> FleetAlertEventClass {
match arg {
FleetAlertEventArg::Stale => FleetAlertEventClass::Stale,
FleetAlertEventArg::RestartExhausted => FleetAlertEventClass::RestartExhausted,
FleetAlertEventArg::NeedsHuman => FleetAlertEventClass::NeedsHuman,
FleetAlertEventArg::BudgetExceeded => FleetAlertEventClass::BudgetExceeded,
FleetAlertEventArg::VerifierFailed => FleetAlertEventClass::VerifierFailed,
FleetAlertEventArg::RunCompleted => FleetAlertEventClass::RunCompleted,
}
}
// Picks the status label for a dry-run alert: the explicit override when one
// was given, otherwise the default label for the event class.
fn alert_status(class: FleetAlertEventClass, override_status: Option<String>) -> String {
    override_status.unwrap_or_else(|| {
        let default_label = match class {
            FleetAlertEventClass::Stale => "stale",
            FleetAlertEventClass::RestartExhausted => "failed",
            FleetAlertEventClass::NeedsHuman => "needs_human",
            FleetAlertEventClass::BudgetExceeded => "budget_exceeded",
            FleetAlertEventClass::VerifierFailed => "verifier_failed",
            FleetAlertEventClass::RunCompleted => "completed",
        };
        String::from(default_label)
    })
}
fn alert_adapter(args: &FleetAlertDryRunArgs) -> FleetAlertAdapterConfig {
match args.adapter {
FleetAlertAdapterArg::Slack => FleetAlertAdapterConfig::Slack {
webhook_env: args.slack_webhook_env.clone(),
channel: None,
},
FleetAlertAdapterArg::Webhook => FleetAlertAdapterConfig::Webhook {
url_env: args.webhook_url_env.clone(),
secret_env: args.webhook_secret_env.clone(),
},
FleetAlertAdapterArg::PagerDuty => FleetAlertAdapterConfig::PagerDuty {
routing_key_env: args.pagerduty_routing_key_env.clone(),
severity: args.pagerduty_severity.clone(),
},
}
}
let manager = FleetManager::open(workspace)?; let manager = FleetManager::open(workspace)?;
match args.command { match args.command {
FleetCommand::Init => { FleetCommand::Init => {
@@ -1591,6 +1698,30 @@ async fn run_fleet_command(workspace: &Path, args: FleetArgs) -> Result<()> {
println!("stopped: {stopped}"); println!("stopped: {stopped}");
Ok(()) Ok(())
} }
FleetCommand::AlertDryRun(args) => {
let class = alert_event_class(args.event);
let adapter = alert_adapter(&args);
let event = FleetAlertEvent {
class,
run_id: FleetRunId::from(args.run_id.clone()),
worker_id: args.worker_id.clone(),
task_id: args.task_id.clone(),
status: alert_status(class, args.status.clone()),
reason: args.reason.clone(),
};
let dispatcher = FleetAlertDispatcher::new(
FleetAlertConfig::dry_run_for_adapter(adapter),
FleetEnvSecretResolver,
);
let deliveries = dispatcher.dispatch(&event)?;
for delivery in deliveries {
println!(
"{}",
serde_json::to_string_pretty(&delivery.redacted_payload)?
);
}
Ok(())
}
} }
} }
+67
View File
@@ -175,6 +175,73 @@ an explicit verifier pass completes.
} }
``` ```
## Alerts
Fleet alerting is disabled by default. A caller must supply an enabled alert
config before anything is sent. Routes match typed fleet event classes, not log
strings; a route whose `events` list is empty or omitted matches every class.
The event classes are:
- `stale`
- `restart_exhausted`
- `needs_human`
- `budget_exceeded`
- `verifier_failed`
- `run_completed`
Adapter config stores environment variable names, not secret values. Send-time
code resolves those names from the environment or a future secrets provider.
Ledger records store only audit labels such as `slack`, `webhook`, or
`pagerduty`; task specs persisted in the ledger redact webhook URLs and routing
keys.
Example alert config shape:
```json
{
"enabled": true,
"dry_run": true,
"routes": [
{
"events": ["stale", "restart_exhausted", "verifier_failed"],
"adapter": "ops-slack"
},
{
"events": ["restart_exhausted"],
"adapter": "pager"
}
],
"adapters": {
"ops-slack": {
"kind": "slack",
"webhook_env": "CODEWHALE_FLEET_SLACK_WEBHOOK",
"channel": "#codewhale-fleet"
},
"pager": {
"kind": "pager_duty",
"routing_key_env": "CODEWHALE_FLEET_PAGERDUTY_ROUTING_KEY",
"severity": "critical"
}
}
}
```
Use dry-run to inspect a redacted adapter payload without sending:
```sh
codewhale fleet alert-dry-run \
--event stale \
--run-id fleet-demo \
--worker-id fleet-demo-local-1 \
--task-id release-triage \
--reason "worker heartbeat stale since 2026-06-13T02:00:00Z" \
--adapter slack
```
The payload includes the run id, worker id, task id, status, short reason, and
safe inspection commands such as `codewhale fleet status` and
`codewhale fleet inspect <worker-id>`. Slack and webhook endpoints, webhook
secrets, and PagerDuty routing keys are shown as `<redacted:env:...>`; the
PagerDuty target is the fixed public endpoint
`https://events.pagerduty.com/v2/enqueue`.
## Host Adapters ## Host Adapters
The host adapter boundary supports local child processes and explicit SSH The host adapter boundary supports local child processes and explicit SSH