feat(whaleflow): mark mock cancellation and budgets (#2841)

This commit is contained in:
Hunter Bown
2026-06-05 23:17:01 -07:00
committed by GitHub
parent 6a527fc161
commit 7fc4ec820a
3 changed files with 248 additions and 28 deletions
+5 -2
View File
@@ -61,8 +61,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
Leaf, branch, and workflow execution results now carry deterministic token
and cost telemetry fields that the mock executor can aggregate without live
provider calls or runtime sub-agent fanout (#2486). A crate-only replay
executor now evaluates workflows from recorded leaf/control records, computes
provider calls or runtime sub-agent fanout (#2486). The mock executor now
carries crate-local cancellation and budget-exhaustion status markers so the
branch/leaf runtime contract can be tested before live workflow execution is
exposed (#2669). A crate-only replay executor now evaluates workflows from
recorded leaf/control records, computes
stable SHA-256 leaf input hashes, and marks missing records as
`replay_diverged` instead of calling models again (#2673); the runtime replay
command and live-provider replay fallback remain deferred. The crate also now
+5 -2
View File
@@ -61,8 +61,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
Leaf, branch, and workflow execution results now carry deterministic token
and cost telemetry fields that the mock executor can aggregate without live
provider calls or runtime sub-agent fanout (#2486). A crate-only replay
executor now evaluates workflows from recorded leaf/control records, computes
provider calls or runtime sub-agent fanout (#2486). The mock executor now
carries crate-local cancellation and budget-exhaustion status markers so the
branch/leaf runtime contract can be tested before live workflow execution is
exposed (#2669). A crate-only replay executor now evaluates workflows from
recorded leaf/control records, computes
stable SHA-256 leaf input hashes, and marks missing records as
`replay_diverged` instead of calling models again (#2673); the runtime replay
command and live-provider replay fallback remain deferred. The crate also now
+238 -24
View File
@@ -542,6 +542,7 @@ pub enum WorkflowRunStatus {
Succeeded,
Failed,
Cancelled,
BudgetExceeded,
ReplayDiverged,
}
@@ -591,9 +592,24 @@ impl WorkflowExecution {
self.status = WorkflowRunStatus::Failed;
}
pub fn mark_cancelled(&mut self) {
self.status = WorkflowRunStatus::Cancelled;
}
pub fn mark_budget_exceeded(&mut self) {
self.status = WorkflowRunStatus::BudgetExceeded;
}
pub(crate) fn mark_replay_diverged(&mut self) {
self.status = WorkflowRunStatus::ReplayDiverged;
}
fn should_stop_mock_execution(&self) -> bool {
matches!(
self.status,
WorkflowRunStatus::Cancelled | WorkflowRunStatus::BudgetExceeded
)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -646,6 +662,9 @@ pub struct MockWorkflowExecutor {
leaf_outcomes: BTreeMap<String, MockLeafOutcome>,
predicate_results: BTreeMap<String, Vec<bool>>,
generated_nodes: BTreeMap<String, Vec<WorkflowNode>>,
cancelled: bool,
max_leaf_steps: Option<u32>,
leaf_steps_executed: u32,
}
impl MockWorkflowExecutor {
@@ -680,6 +699,16 @@ impl MockWorkflowExecutor {
self
}
pub fn with_cancelled(mut self) -> Self {
self.cancelled = true;
self
}
pub fn with_max_leaf_steps(mut self, max_leaf_steps: u32) -> Self {
self.max_leaf_steps = Some(max_leaf_steps);
self
}
pub fn run(
&mut self,
spec: &WorkflowSpec,
@@ -696,6 +725,9 @@ impl MockWorkflowExecutor {
execution: &mut WorkflowExecution,
) -> Result<(), WorkflowExecutionError> {
for node in nodes {
if execution.should_stop_mock_execution() {
break;
}
self.execute_node(node, execution)?;
}
Ok(())
@@ -758,23 +790,14 @@ impl MockWorkflowExecutor {
) -> Result<(), WorkflowExecutionError> {
let before = execution.leaf_results.len();
self.execute_nodes(&spec.children, execution)?;
let branch_failed = execution.leaf_results[before..]
.iter()
.any(|result| result.status != WorkflowRunStatus::Succeeded);
let status = if branch_failed {
WorkflowRunStatus::Failed
} else {
WorkflowRunStatus::Succeeded
};
let status = aggregate_mock_status(&execution.leaf_results[before..]);
let mut usage = WorkflowUsage::default();
let mut memo_usage = WorkflowMemoUsage::default();
for result in &execution.leaf_results[before..] {
usage.add_assign(result.usage);
memo_usage.add_assign(result.memo_usage);
}
if status == WorkflowRunStatus::Failed {
execution.mark_failed();
}
mark_execution_for_status(execution, status);
execution.branch_results.push(BranchResult {
branch_id: spec.id.clone(),
task_id: spec.id.clone(),
@@ -795,13 +818,8 @@ impl MockWorkflowExecutor {
}
fn execute_leaf(&mut self, spec: &LeafSpec, execution: &mut WorkflowExecution) {
let outcome = self
.leaf_outcomes
.remove(&spec.id)
.unwrap_or_else(|| MockLeafOutcome::succeeded(format!("mock leaf {}", spec.id)));
if outcome.status != WorkflowRunStatus::Succeeded {
execution.mark_failed();
}
let outcome = self.mock_leaf_outcome(spec);
mark_execution_for_status(execution, outcome.status);
execution.usage.add_assign(outcome.usage);
execution.memo_usage.add_assign(outcome.memo_usage);
execution.leaf_results.push(LeafResult {
@@ -824,21 +842,27 @@ impl MockWorkflowExecutor {
let mut iterations = 0;
let mut passed = false;
while iterations < max_iterations {
if execution.should_stop_mock_execution() {
break;
}
iterations += 1;
self.execute_nodes(&spec.children, execution)?;
if execution.should_stop_mock_execution() {
break;
}
if self.next_predicate_result(&spec.id) {
passed = true;
break;
}
}
let status = if passed {
let status = if execution.should_stop_mock_execution() {
execution.status
} else if passed {
WorkflowRunStatus::Succeeded
} else {
WorkflowRunStatus::Failed
};
if status == WorkflowRunStatus::Failed {
execution.mark_failed();
}
mark_execution_for_status(execution, status);
execution.control_node_results.push(ControlNodeResult {
node_id: spec.id.clone(),
kind: ControlNodeKind::LoopUntil,
@@ -861,10 +885,15 @@ impl MockWorkflowExecutor {
&spec.else_nodes
};
self.execute_nodes(selected_nodes, execution)?;
let status = if execution.should_stop_mock_execution() {
execution.status
} else {
WorkflowRunStatus::Succeeded
};
execution.control_node_results.push(ControlNodeResult {
node_id: spec.id.clone(),
kind: ControlNodeKind::Cond,
status: WorkflowRunStatus::Succeeded,
status,
selected_children: selected_nodes.iter().map(node_id).collect(),
summary: Some(format!("predicate_result={passed}")),
});
@@ -882,16 +911,47 @@ impl MockWorkflowExecutor {
}
validate_workflow_node_shapes(&nodes)?;
self.execute_nodes(&nodes, execution)?;
let status = if execution.should_stop_mock_execution() {
execution.status
} else {
WorkflowRunStatus::Succeeded
};
execution.control_node_results.push(ControlNodeResult {
node_id: spec.id.clone(),
kind: ControlNodeKind::Expand,
status: WorkflowRunStatus::Succeeded,
status,
selected_children: nodes.iter().map(node_id).collect(),
summary: Some(format!("expanded_from={}", spec.source)),
});
Ok(())
}
fn mock_leaf_outcome(&mut self, spec: &LeafSpec) -> MockLeafOutcome {
if self.cancelled {
return MockLeafOutcome {
status: WorkflowRunStatus::Cancelled,
usage: WorkflowUsage::default(),
memo_usage: WorkflowMemoUsage::default(),
output: Some("mock workflow cancelled before leaf execution".to_string()),
artifacts: Vec::new(),
};
}
if self.max_leaf_steps == Some(self.leaf_steps_executed) || spec.budget.max_steps == Some(0)
{
return MockLeafOutcome {
status: WorkflowRunStatus::BudgetExceeded,
usage: WorkflowUsage::default(),
memo_usage: WorkflowMemoUsage::default(),
output: Some("mock workflow leaf budget exhausted".to_string()),
artifacts: Vec::new(),
};
}
self.leaf_steps_executed = self.leaf_steps_executed.saturating_add(1);
self.leaf_outcomes
.remove(&spec.id)
.unwrap_or_else(|| MockLeafOutcome::succeeded(format!("mock leaf {}", spec.id)))
}
fn next_predicate_result(&mut self, node_id: &str) -> bool {
let Some(results) = self.predicate_results.get_mut(node_id) else {
return false;
@@ -903,6 +963,37 @@ impl MockWorkflowExecutor {
}
}
fn aggregate_mock_status(results: &[LeafResult]) -> WorkflowRunStatus {
if results
.iter()
.any(|result| result.status == WorkflowRunStatus::Cancelled)
{
WorkflowRunStatus::Cancelled
} else if results
.iter()
.any(|result| result.status == WorkflowRunStatus::BudgetExceeded)
{
WorkflowRunStatus::BudgetExceeded
} else if results
.iter()
.any(|result| result.status != WorkflowRunStatus::Succeeded)
{
WorkflowRunStatus::Failed
} else {
WorkflowRunStatus::Succeeded
}
}
fn mark_execution_for_status(execution: &mut WorkflowExecution, status: WorkflowRunStatus) {
match status {
WorkflowRunStatus::Succeeded | WorkflowRunStatus::Pending | WorkflowRunStatus::Running => {}
WorkflowRunStatus::Failed => execution.mark_failed(),
WorkflowRunStatus::Cancelled => execution.mark_cancelled(),
WorkflowRunStatus::BudgetExceeded => execution.mark_budget_exceeded(),
WorkflowRunStatus::ReplayDiverged => execution.mark_replay_diverged(),
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BranchCandidate {
pub branch_id: String,
@@ -1720,6 +1811,21 @@ mod tests {
})
}
fn leaf_node_with_budget(id: &str, budget: BudgetSpec) -> WorkflowNode {
WorkflowNode::Leaf(LeafSpec {
id: id.to_string(),
prompt: format!("run {id}"),
agent_type: AgentType::General,
mode: TaskMode::ReadOnly,
isolation: IsolationMode::Shared,
file_scope: Vec::new(),
depends_on_results: Vec::new(),
budget,
permissions: PermissionSpec::default(),
model_policy: ModelPolicy::default(),
})
}
fn invalid_leaf_node(id: &str) -> WorkflowNode {
WorkflowNode::Leaf(LeafSpec {
id: id.to_string(),
@@ -2399,6 +2505,114 @@ mod tests {
);
}
#[test]
fn mock_executor_marks_cancelled_before_leaf() {
let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec {
id: "discover".to_string(),
description: None,
parallel: true,
budget: BudgetSpec::default(),
permissions: PermissionSpec::default(),
model_policy: ModelPolicy::default(),
children: vec![leaf_node("scan-readme"), leaf_node("scan-tests")],
})]);
let mut executor = MockWorkflowExecutor::new().with_cancelled();
let execution = executor.run(&workflow).expect("mock workflow should run");
assert_eq!(execution.status, WorkflowRunStatus::Cancelled);
assert_eq!(execution.leaf_results.len(), 1);
assert_eq!(
execution.leaf_results[0].status,
WorkflowRunStatus::Cancelled
);
assert_eq!(
execution.branch_results[0].status,
WorkflowRunStatus::Cancelled
);
assert_eq!(
control_result(&execution, "discover").status,
WorkflowRunStatus::Cancelled
);
}
#[test]
fn mock_executor_stops_when_global_leaf_budget_is_exhausted() {
let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec {
id: "discover".to_string(),
description: None,
parallel: true,
budget: BudgetSpec::default(),
permissions: PermissionSpec::default(),
model_policy: ModelPolicy::default(),
children: vec![
leaf_node("scan-readme"),
leaf_node("scan-config"),
leaf_node("scan-tests"),
],
})]);
let mut executor = MockWorkflowExecutor::new().with_max_leaf_steps(1);
let execution = executor.run(&workflow).expect("mock workflow should run");
assert_eq!(execution.status, WorkflowRunStatus::BudgetExceeded);
assert_eq!(
execution
.leaf_results
.iter()
.map(|result| (result.leaf_id.as_str(), result.status))
.collect::<Vec<_>>(),
vec![
("scan-readme", WorkflowRunStatus::Succeeded),
("scan-config", WorkflowRunStatus::BudgetExceeded)
]
);
assert_eq!(
execution.branch_results[0].status,
WorkflowRunStatus::BudgetExceeded
);
}
#[test]
fn mock_executor_honors_zero_step_leaf_budget() {
let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec {
id: "verify".to_string(),
description: None,
parallel: false,
budget: BudgetSpec::default(),
permissions: PermissionSpec::default(),
model_policy: ModelPolicy::default(),
children: vec![
leaf_node_with_budget(
"run-tests",
BudgetSpec {
max_steps: Some(0),
timeout_secs: None,
max_parallel: None,
},
),
leaf_node("summarize"),
],
})]);
let mut executor = MockWorkflowExecutor::new();
let execution = executor.run(&workflow).expect("mock workflow should run");
assert_eq!(execution.status, WorkflowRunStatus::BudgetExceeded);
assert_eq!(execution.leaf_results.len(), 1);
assert_eq!(
execution.leaf_results[0].status,
WorkflowRunStatus::BudgetExceeded
);
assert!(
execution.leaf_results[0]
.output
.as_deref()
.unwrap_or_default()
.contains("budget exhausted")
);
}
#[test]
fn loop_until_stops_on_pass() {
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {