feat(whaleflow): add memo telemetry counters (#2833)
This commit is contained in:
+4
-1
@@ -59,7 +59,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
roles without hardcoding provider-specific runtime paths (#2672). The
|
||||
`rlm_cache_change.star` dogfood workflow now exercises candidate branches,
|
||||
LoopUntil verification, tournament selection, teacher review, and mock
|
||||
execution in CI-oriented crate tests (#2679).
|
||||
execution in CI-oriented crate tests (#2679). Leaf, branch, and workflow
|
||||
results now also carry separate ARMH/shared-memo and provider prompt-cache
|
||||
telemetry counters, with mock aggregation tests, so #2671 can progress
|
||||
without wiring live RLM calls or billing-affecting provider behavior yet.
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
|
||||
@@ -59,7 +59,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
roles without hardcoding provider-specific runtime paths (#2672). The
|
||||
`rlm_cache_change.star` dogfood workflow now exercises candidate branches,
|
||||
LoopUntil verification, tournament selection, teacher review, and mock
|
||||
execution in CI-oriented crate tests (#2679).
|
||||
execution in CI-oriented crate tests (#2679). Leaf, branch, and workflow
|
||||
results now also carry separate ARMH/shared-memo and provider prompt-cache
|
||||
telemetry counters, with mock aggregation tests, so #2671 can progress
|
||||
without wiring live RLM calls or billing-affecting provider behavior yet.
|
||||
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
|
||||
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
|
||||
workflow, branch, leaf, control-node, and teacher-candidate runs. The
|
||||
|
||||
@@ -445,6 +445,8 @@ pub struct BranchResult {
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub memo_usage: WorkflowMemoUsage,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub notes: Option<String>,
|
||||
@@ -458,6 +460,8 @@ pub struct LeafResult {
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub memo_usage: WorkflowMemoUsage,
|
||||
#[serde(default)]
|
||||
pub output: Option<String>,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
@@ -486,6 +490,36 @@ impl WorkflowUsage {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct WorkflowMemoUsage {
|
||||
#[serde(default)]
|
||||
pub armh_hits: u64,
|
||||
#[serde(default)]
|
||||
pub armh_misses: u64,
|
||||
#[serde(default)]
|
||||
pub armh_saved_estimated_tokens: u64,
|
||||
#[serde(default)]
|
||||
pub provider_prompt_cache_hits: u64,
|
||||
#[serde(default)]
|
||||
pub provider_prompt_cache_misses: u64,
|
||||
}
|
||||
|
||||
impl WorkflowMemoUsage {
|
||||
pub(crate) fn add_assign(&mut self, other: Self) {
|
||||
self.armh_hits = self.armh_hits.saturating_add(other.armh_hits);
|
||||
self.armh_misses = self.armh_misses.saturating_add(other.armh_misses);
|
||||
self.armh_saved_estimated_tokens = self
|
||||
.armh_saved_estimated_tokens
|
||||
.saturating_add(other.armh_saved_estimated_tokens);
|
||||
self.provider_prompt_cache_hits = self
|
||||
.provider_prompt_cache_hits
|
||||
.saturating_add(other.provider_prompt_cache_hits);
|
||||
self.provider_prompt_cache_misses = self
|
||||
.provider_prompt_cache_misses
|
||||
.saturating_add(other.provider_prompt_cache_misses);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ControlNodeResult {
|
||||
pub node_id: String,
|
||||
@@ -528,6 +562,8 @@ pub struct WorkflowExecution {
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub memo_usage: WorkflowMemoUsage,
|
||||
#[serde(default)]
|
||||
pub leaf_results: Vec<LeafResult>,
|
||||
#[serde(default)]
|
||||
pub branch_results: Vec<BranchResult>,
|
||||
@@ -540,6 +576,7 @@ impl Default for WorkflowExecution {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
usage: WorkflowUsage::default(),
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
leaf_results: Vec::new(),
|
||||
branch_results: Vec::new(),
|
||||
control_node_results: Vec::new(),
|
||||
@@ -563,6 +600,8 @@ pub struct MockLeafOutcome {
|
||||
#[serde(default)]
|
||||
pub usage: WorkflowUsage,
|
||||
#[serde(default)]
|
||||
pub memo_usage: WorkflowMemoUsage,
|
||||
#[serde(default)]
|
||||
pub output: Option<String>,
|
||||
#[serde(default)]
|
||||
pub artifacts: Vec<String>,
|
||||
@@ -573,6 +612,7 @@ impl MockLeafOutcome {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Succeeded,
|
||||
usage: WorkflowUsage::default(),
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
@@ -582,6 +622,7 @@ impl MockLeafOutcome {
|
||||
Self {
|
||||
status: WorkflowRunStatus::Failed,
|
||||
usage: WorkflowUsage::default(),
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
output: Some(output.into()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
@@ -591,6 +632,11 @@ impl MockLeafOutcome {
|
||||
self.usage = usage;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_memo_usage(mut self, memo_usage: WorkflowMemoUsage) -> Self {
|
||||
self.memo_usage = memo_usage;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
@@ -719,8 +765,10 @@ impl MockWorkflowExecutor {
|
||||
WorkflowRunStatus::Succeeded
|
||||
};
|
||||
let mut usage = WorkflowUsage::default();
|
||||
let mut memo_usage = WorkflowMemoUsage::default();
|
||||
for result in &execution.leaf_results[before..] {
|
||||
usage.add_assign(result.usage);
|
||||
memo_usage.add_assign(result.memo_usage);
|
||||
}
|
||||
if status == WorkflowRunStatus::Failed {
|
||||
execution.mark_failed();
|
||||
@@ -730,6 +778,7 @@ impl MockWorkflowExecutor {
|
||||
task_id: spec.id.clone(),
|
||||
status,
|
||||
usage,
|
||||
memo_usage,
|
||||
artifacts: Vec::new(),
|
||||
notes: Some("mock branch set executed without runtime fanout".to_string()),
|
||||
});
|
||||
@@ -752,11 +801,13 @@ impl MockWorkflowExecutor {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.usage.add_assign(outcome.usage);
|
||||
execution.memo_usage.add_assign(outcome.memo_usage);
|
||||
execution.leaf_results.push(LeafResult {
|
||||
leaf_id: spec.id.clone(),
|
||||
task_id: spec.id.clone(),
|
||||
status: outcome.status,
|
||||
usage: outcome.usage,
|
||||
memo_usage: outcome.memo_usage,
|
||||
output: outcome.output,
|
||||
artifacts: outcome.artifacts,
|
||||
});
|
||||
@@ -1672,6 +1723,7 @@ mod tests {
|
||||
output_tokens: 25,
|
||||
cost_microusd: 42,
|
||||
},
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
artifacts: vec!["trace://branches/discover".to_string()],
|
||||
notes: Some("validated prompt surfaces".to_string()),
|
||||
};
|
||||
@@ -1687,6 +1739,7 @@ mod tests {
|
||||
serde_json::from_str(r#"{"branch_id":"discover","task_id":"scan","status":"pending"}"#)
|
||||
.expect("parse minimal branch result");
|
||||
assert_eq!(minimal.usage, WorkflowUsage::default());
|
||||
assert_eq!(minimal.memo_usage, WorkflowMemoUsage::default());
|
||||
assert!(minimal.artifacts.is_empty());
|
||||
assert_eq!(minimal.notes, None);
|
||||
}
|
||||
@@ -1702,6 +1755,13 @@ mod tests {
|
||||
output_tokens: 7,
|
||||
cost_microusd: 3,
|
||||
},
|
||||
memo_usage: WorkflowMemoUsage {
|
||||
armh_hits: 1,
|
||||
armh_misses: 0,
|
||||
armh_saved_estimated_tokens: 128,
|
||||
provider_prompt_cache_hits: 2,
|
||||
provider_prompt_cache_misses: 1,
|
||||
},
|
||||
output: Some("README needs clearer setup steps".to_string()),
|
||||
artifacts: vec!["trace://leaves/scan-readme".to_string()],
|
||||
};
|
||||
@@ -1710,6 +1770,7 @@ mod tests {
|
||||
|
||||
assert!(json.contains("\"status\":\"failed\""));
|
||||
assert!(json.contains("\"input_tokens\":11"));
|
||||
assert!(json.contains("\"armh_saved_estimated_tokens\":128"));
|
||||
let parsed: LeafResult = serde_json::from_str(&json).expect("parse leaf result");
|
||||
assert_eq!(parsed, result);
|
||||
|
||||
@@ -1718,6 +1779,7 @@ mod tests {
|
||||
)
|
||||
.expect("parse minimal leaf result");
|
||||
assert_eq!(minimal.usage, WorkflowUsage::default());
|
||||
assert_eq!(minimal.memo_usage, WorkflowMemoUsage::default());
|
||||
assert_eq!(minimal.output, None);
|
||||
assert!(minimal.artifacts.is_empty());
|
||||
}
|
||||
@@ -1836,6 +1898,63 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// A branch set with one memo-hit leaf and one memo-miss leaf must report the
/// summed counters on the workflow and on the branch, while each leaf result
/// keeps its own counters.
#[test]
fn mock_executor_aggregates_memo_usage() {
    // Per-leaf telemetry fed to the mock executor.
    let hit_usage = WorkflowMemoUsage {
        armh_hits: 1,
        armh_misses: 0,
        armh_saved_estimated_tokens: 4096,
        provider_prompt_cache_hits: 1,
        provider_prompt_cache_misses: 0,
    };
    let miss_usage = WorkflowMemoUsage {
        armh_hits: 0,
        armh_misses: 1,
        armh_saved_estimated_tokens: 0,
        provider_prompt_cache_hits: 0,
        provider_prompt_cache_misses: 1,
    };
    // Field-wise sum of the two leaves above.
    let expected_total = WorkflowMemoUsage {
        armh_hits: 1,
        armh_misses: 1,
        armh_saved_estimated_tokens: 4096,
        provider_prompt_cache_hits: 1,
        provider_prompt_cache_misses: 1,
    };

    let branch_set = BranchSpec {
        id: "cache-branches".to_string(),
        description: None,
        parallel: true,
        budget: BudgetSpec::default(),
        permissions: PermissionSpec::default(),
        model_policy: ModelPolicy::default(),
        children: vec![leaf_node("rlm-hit"), leaf_node("rlm-miss")],
    };
    let workflow = workflow_spec(vec![WorkflowNode::BranchSet(branch_set)]);

    let mut executor = MockWorkflowExecutor::new()
        .with_leaf_outcome(
            "rlm-hit",
            MockLeafOutcome::succeeded("memo hit").with_memo_usage(hit_usage),
        )
        .with_leaf_outcome(
            "rlm-miss",
            MockLeafOutcome::succeeded("memo miss").with_memo_usage(miss_usage),
        );

    let execution = executor.run(&workflow).expect("mock workflow should run");

    // Workflow-level total, and the single branch must agree with it.
    assert_eq!(execution.memo_usage, expected_total);
    assert_eq!(execution.branch_results[0].memo_usage, execution.memo_usage);

    // Leaf results stay un-aggregated and in execution order.
    let per_leaf: Vec<_> = execution
        .leaf_results
        .iter()
        .map(|leaf| (leaf.memo_usage.armh_hits, leaf.memo_usage.armh_misses))
        .collect();
    assert_eq!(per_leaf, vec![(1, 0), (0, 1)]);
}
|
||||
|
||||
#[test]
|
||||
fn loop_until_stops_on_pass() {
|
||||
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {
|
||||
|
||||
@@ -6,8 +6,9 @@ use thiserror::Error;
|
||||
|
||||
use crate::{
|
||||
BranchResult, BranchSpec, CondSpec, ControlNodeKind, ControlNodeResult, ExpandSpec, LeafResult,
|
||||
LeafSpec, LoopUntilSpec, SequenceSpec, WorkflowExecution, WorkflowExecutionError, WorkflowNode,
|
||||
WorkflowRunStatus, WorkflowSpec, WorkflowUsage, validate_workflow_nodes,
|
||||
LeafSpec, LoopUntilSpec, SequenceSpec, WorkflowExecution, WorkflowExecutionError,
|
||||
WorkflowMemoUsage, WorkflowNode, WorkflowRunStatus, WorkflowSpec, WorkflowUsage,
|
||||
validate_workflow_nodes,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
@@ -158,8 +159,10 @@ impl WorkflowReplayExecutor {
|
||||
self.execute_nodes(spec, &branch.children, execution)?;
|
||||
let status = branch_status(&execution.leaf_results[before..]);
|
||||
let mut usage = WorkflowUsage::default();
|
||||
let mut memo_usage = WorkflowMemoUsage::default();
|
||||
for result in &execution.leaf_results[before..] {
|
||||
usage.add_assign(result.usage);
|
||||
memo_usage.add_assign(result.memo_usage);
|
||||
}
|
||||
if status == WorkflowRunStatus::ReplayDiverged {
|
||||
execution.mark_replay_diverged();
|
||||
@@ -171,6 +174,7 @@ impl WorkflowReplayExecutor {
|
||||
task_id: branch.id.clone(),
|
||||
status,
|
||||
usage,
|
||||
memo_usage,
|
||||
artifacts: Vec::new(),
|
||||
notes: Some("replay branch set evaluated from recorded leaf results".to_string()),
|
||||
});
|
||||
@@ -209,6 +213,7 @@ impl WorkflowReplayExecutor {
|
||||
task_id: leaf.id.clone(),
|
||||
status: WorkflowRunStatus::ReplayDiverged,
|
||||
usage: WorkflowUsage::default(),
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
output: None,
|
||||
artifacts: Vec::new(),
|
||||
};
|
||||
@@ -223,6 +228,7 @@ impl WorkflowReplayExecutor {
|
||||
execution.mark_failed();
|
||||
}
|
||||
execution.usage.add_assign(result.usage);
|
||||
execution.memo_usage.add_assign(result.memo_usage);
|
||||
self.resolved_outputs
|
||||
.insert(leaf.id.clone(), result.output.clone());
|
||||
execution.leaf_results.push(result);
|
||||
@@ -545,6 +551,7 @@ mod tests {
|
||||
output_tokens: 5,
|
||||
cost_microusd: 2,
|
||||
},
|
||||
memo_usage: WorkflowMemoUsage::default(),
|
||||
output: Some(output.to_string()),
|
||||
artifacts: Vec::new(),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user