feat(whaleflow): add mock executor skeleton
Add a crate-local mock executor over WorkflowSpec that records leaf, branch, and control-node results for Sequence, BranchSet, Leaf, Reduce, TeacherReview, LoopUntil, Cond, and Expand. Add reducer scaffolding for BranchTournament and ParetoFrontier, plus #2669 acceptance-style tests, without exposing workflow_run, spawning agents, or applying worktrees. Refs #2669. Harvests narrow WhaleFlow executor intent from #2482/#2486. Co-authored-by: AdityaVG13 <44177453+AdityaVG13@users.noreply.github.com>
This commit is contained in:
+5
-1
@@ -40,7 +40,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
release-safe. The foundation now includes explicit `WorkflowSpec`,
|
||||
`WorkflowNode`, branch/leaf/policy metadata structs, plus serializable branch,
|
||||
leaf, and control-node result records toward the #2668 TraceStore contract.
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
It also adds a crate-local mock executor skeleton for Sequence, BranchSet,
|
||||
Leaf, Reduce, LoopUntil, Cond, Expand, BranchTournament, and ParetoFrontier
|
||||
control flow so #2669 can progress without spawning agents, applying
|
||||
worktrees, or exposing a `workflow_run` runtime tool yet. Thanks @AdityaVG13
|
||||
for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
migration creates persistence shape only; workflow execution and replay
|
||||
|
||||
@@ -40,7 +40,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
release-safe. The foundation now includes explicit `WorkflowSpec`,
|
||||
`WorkflowNode`, branch/leaf/policy metadata structs, plus serializable branch,
|
||||
leaf, and control-node result records toward the #2668 TraceStore contract.
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
It also adds a crate-local mock executor skeleton for Sequence, BranchSet,
|
||||
Leaf, Reduce, LoopUntil, Cond, Expand, BranchTournament, and ParetoFrontier
|
||||
control flow so #2669 can progress without spawning agents, applying
|
||||
worktrees, or exposing a `workflow_run` runtime tool yet. Thanks @AdityaVG13
|
||||
for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
migration creates persistence shape only; workflow execution and replay
|
||||
|
||||
@@ -153,6 +153,8 @@ pub struct ExpandSpec {
|
||||
pub id: String,
|
||||
pub source: String,
|
||||
#[serde(default)]
|
||||
pub max_children: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub template: Option<Box<WorkflowNode>>,
|
||||
}
|
||||
|
||||
@@ -480,6 +482,454 @@ pub enum ControlNodeKind {
|
||||
Expand,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct WorkflowExecution {
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub leaf_results: Vec<LeafResult>,
|
||||
#[serde(default)]
|
||||
pub branch_results: Vec<BranchResult>,
|
||||
#[serde(default)]
|
||||
pub control_node_results: Vec<ControlNodeResult>,
|
||||
}
|
||||
|
||||
impl Default for WorkflowExecution {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
leaf_results: Vec::new(),
|
||||
branch_results: Vec::new(),
|
||||
control_node_results: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkflowExecution {
|
||||
pub fn mark_failed(&mut self) {
|
||||
self.status = WorkflowRunStatus::Failed;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct MockLeafOutcome {
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub output: Option<String>,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
}
|
||||
|
||||
impl MockLeafOutcome {
|
||||
pub fn succeeded(output: impl Into<String>) -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn failed(output: impl Into<String>) -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Failed,
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct MockWorkflowExecutor {
|
||||
leaf_outcomes: BTreeMap<String, MockLeafOutcome>,
|
||||
predicate_results: BTreeMap<String, Vec<bool>>,
|
||||
generated_nodes: BTreeMap<String, Vec<WorkflowNode>>,
|
||||
}
|
||||
|
||||
impl MockWorkflowExecutor {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn with_leaf_outcome(
|
||||
mut self,
|
||||
leaf_id: impl Into<String>,
|
||||
outcome: MockLeafOutcome,
|
||||
) -> Self {
|
||||
self.leaf_outcomes.insert(leaf_id.into(), outcome);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_predicate_results(
|
||||
mut self,
|
||||
node_id: impl Into<String>,
|
||||
results: Vec<bool>,
|
||||
) -> Self {
|
||||
self.predicate_results.insert(node_id.into(), results);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_generated_nodes(
|
||||
mut self,
|
||||
node_id: impl Into<String>,
|
||||
nodes: Vec<WorkflowNode>,
|
||||
) -> Self {
|
||||
self.generated_nodes.insert(node_id.into(), nodes);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn run(
|
||||
&mut self,
|
||||
spec: &WorkflowSpec,
|
||||
) -> Result<WorkflowExecution, WorkflowExecutionError> {
|
||||
validate_workflow_nodes(&spec.nodes)?;
|
||||
let mut execution = WorkflowExecution::default();
|
||||
self.execute_nodes(&spec.nodes, &mut execution)?;
|
||||
Ok(execution)
|
||||
}
|
||||
|
||||
fn execute_nodes(
|
||||
&mut self,
|
||||
nodes: &[WorkflowNode],
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
for node in nodes {
|
||||
self.execute_node(node, execution)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_node(
|
||||
&mut self,
|
||||
node: &WorkflowNode,
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
match node {
|
||||
WorkflowNode::BranchSet(spec) => self.execute_branch_set(spec, execution),
|
||||
WorkflowNode::Leaf(spec) => {
|
||||
self.execute_leaf(spec, execution);
|
||||
Ok(())
|
||||
}
|
||||
WorkflowNode::Sequence(spec) => {
|
||||
self.execute_nodes(&spec.children, execution)?;
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::Sequence,
|
||||
status: execution.status,
|
||||
selected_children: spec.children.iter().map(node_id).collect(),
|
||||
summary: Some("sequence executed in declaration order".to_string()),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
WorkflowNode::Reduce(spec) => {
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::Reduce,
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
selected_children: spec.inputs.clone(),
|
||||
summary: Some(spec.prompt.clone()),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
WorkflowNode::TeacherReview(spec) => {
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::TeacherReview,
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
selected_children: spec.candidates.clone(),
|
||||
summary: Some(
|
||||
"teacher review scaffold selected declared candidates".to_string(),
|
||||
),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
WorkflowNode::LoopUntil(spec) => self.execute_loop_until(spec, execution),
|
||||
WorkflowNode::Cond(spec) => self.execute_cond(spec, execution),
|
||||
WorkflowNode::Expand(spec) => self.execute_expand(spec, execution),
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_branch_set(
|
||||
&mut self,
|
||||
spec: &BranchSpec,
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
let before = execution.leaf_results.len();
|
||||
self.execute_nodes(&spec.children, execution)?;
|
||||
let branch_failed = execution.leaf_results[before..]
|
||||
.iter()
|
||||
.any(|result| result.status != WorkflowRunStatus::Succeeded);
|
||||
let status = if branch_failed {
|
||||
WorkflowRunStatus::Failed
|
||||
} else {
|
||||
WorkflowRunStatus::Succeeded
|
||||
};
|
||||
if status == WorkflowRunStatus::Failed {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.branch_results.push(BranchResult {
|
||||
branch_id: spec.id.clone(),
|
||||
task_id: spec.id.clone(),
|
||||
status,
|
||||
artifacts: Vec::new(),
|
||||
notes: Some("mock branch set executed without runtime fanout".to_string()),
|
||||
});
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::BranchSet,
|
||||
status,
|
||||
selected_children: spec.children.iter().map(node_id).collect(),
|
||||
summary: Some("branch set scaffold executed children deterministically".to_string()),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_leaf(&mut self, spec: &LeafSpec, execution: &mut WorkflowExecution) {
|
||||
let outcome = self
|
||||
.leaf_outcomes
|
||||
.remove(&spec.id)
|
||||
.unwrap_or_else(|| MockLeafOutcome::succeeded(format!("mock leaf {}", spec.id)));
|
||||
if outcome.status != WorkflowRunStatus::Succeeded {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.leaf_results.push(LeafResult {
|
||||
leaf_id: spec.id.clone(),
|
||||
task_id: spec.id.clone(),
|
||||
status: outcome.status,
|
||||
output: outcome.output,
|
||||
artifacts: outcome.artifacts,
|
||||
});
|
||||
}
|
||||
|
||||
fn execute_loop_until(
|
||||
&mut self,
|
||||
spec: &LoopUntilSpec,
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
let max_iterations = spec.max_iterations.unwrap_or(1).max(1);
|
||||
let mut iterations = 0;
|
||||
let mut passed = false;
|
||||
while iterations < max_iterations {
|
||||
iterations += 1;
|
||||
self.execute_nodes(&spec.children, execution)?;
|
||||
if self.next_predicate_result(&spec.id) {
|
||||
passed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
let status = if passed {
|
||||
WorkflowRunStatus::Succeeded
|
||||
} else {
|
||||
WorkflowRunStatus::Failed
|
||||
};
|
||||
if status == WorkflowRunStatus::Failed {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::LoopUntil,
|
||||
status,
|
||||
selected_children: spec.children.iter().map(node_id).collect(),
|
||||
summary: Some(format!("loop_until iterations={iterations}")),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_cond(
|
||||
&mut self,
|
||||
spec: &CondSpec,
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
let passed = self.next_predicate_result(&spec.id);
|
||||
let selected_nodes = if passed {
|
||||
&spec.then_nodes
|
||||
} else {
|
||||
&spec.else_nodes
|
||||
};
|
||||
self.execute_nodes(selected_nodes, execution)?;
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::Cond,
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
selected_children: selected_nodes.iter().map(node_id).collect(),
|
||||
summary: Some(format!("predicate_result={passed}")),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_expand(
|
||||
&mut self,
|
||||
spec: &ExpandSpec,
|
||||
execution: &mut WorkflowExecution,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
let mut nodes = self.generated_nodes.remove(&spec.id).unwrap_or_default();
|
||||
if let Some(max_children) = spec.max_children {
|
||||
nodes.truncate(max_children);
|
||||
}
|
||||
validate_workflow_nodes(&nodes)?;
|
||||
self.execute_nodes(&nodes, execution)?;
|
||||
execution.control_node_results.push(ControlNodeResult {
|
||||
node_id: spec.id.clone(),
|
||||
kind: ControlNodeKind::Expand,
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
selected_children: nodes.iter().map(node_id).collect(),
|
||||
summary: Some(format!("expanded_from={}", spec.source)),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_predicate_result(&mut self, node_id: &str) -> bool {
|
||||
let Some(results) = self.predicate_results.get_mut(node_id) else {
|
||||
return false;
|
||||
};
|
||||
if results.is_empty() {
|
||||
return false;
|
||||
}
|
||||
results.remove(0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct BranchCandidate {
|
||||
pub branch_id: String,
|
||||
pub status: WorkflowRunStatus,
|
||||
pub score: u32,
|
||||
pub cost: u64,
|
||||
#[serde(default)]
|
||||
pub diversity_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct BranchTournament {
|
||||
#[serde(default)]
|
||||
pub min_score: u32,
|
||||
}
|
||||
|
||||
impl BranchTournament {
|
||||
pub fn select(&self, candidates: &[BranchCandidate]) -> Option<BranchCandidate> {
|
||||
candidates
|
||||
.iter()
|
||||
.filter(|candidate| {
|
||||
candidate.status == WorkflowRunStatus::Succeeded
|
||||
&& candidate.score >= self.min_score
|
||||
})
|
||||
.min_by_key(|candidate| (candidate.cost, std::cmp::Reverse(candidate.score)))
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ParetoFrontier {
|
||||
#[serde(default = "default_frontier_limit")]
|
||||
pub max_items: usize,
|
||||
}
|
||||
|
||||
impl Default for ParetoFrontier {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_items: default_frontier_limit(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ParetoFrontier {
|
||||
pub fn select(&self, candidates: &[BranchCandidate]) -> Vec<BranchCandidate> {
|
||||
let mut frontier: Vec<_> = candidates
|
||||
.iter()
|
||||
.filter(|candidate| candidate.status == WorkflowRunStatus::Succeeded)
|
||||
.filter(|candidate| {
|
||||
!candidates.iter().any(|other| {
|
||||
other.status == WorkflowRunStatus::Succeeded
|
||||
&& other.score >= candidate.score
|
||||
&& other.cost <= candidate.cost
|
||||
&& (other.score > candidate.score || other.cost < candidate.cost)
|
||||
})
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
frontier.sort_by_key(|candidate| (std::cmp::Reverse(candidate.score), candidate.cost));
|
||||
frontier.truncate(self.max_items.max(1));
|
||||
frontier
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum WorkflowExecutionError {
|
||||
#[error("{kind} node id must not be empty")]
|
||||
EmptyNodeId { kind: &'static str },
|
||||
#[error("leaf `{leaf}` prompt must not be empty")]
|
||||
EmptyLeafPrompt { leaf: String },
|
||||
#[error("duplicate workflow node `{node}`")]
|
||||
DuplicateNodeId { node: String },
|
||||
}
|
||||
|
||||
fn default_frontier_limit() -> usize {
|
||||
8
|
||||
}
|
||||
|
||||
fn node_id(node: &WorkflowNode) -> String {
|
||||
match node {
|
||||
WorkflowNode::BranchSet(spec) => spec.id.clone(),
|
||||
WorkflowNode::Leaf(spec) => spec.id.clone(),
|
||||
WorkflowNode::Sequence(spec) => spec.id.clone(),
|
||||
WorkflowNode::Reduce(spec) => spec.id.clone(),
|
||||
WorkflowNode::TeacherReview(spec) => spec.id.clone(),
|
||||
WorkflowNode::LoopUntil(spec) => spec.id.clone(),
|
||||
WorkflowNode::Cond(spec) => spec.id.clone(),
|
||||
WorkflowNode::Expand(spec) => spec.id.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_workflow_nodes(nodes: &[WorkflowNode]) -> Result<(), WorkflowExecutionError> {
|
||||
let mut seen = BTreeSet::new();
|
||||
validate_workflow_nodes_inner(nodes, &mut seen)
|
||||
}
|
||||
|
||||
fn validate_workflow_nodes_inner(
|
||||
nodes: &[WorkflowNode],
|
||||
seen: &mut BTreeSet<String>,
|
||||
) -> Result<(), WorkflowExecutionError> {
|
||||
for node in nodes {
|
||||
let id = node_id(node);
|
||||
let kind = control_kind_name(node);
|
||||
if id.trim().is_empty() {
|
||||
return Err(WorkflowExecutionError::EmptyNodeId { kind });
|
||||
}
|
||||
if !seen.insert(id.clone()) {
|
||||
return Err(WorkflowExecutionError::DuplicateNodeId { node: id });
|
||||
}
|
||||
match node {
|
||||
WorkflowNode::BranchSet(spec) => validate_workflow_nodes_inner(&spec.children, seen)?,
|
||||
WorkflowNode::Leaf(spec) => {
|
||||
if spec.prompt.trim().is_empty() {
|
||||
return Err(WorkflowExecutionError::EmptyLeafPrompt {
|
||||
leaf: spec.id.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
WorkflowNode::Sequence(spec) => validate_workflow_nodes_inner(&spec.children, seen)?,
|
||||
WorkflowNode::LoopUntil(spec) => validate_workflow_nodes_inner(&spec.children, seen)?,
|
||||
WorkflowNode::Cond(spec) => {
|
||||
validate_workflow_nodes_inner(&spec.then_nodes, seen)?;
|
||||
validate_workflow_nodes_inner(&spec.else_nodes, seen)?;
|
||||
}
|
||||
WorkflowNode::Reduce(_) | WorkflowNode::TeacherReview(_) | WorkflowNode::Expand(_) => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn control_kind_name(node: &WorkflowNode) -> &'static str {
|
||||
match node {
|
||||
WorkflowNode::BranchSet(_) => "branch_set",
|
||||
WorkflowNode::Leaf(_) => "leaf",
|
||||
WorkflowNode::Sequence(_) => "sequence",
|
||||
WorkflowNode::Reduce(_) => "reduce",
|
||||
WorkflowNode::TeacherReview(_) => "teacher_review",
|
||||
WorkflowNode::LoopUntil(_) => "loop_until",
|
||||
WorkflowNode::Cond(_) => "cond",
|
||||
WorkflowNode::Expand(_) => "expand",
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum WorkflowValidationError {
|
||||
#[error("{field} must not be empty")]
|
||||
@@ -697,6 +1147,76 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn leaf_node(id: &str) -> WorkflowNode {
|
||||
WorkflowNode::Leaf(LeafSpec {
|
||||
id: id.to_string(),
|
||||
prompt: format!("run {id}"),
|
||||
agent_type: AgentType::General,
|
||||
mode: TaskMode::ReadOnly,
|
||||
isolation: IsolationMode::Shared,
|
||||
file_scope: Vec::new(),
|
||||
depends_on_results: Vec::new(),
|
||||
budget: BudgetSpec::default(),
|
||||
permissions: PermissionSpec::default(),
|
||||
model_policy: ModelPolicy::default(),
|
||||
})
|
||||
}
|
||||
|
||||
fn invalid_leaf_node(id: &str) -> WorkflowNode {
|
||||
WorkflowNode::Leaf(LeafSpec {
|
||||
id: id.to_string(),
|
||||
prompt: " ".to_string(),
|
||||
agent_type: AgentType::General,
|
||||
mode: TaskMode::ReadOnly,
|
||||
isolation: IsolationMode::Shared,
|
||||
file_scope: Vec::new(),
|
||||
depends_on_results: Vec::new(),
|
||||
budget: BudgetSpec::default(),
|
||||
permissions: PermissionSpec::default(),
|
||||
model_policy: ModelPolicy::default(),
|
||||
})
|
||||
}
|
||||
|
||||
fn workflow_spec(nodes: Vec<WorkflowNode>) -> WorkflowSpec {
|
||||
WorkflowSpec {
|
||||
id: Some("mock-workflow".to_string()),
|
||||
goal: "prove mock executor control flow".to_string(),
|
||||
description: None,
|
||||
budget: BudgetSpec::default(),
|
||||
permissions: PermissionSpec::default(),
|
||||
model_policy: ModelPolicy::default(),
|
||||
promotion_policy: PromotionPolicy::default(),
|
||||
nodes,
|
||||
}
|
||||
}
|
||||
|
||||
fn control_result<'a>(
|
||||
execution: &'a WorkflowExecution,
|
||||
node_id: &str,
|
||||
) -> &'a ControlNodeResult {
|
||||
execution
|
||||
.control_node_results
|
||||
.iter()
|
||||
.find(|result| result.node_id == node_id)
|
||||
.expect("control node result should exist")
|
||||
}
|
||||
|
||||
fn candidate(
|
||||
branch_id: &str,
|
||||
status: WorkflowRunStatus,
|
||||
score: u32,
|
||||
cost: u64,
|
||||
diversity_key: &str,
|
||||
) -> BranchCandidate {
|
||||
BranchCandidate {
|
||||
branch_id: branch_id.to_string(),
|
||||
status,
|
||||
score,
|
||||
cost,
|
||||
diversity_key: Some(diversity_key.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn independent_phases_preserve_declaration_order() {
|
||||
let workflow = config(vec![
|
||||
@@ -1033,6 +1553,7 @@ mod tests {
|
||||
then_nodes: vec![WorkflowNode::Expand(ExpandSpec {
|
||||
id: "split-followups".to_string(),
|
||||
source: "summarize".to_string(),
|
||||
max_children: None,
|
||||
template: Some(Box::new(WorkflowNode::Leaf(LeafSpec {
|
||||
id: "followup-template".to_string(),
|
||||
prompt: "Patch one independent gap".to_string(),
|
||||
@@ -1147,4 +1668,216 @@ mod tests {
|
||||
assert!(minimal.selected_children.is_empty());
|
||||
assert_eq!(minimal.summary, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn run_mock_three_branch_workflow() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec {
|
||||
id: "discover".to_string(),
|
||||
description: None,
|
||||
parallel: true,
|
||||
budget: BudgetSpec::default(),
|
||||
permissions: PermissionSpec::default(),
|
||||
model_policy: ModelPolicy::default(),
|
||||
children: vec![
|
||||
leaf_node("scan-readme"),
|
||||
leaf_node("scan-config"),
|
||||
leaf_node("scan-tests"),
|
||||
],
|
||||
})]);
|
||||
|
||||
let mut executor = MockWorkflowExecutor::new();
|
||||
let execution = executor.run(&workflow).expect("mock workflow should run");
|
||||
|
||||
assert_eq!(execution.status, WorkflowRunStatus::Succeeded);
|
||||
assert_eq!(
|
||||
execution
|
||||
.leaf_results
|
||||
.iter()
|
||||
.map(|result| result.leaf_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["scan-readme", "scan-config", "scan-tests"]
|
||||
);
|
||||
assert_eq!(execution.branch_results.len(), 1);
|
||||
assert_eq!(execution.branch_results[0].branch_id, "discover");
|
||||
assert_eq!(
|
||||
control_result(&execution, "discover").selected_children,
|
||||
vec!["scan-readme", "scan-config", "scan-tests"]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn loop_until_stops_on_pass() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {
|
||||
id: "verify".to_string(),
|
||||
condition: "verification passed".to_string(),
|
||||
max_iterations: Some(5),
|
||||
children: vec![leaf_node("run-check")],
|
||||
})]);
|
||||
|
||||
let mut executor =
|
||||
MockWorkflowExecutor::new().with_predicate_results("verify", vec![false, false, true]);
|
||||
let execution = executor.run(&workflow).expect("loop should run");
|
||||
|
||||
assert_eq!(execution.status, WorkflowRunStatus::Succeeded);
|
||||
assert_eq!(execution.leaf_results.len(), 3);
|
||||
assert_eq!(
|
||||
control_result(&execution, "verify").summary.as_deref(),
|
||||
Some("loop_until iterations=3")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn loop_until_honors_max_iters() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {
|
||||
id: "verify".to_string(),
|
||||
condition: "verification passed".to_string(),
|
||||
max_iterations: Some(2),
|
||||
children: vec![leaf_node("run-check")],
|
||||
})]);
|
||||
|
||||
let mut executor =
|
||||
MockWorkflowExecutor::new().with_predicate_results("verify", vec![false, false, true]);
|
||||
let execution = executor.run(&workflow).expect("loop should run");
|
||||
|
||||
assert_eq!(execution.status, WorkflowRunStatus::Failed);
|
||||
assert_eq!(execution.leaf_results.len(), 2);
|
||||
assert_eq!(
|
||||
control_result(&execution, "verify").summary.as_deref(),
|
||||
Some("loop_until iterations=2")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cond_uses_logged_predicate_result() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::Cond(CondSpec {
|
||||
id: "should-fix".to_string(),
|
||||
condition: "finding requires a patch".to_string(),
|
||||
then_nodes: vec![leaf_node("patch")],
|
||||
else_nodes: vec![leaf_node("report-only")],
|
||||
})]);
|
||||
|
||||
let mut executor =
|
||||
MockWorkflowExecutor::new().with_predicate_results("should-fix", vec![true]);
|
||||
let execution = executor.run(&workflow).expect("cond should run");
|
||||
|
||||
assert_eq!(
|
||||
execution
|
||||
.leaf_results
|
||||
.iter()
|
||||
.map(|result| result.leaf_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["patch"]
|
||||
);
|
||||
assert_eq!(
|
||||
control_result(&execution, "should-fix").summary.as_deref(),
|
||||
Some("predicate_result=true")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expand_respects_max_children() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::Expand(ExpandSpec {
|
||||
id: "split".to_string(),
|
||||
source: "plan".to_string(),
|
||||
max_children: Some(2),
|
||||
template: None,
|
||||
})]);
|
||||
|
||||
let generated = vec![leaf_node("first"), leaf_node("second"), leaf_node("third")];
|
||||
let mut executor = MockWorkflowExecutor::new().with_generated_nodes("split", generated);
|
||||
let execution = executor.run(&workflow).expect("expand should run");
|
||||
|
||||
assert_eq!(
|
||||
execution
|
||||
.leaf_results
|
||||
.iter()
|
||||
.map(|result| result.leaf_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["first", "second"]
|
||||
);
|
||||
assert_eq!(
|
||||
control_result(&execution, "split").selected_children,
|
||||
vec!["first", "second"]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expand_generated_nodes_validate_before_run() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::Expand(ExpandSpec {
|
||||
id: "split".to_string(),
|
||||
source: "plan".to_string(),
|
||||
max_children: None,
|
||||
template: None,
|
||||
})]);
|
||||
|
||||
let mut executor = MockWorkflowExecutor::new()
|
||||
.with_generated_nodes("split", vec![invalid_leaf_node("bad")]);
|
||||
let err = executor
|
||||
.run(&workflow)
|
||||
.expect_err("invalid generated leaf should fail before execution");
|
||||
|
||||
assert_eq!(
|
||||
err,
|
||||
WorkflowExecutionError::EmptyLeafPrompt {
|
||||
leaf: "bad".to_string()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tournament_selects_passing_minimal_branch() {
|
||||
let tournament = BranchTournament { min_score: 60 };
|
||||
let candidates = vec![
|
||||
candidate(
|
||||
"expensive-pass",
|
||||
WorkflowRunStatus::Succeeded,
|
||||
90,
|
||||
90,
|
||||
"quality",
|
||||
),
|
||||
candidate("failed-cheap", WorkflowRunStatus::Failed, 100, 1, "broken"),
|
||||
candidate(
|
||||
"cheap-pass",
|
||||
WorkflowRunStatus::Succeeded,
|
||||
70,
|
||||
10,
|
||||
"minimal",
|
||||
),
|
||||
candidate("too-low", WorkflowRunStatus::Succeeded, 40, 2, "weak"),
|
||||
];
|
||||
|
||||
let selected = tournament
|
||||
.select(&candidates)
|
||||
.expect("one passing branch should be selected");
|
||||
|
||||
assert_eq!(selected.branch_id, "cheap-pass");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pareto_frontier_keeps_diverse_candidates() {
|
||||
let frontier = ParetoFrontier { max_items: 4 };
|
||||
let candidates = vec![
|
||||
candidate("quality", WorkflowRunStatus::Succeeded, 95, 100, "quality"),
|
||||
candidate("minimal", WorkflowRunStatus::Succeeded, 70, 10, "small"),
|
||||
candidate("dominated", WorkflowRunStatus::Succeeded, 60, 40, "middle"),
|
||||
candidate("failed", WorkflowRunStatus::Failed, 100, 1, "broken"),
|
||||
];
|
||||
|
||||
let selected = frontier.select(&candidates);
|
||||
|
||||
assert_eq!(
|
||||
selected
|
||||
.iter()
|
||||
.map(|candidate| candidate.branch_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["quality", "minimal"]
|
||||
);
|
||||
assert_eq!(
|
||||
selected
|
||||
.iter()
|
||||
.filter_map(|candidate| candidate.diversity_key.as_deref())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["quality", "small"]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user