From 85b5ca5560b16753a603195ea6cf784c7ec40963 Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Fri, 5 Jun 2026 22:23:49 -0700 Subject: [PATCH] feat(whaleflow): add memo telemetry counters (#2833) --- CHANGELOG.md | 5 +- crates/tui/CHANGELOG.md | 5 +- crates/whaleflow/src/lib.rs | 119 +++++++++++++++++++++++++++++++++ crates/whaleflow/src/replay.rs | 11 ++- 4 files changed, 136 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca8463ff..aa88d50d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,7 +59,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 roles without hardcoding provider-specific runtime paths (#2672). The `rlm_cache_change.star` dogfood workflow now exercises candidate branches, LoopUntil verification, tournament selection, teacher review, and mock - execution in CI-oriented crate tests (#2679). + execution in CI-oriented crate tests (#2679). Leaf, branch, and workflow + results now also carry separate ARMH/shared-memo and provider prompt-cache + telemetry counters, with mock aggregation tests, so #2671 can progress + without wiring live RLM calls or billing-affecting provider behavior yet. Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. - Added a state-store v2 schema migration for WhaleFlow trace tables covering workflow, branch, leaf, control-node, and teacher-candidate runs. The diff --git a/crates/tui/CHANGELOG.md b/crates/tui/CHANGELOG.md index ca8463ff..aa88d50d 100644 --- a/crates/tui/CHANGELOG.md +++ b/crates/tui/CHANGELOG.md @@ -59,7 +59,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 roles without hardcoding provider-specific runtime paths (#2672). The `rlm_cache_change.star` dogfood workflow now exercises candidate branches, LoopUntil verification, tournament selection, teacher review, and mock - execution in CI-oriented crate tests (#2679). + execution in CI-oriented crate tests (#2679). 
Leaf, branch, and workflow + results now also carry separate ARMH/shared-memo and provider prompt-cache + telemetry counters, with mock aggregation tests, so #2671 can progress + without wiring live RLM calls or billing-affecting provider behavior yet. Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. - Added a state-store v2 schema migration for WhaleFlow trace tables covering workflow, branch, leaf, control-node, and teacher-candidate runs. The diff --git a/crates/whaleflow/src/lib.rs b/crates/whaleflow/src/lib.rs index f172e4c3..02a865b6 100644 --- a/crates/whaleflow/src/lib.rs +++ b/crates/whaleflow/src/lib.rs @@ -445,6 +445,8 @@ pub struct BranchResult { #[serde(default)] pub usage: WorkflowUsage, #[serde(default)] + pub memo_usage: WorkflowMemoUsage, + #[serde(default)] pub artifacts: Vec<String>, #[serde(default)] pub notes: Option<String>, @@ -458,6 +460,8 @@ pub struct LeafResult { #[serde(default)] pub usage: WorkflowUsage, #[serde(default)] + pub memo_usage: WorkflowMemoUsage, + #[serde(default)] pub output: Option<String>, #[serde(default)] pub artifacts: Vec<String>, @@ -486,6 +490,36 @@ impl WorkflowUsage { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct WorkflowMemoUsage { + #[serde(default)] + pub armh_hits: u64, + #[serde(default)] + pub armh_misses: u64, + #[serde(default)] + pub armh_saved_estimated_tokens: u64, + #[serde(default)] + pub provider_prompt_cache_hits: u64, + #[serde(default)] + pub provider_prompt_cache_misses: u64, +} + +impl WorkflowMemoUsage { + pub(crate) fn add_assign(&mut self, other: Self) { + self.armh_hits = self.armh_hits.saturating_add(other.armh_hits); + self.armh_misses = self.armh_misses.saturating_add(other.armh_misses); + self.armh_saved_estimated_tokens = self + .armh_saved_estimated_tokens + .saturating_add(other.armh_saved_estimated_tokens); + self.provider_prompt_cache_hits = self + .provider_prompt_cache_hits + .saturating_add(other.provider_prompt_cache_hits); +
self.provider_prompt_cache_misses = self + .provider_prompt_cache_misses + .saturating_add(other.provider_prompt_cache_misses); + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ControlNodeResult { pub node_id: String, @@ -528,6 +562,8 @@ pub struct WorkflowExecution { #[serde(default)] pub usage: WorkflowUsage, #[serde(default)] + pub memo_usage: WorkflowMemoUsage, + #[serde(default)] pub leaf_results: Vec<LeafResult>, #[serde(default)] pub branch_results: Vec<BranchResult>, @@ -540,6 +576,7 @@ impl Default for WorkflowExecution { Self { status: WorkflowRunStatus::Succeeded, usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), leaf_results: Vec::new(), branch_results: Vec::new(), control_node_results: Vec::new(), @@ -563,6 +600,8 @@ pub struct MockLeafOutcome { #[serde(default)] pub usage: WorkflowUsage, #[serde(default)] + pub memo_usage: WorkflowMemoUsage, + #[serde(default)] pub output: Option<String>, #[serde(default)] pub artifacts: Vec<String>, @@ -573,6 +612,7 @@ impl MockLeafOutcome { Self { status: WorkflowRunStatus::Succeeded, usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), output: Some(output.into()), artifacts: Vec::new(), } @@ -582,6 +622,7 @@ impl MockLeafOutcome { Self { status: WorkflowRunStatus::Failed, usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), output: Some(output.into()), artifacts: Vec::new(), } @@ -591,6 +632,11 @@ impl MockLeafOutcome { self.usage = usage; self } + + pub fn with_memo_usage(mut self, memo_usage: WorkflowMemoUsage) -> Self { + self.memo_usage = memo_usage; + self + } } #[derive(Debug, Default, Clone)] @@ -719,8 +765,10 @@ impl MockWorkflowExecutor { WorkflowRunStatus::Succeeded }; let mut usage = WorkflowUsage::default(); + let mut memo_usage = WorkflowMemoUsage::default(); for result in &execution.leaf_results[before..]
{ usage.add_assign(result.usage); + memo_usage.add_assign(result.memo_usage); } if status == WorkflowRunStatus::Failed { execution.mark_failed(); @@ -730,6 +778,7 @@ impl MockWorkflowExecutor { task_id: spec.id.clone(), status, usage, + memo_usage, artifacts: Vec::new(), notes: Some("mock branch set executed without runtime fanout".to_string()), }); @@ -752,11 +801,13 @@ impl MockWorkflowExecutor { execution.mark_failed(); } execution.usage.add_assign(outcome.usage); + execution.memo_usage.add_assign(outcome.memo_usage); execution.leaf_results.push(LeafResult { leaf_id: spec.id.clone(), task_id: spec.id.clone(), status: outcome.status, usage: outcome.usage, + memo_usage: outcome.memo_usage, output: outcome.output, artifacts: outcome.artifacts, }); @@ -1672,6 +1723,7 @@ mod tests { output_tokens: 25, cost_microusd: 42, }, + memo_usage: WorkflowMemoUsage::default(), artifacts: vec!["trace://branches/discover".to_string()], notes: Some("validated prompt surfaces".to_string()), }; @@ -1687,6 +1739,7 @@ mod tests { serde_json::from_str(r#"{"branch_id":"discover","task_id":"scan","status":"pending"}"#) .expect("parse minimal branch result"); assert_eq!(minimal.usage, WorkflowUsage::default()); + assert_eq!(minimal.memo_usage, WorkflowMemoUsage::default()); assert!(minimal.artifacts.is_empty()); assert_eq!(minimal.notes, None); } @@ -1702,6 +1755,13 @@ mod tests { output_tokens: 7, cost_microusd: 3, }, + memo_usage: WorkflowMemoUsage { + armh_hits: 1, + armh_misses: 0, + armh_saved_estimated_tokens: 128, + provider_prompt_cache_hits: 2, + provider_prompt_cache_misses: 1, + }, output: Some("README needs clearer setup steps".to_string()), artifacts: vec!["trace://leaves/scan-readme".to_string()], }; @@ -1710,6 +1770,7 @@ mod tests { assert!(json.contains("\"status\":\"failed\"")); assert!(json.contains("\"input_tokens\":11")); + assert!(json.contains("\"armh_saved_estimated_tokens\":128")); let parsed: LeafResult = serde_json::from_str(&json).expect("parse leaf result"); 
assert_eq!(parsed, result); @@ -1718,6 +1779,7 @@ mod tests { ) .expect("parse minimal leaf result"); assert_eq!(minimal.usage, WorkflowUsage::default()); + assert_eq!(minimal.memo_usage, WorkflowMemoUsage::default()); assert_eq!(minimal.output, None); assert!(minimal.artifacts.is_empty()); } @@ -1836,6 +1898,63 @@ mod tests { ); } + #[test] + fn mock_executor_aggregates_memo_usage() { + let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec { + id: "cache-branches".to_string(), + description: None, + parallel: true, + budget: BudgetSpec::default(), + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + children: vec![leaf_node("rlm-hit"), leaf_node("rlm-miss")], + })]); + + let mut executor = MockWorkflowExecutor::new() + .with_leaf_outcome( + "rlm-hit", + MockLeafOutcome::succeeded("memo hit").with_memo_usage(WorkflowMemoUsage { + armh_hits: 1, + armh_misses: 0, + armh_saved_estimated_tokens: 4096, + provider_prompt_cache_hits: 1, + provider_prompt_cache_misses: 0, + }), + ) + .with_leaf_outcome( + "rlm-miss", + MockLeafOutcome::succeeded("memo miss").with_memo_usage(WorkflowMemoUsage { + armh_hits: 0, + armh_misses: 1, + armh_saved_estimated_tokens: 0, + provider_prompt_cache_hits: 0, + provider_prompt_cache_misses: 1, + }), + ); + + let execution = executor.run(&workflow).expect("mock workflow should run"); + + assert_eq!( + execution.memo_usage, + WorkflowMemoUsage { + armh_hits: 1, + armh_misses: 1, + armh_saved_estimated_tokens: 4096, + provider_prompt_cache_hits: 1, + provider_prompt_cache_misses: 1, + } + ); + assert_eq!(execution.branch_results[0].memo_usage, execution.memo_usage); + assert_eq!( + execution + .leaf_results + .iter() + .map(|result| (result.memo_usage.armh_hits, result.memo_usage.armh_misses)) + .collect::<Vec<_>>(), + vec![(1, 0), (0, 1)] + ); + } + #[test] fn loop_until_stops_on_pass() { let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec { diff --git
a/crates/whaleflow/src/replay.rs b/crates/whaleflow/src/replay.rs index 7459a13b..6eeaea5c 100644 --- a/crates/whaleflow/src/replay.rs +++ b/crates/whaleflow/src/replay.rs @@ -6,8 +6,9 @@ use thiserror::Error; use crate::{ BranchResult, BranchSpec, CondSpec, ControlNodeKind, ControlNodeResult, ExpandSpec, LeafResult, - LeafSpec, LoopUntilSpec, SequenceSpec, WorkflowExecution, WorkflowExecutionError, WorkflowNode, - WorkflowRunStatus, WorkflowSpec, WorkflowUsage, validate_workflow_nodes, + LeafSpec, LoopUntilSpec, SequenceSpec, WorkflowExecution, WorkflowExecutionError, + WorkflowMemoUsage, WorkflowNode, WorkflowRunStatus, WorkflowSpec, WorkflowUsage, + validate_workflow_nodes, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] @@ -158,8 +159,10 @@ impl WorkflowReplayExecutor { self.execute_nodes(spec, &branch.children, execution)?; let status = branch_status(&execution.leaf_results[before..]); let mut usage = WorkflowUsage::default(); + let mut memo_usage = WorkflowMemoUsage::default(); for result in &execution.leaf_results[before..] 
{ usage.add_assign(result.usage); + memo_usage.add_assign(result.memo_usage); } if status == WorkflowRunStatus::ReplayDiverged { execution.mark_replay_diverged(); @@ -171,6 +174,7 @@ impl WorkflowReplayExecutor { task_id: branch.id.clone(), status, usage, + memo_usage, artifacts: Vec::new(), notes: Some("replay branch set evaluated from recorded leaf results".to_string()), }); @@ -209,6 +213,7 @@ impl WorkflowReplayExecutor { task_id: leaf.id.clone(), status: WorkflowRunStatus::ReplayDiverged, usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), output: None, artifacts: Vec::new(), }; @@ -223,6 +228,7 @@ impl WorkflowReplayExecutor { execution.mark_failed(); } execution.usage.add_assign(result.usage); + execution.memo_usage.add_assign(result.memo_usage); self.resolved_outputs .insert(leaf.id.clone(), result.output.clone()); execution.leaf_results.push(result); @@ -545,6 +551,7 @@ mod tests { output_tokens: 5, cost_microusd: 2, }, + memo_usage: WorkflowMemoUsage::default(), output: Some(output.to_string()), artifacts: Vec::new(), }