feat(whaleflow): add usage telemetry to mock results (#2827)
This commit is contained in:
@@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
authoring layer now compiles fail-closed model-authored workflow files into
|
||||
that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star`
|
||||
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
|
||||
Leaf, branch, and workflow execution results now carry deterministic token
|
||||
and cost telemetry fields that the mock executor can aggregate without live
|
||||
provider calls or runtime sub-agent fanout (#2486).
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
|
||||
@@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
authoring layer now compiles fail-closed model-authored workflow files into
|
||||
that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star`
|
||||
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
|
||||
Leaf, branch, and workflow execution results now carry deterministic token
|
||||
and cost telemetry fields that the mock executor can aggregate without live
|
||||
provider calls or runtime sub-agent fanout (#2486).
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
|
||||
@@ -439,6 +439,8 @@ pub struct BranchResult {
|
||||
pub task_id: String,
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub notes: Option<String>,
|
||||
@@ -450,11 +452,36 @@ pub struct LeafResult {
|
||||
pub task_id: String,
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub output: Option<String>,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct WorkflowUsage {
|
||||
#[serde(default)]
|
||||
pub input_tokens: u64,
|
||||
#[serde(default)]
|
||||
pub output_tokens: u64,
|
||||
#[serde(default)]
|
||||
pub cost_microusd: u64,
|
||||
}
|
||||
|
||||
impl WorkflowUsage {
|
||||
#[must_use]
|
||||
pub fn total_tokens(self) -> u64 {
|
||||
self.input_tokens.saturating_add(self.output_tokens)
|
||||
}
|
||||
|
||||
fn add_assign(&mut self, other: Self) {
|
||||
self.input_tokens = self.input_tokens.saturating_add(other.input_tokens);
|
||||
self.output_tokens = self.output_tokens.saturating_add(other.output_tokens);
|
||||
self.cost_microusd = self.cost_microusd.saturating_add(other.cost_microusd);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ControlNodeResult {
|
||||
pub node_id: String,
|
||||
@@ -494,6 +521,8 @@ pub enum ControlNodeKind {
|
||||
pub struct WorkflowExecution {
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub leaf_results: Vec<LeafResult>,
|
||||
#[serde(default)]
|
||||
pub branch_results: Vec<BranchResult>,
|
||||
@@ -505,6 +534,7 @@ impl Default for WorkflowExecution {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
usage: WorkflowUsage::default(),
|
||||
leaf_results: Vec::new(),
|
||||
branch_results: Vec::new(),
|
||||
control_node_results: Vec::new(),
|
||||
@@ -522,6 +552,8 @@ impl WorkflowExecution {
|
||||
pub struct MockLeafOutcome {
|
||||
pub status: WorkflowRunStatus,
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub output: Option<String>,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
@@ -531,6 +563,7 @@ impl MockLeafOutcome {
|
||||
pub fn succeeded(output: impl Into<String>) -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
usage: WorkflowUsage::default(),
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
@@ -539,10 +572,16 @@ impl MockLeafOutcome {
|
||||
pub fn failed(output: impl Into<String>) -> Self {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Failed,
|
||||
usage: WorkflowUsage::default(),
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_usage(mut self, usage: WorkflowUsage) -> Self {
|
||||
self.usage = usage;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
@@ -670,6 +709,10 @@ impl MockWorkflowExecutor {
|
||||
} else {
|
||||
WorkflowRunStatus::Succeeded
|
||||
};
|
||||
let mut usage = WorkflowUsage::default();
|
||||
for result in &execution.leaf_results[before..] {
|
||||
usage.add_assign(result.usage);
|
||||
}
|
||||
if status == WorkflowRunStatus::Failed {
|
||||
execution.mark_failed();
|
||||
}
|
||||
@@ -677,6 +720,7 @@ impl MockWorkflowExecutor {
|
||||
branch_id: spec.id.clone(),
|
||||
task_id: spec.id.clone(),
|
||||
status,
|
||||
usage,
|
||||
artifacts: Vec::new(),
|
||||
notes: Some("mock branch set executed without runtime fanout".to_string()),
|
||||
});
|
||||
@@ -698,10 +742,12 @@ impl MockWorkflowExecutor {
|
||||
if outcome.status != WorkflowRunStatus::Succeeded {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.usage.add_assign(outcome.usage);
|
||||
execution.leaf_results.push(LeafResult {
|
||||
leaf_id: spec.id.clone(),
|
||||
task_id: spec.id.clone(),
|
||||
status: outcome.status,
|
||||
usage: outcome.usage,
|
||||
output: outcome.output,
|
||||
artifacts: outcome.artifacts,
|
||||
});
|
||||
@@ -1610,6 +1656,11 @@ mod tests {
|
||||
branch_id: "discover".to_string(),
|
||||
task_id: "scan".to_string(),
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
usage: WorkflowUsage {
|
||||
input_tokens: 100,
|
||||
output_tokens: 25,
|
||||
cost_microusd: 42,
|
||||
},
|
||||
artifacts: vec!["trace://branches/discover".to_string()],
|
||||
notes: Some("validated prompt surfaces".to_string()),
|
||||
};
|
||||
@@ -1617,12 +1668,14 @@ mod tests {
|
||||
let json = serde_json::to_string(&result).expect("serialize branch result");
|
||||
|
||||
assert!(json.contains("\"status\":\"succeeded\""));
|
||||
assert!(json.contains("\"cost_microusd\":42"));
|
||||
let parsed: BranchResult = serde_json::from_str(&json).expect("parse branch result");
|
||||
assert_eq!(parsed, result);
|
||||
|
||||
let minimal: BranchResult =
|
||||
serde_json::from_str(r#"{"branch_id":"discover","task_id":"scan","status":"pending"}"#)
|
||||
.expect("parse minimal branch result");
|
||||
assert_eq!(minimal.usage, WorkflowUsage::default());
|
||||
assert!(minimal.artifacts.is_empty());
|
||||
assert_eq!(minimal.notes, None);
|
||||
}
|
||||
@@ -1633,6 +1686,11 @@ mod tests {
|
||||
leaf_id: "scan-readme".to_string(),
|
||||
task_id: "scan".to_string(),
|
||||
status: WorkflowRunStatus::Failed,
|
||||
usage: WorkflowUsage {
|
||||
input_tokens: 11,
|
||||
output_tokens: 7,
|
||||
cost_microusd: 3,
|
||||
},
|
||||
output: Some("README needs clearer setup steps".to_string()),
|
||||
artifacts: vec!["trace://leaves/scan-readme".to_string()],
|
||||
};
|
||||
@@ -1640,6 +1698,7 @@ mod tests {
|
||||
let json = serde_json::to_string(&result).expect("serialize leaf result");
|
||||
|
||||
assert!(json.contains("\"status\":\"failed\""));
|
||||
assert!(json.contains("\"input_tokens\":11"));
|
||||
let parsed: LeafResult = serde_json::from_str(&json).expect("parse leaf result");
|
||||
assert_eq!(parsed, result);
|
||||
|
||||
@@ -1647,6 +1706,7 @@ mod tests {
|
||||
r#"{"leaf_id":"scan-readme","task_id":"scan","status":"pending"}"#,
|
||||
)
|
||||
.expect("parse minimal leaf result");
|
||||
assert_eq!(minimal.usage, WorkflowUsage::default());
|
||||
assert_eq!(minimal.output, None);
|
||||
assert!(minimal.artifacts.is_empty());
|
||||
}
|
||||
@@ -1713,6 +1773,58 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mock_executor_aggregates_leaf_usage() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec {
|
||||
id: "discover".to_string(),
|
||||
description: None,
|
||||
parallel: true,
|
||||
budget: BudgetSpec::default(),
|
||||
permissions: PermissionSpec::default(),
|
||||
model_policy: ModelPolicy::default(),
|
||||
children: vec![leaf_node("scan-readme"), leaf_node("scan-tests")],
|
||||
})]);
|
||||
|
||||
let mut executor = MockWorkflowExecutor::new()
|
||||
.with_leaf_outcome(
|
||||
"scan-readme",
|
||||
MockLeafOutcome::succeeded("readme ok").with_usage(WorkflowUsage {
|
||||
input_tokens: 100,
|
||||
output_tokens: 25,
|
||||
cost_microusd: 500,
|
||||
}),
|
||||
)
|
||||
.with_leaf_outcome(
|
||||
"scan-tests",
|
||||
MockLeafOutcome::succeeded("tests ok").with_usage(WorkflowUsage {
|
||||
input_tokens: 50,
|
||||
output_tokens: 10,
|
||||
cost_microusd: 250,
|
||||
}),
|
||||
);
|
||||
|
||||
let execution = executor.run(&workflow).expect("mock workflow should run");
|
||||
|
||||
assert_eq!(
|
||||
execution.usage,
|
||||
WorkflowUsage {
|
||||
input_tokens: 150,
|
||||
output_tokens: 35,
|
||||
cost_microusd: 750,
|
||||
}
|
||||
);
|
||||
assert_eq!(execution.usage.total_tokens(), 185);
|
||||
assert_eq!(execution.branch_results[0].usage, execution.usage);
|
||||
assert_eq!(
|
||||
execution
|
||||
.leaf_results
|
||||
.iter()
|
||||
.map(|result| result.usage.cost_microusd)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![500, 250]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn loop_until_stops_on_pass() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {
|
||||
|
||||
Reference in New Issue
Block a user