feat(whaleflow): add trace store schema migration

Adds a state-store v2 schema migration for WhaleFlow workflow, branch, leaf, control-node, and teacher-candidate trace tables. Keeps workflow execution/replay deferred and preserves @AdityaVG13 WhaleFlow draft credit in the changelog.
This commit is contained in:
Hunter Bown
2026-06-05 19:50:59 -07:00
committed by GitHub
parent f8b26b492e
commit a2cc6bd6f6
4 changed files with 227 additions and 1 deletions
+4
View File
@@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
release-safe. The foundation now includes serializable branch, leaf, and
control-node result records toward the #2668 TraceStore contract. Thanks
@AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
workflow, branch, leaf, control-node, and teacher-candidate runs. The
migration only creates the persistence schema (tables and indexes); workflow
execution and replay remain deferred until the runtime semantics are safe
(#2668).
- Added an official VS Code extension Phase 0 scaffold with terminal launch,
local runtime attach checks, status bar state, and a read-only Agent View
preview backed by recent runtime thread summaries. This answers the VS Code
+99 -1
View File
@@ -267,7 +267,7 @@ impl StateStore {
fn init_schema(&self) -> Result<()> {
let conn = self.conn()?;
let user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
if user_version == 0 {
conn.execute_batch(
r#"
@@ -376,6 +376,104 @@ impl StateStore {
"#,
)
.context("failed to initialize thread schema")?;
user_version = 1;
}
if user_version < 2 {
conn.execute_batch(
r#"
BEGIN;
CREATE TABLE IF NOT EXISTS workflow_runs (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
goal TEXT NOT NULL,
status TEXT NOT NULL,
input_hash TEXT,
started_at INTEGER NOT NULL,
completed_at INTEGER,
metadata_json TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
ON workflow_runs(status, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
ON workflow_runs(workflow_id, started_at DESC);
CREATE TABLE IF NOT EXISTS branch_runs (
id TEXT PRIMARY KEY,
workflow_run_id TEXT NOT NULL,
branch_id TEXT NOT NULL,
node_id TEXT NOT NULL,
status TEXT NOT NULL,
started_at INTEGER NOT NULL,
completed_at INTEGER,
result_json TEXT NOT NULL DEFAULT '{}',
FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
ON branch_runs(workflow_run_id);
CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
ON branch_runs(branch_id);
CREATE TABLE IF NOT EXISTS leaf_runs (
id TEXT PRIMARY KEY,
workflow_run_id TEXT NOT NULL,
branch_run_id TEXT,
leaf_id TEXT NOT NULL,
task_id TEXT NOT NULL,
input_hash TEXT,
status TEXT NOT NULL,
output_json TEXT NOT NULL DEFAULT '{}',
artifacts_json TEXT NOT NULL DEFAULT '[]',
started_at INTEGER NOT NULL,
completed_at INTEGER,
FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
ON leaf_runs(workflow_run_id);
CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
ON leaf_runs(workflow_run_id, leaf_id, input_hash);
CREATE TABLE IF NOT EXISTS control_node_runs (
id TEXT PRIMARY KEY,
workflow_run_id TEXT NOT NULL,
node_id TEXT NOT NULL,
kind TEXT NOT NULL,
status TEXT NOT NULL,
selected_children_json TEXT NOT NULL DEFAULT '[]',
result_json TEXT NOT NULL DEFAULT '{}',
started_at INTEGER NOT NULL,
completed_at INTEGER,
FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
ON control_node_runs(workflow_run_id);
CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
ON control_node_runs(node_id);
CREATE TABLE IF NOT EXISTS teacher_candidates (
id TEXT PRIMARY KEY,
workflow_run_id TEXT NOT NULL,
control_node_run_id TEXT NOT NULL,
candidate_id TEXT NOT NULL,
branch_run_id TEXT,
score REAL,
passed INTEGER,
rationale_json TEXT NOT NULL DEFAULT '{}',
created_at INTEGER NOT NULL,
FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
ON teacher_candidates(workflow_run_id);
CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
ON teacher_candidates(control_node_run_id);
PRAGMA user_version = 2;
COMMIT;
"#,
)
.context("failed to initialize workflow trace schema")?;
}
Ok(())
}
+120
View File
@@ -12,6 +12,30 @@ fn temp_state_path(label: &str) -> PathBuf {
))
}
/// Asserts that the database behind `conn` reports `user_version` 2 and that
/// each of the five workflow trace tables is listed in `sqlite_master`.
///
/// Panics on the first failed read or missing table, which fails the calling test.
fn assert_workflow_trace_schema(conn: &Connection) {
    const TRACE_TABLES: [&str; 5] = [
        "workflow_runs",
        "branch_runs",
        "leaf_runs",
        "control_node_runs",
        "teacher_candidates",
    ];
    const TABLE_EXISTS_SQL: &str =
        "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)";

    let schema_version: u32 = conn
        .query_row("PRAGMA user_version;", [], |row| row.get(0))
        .expect("read user_version");
    assert_eq!(schema_version, 2);

    for table in TRACE_TABLES {
        let lookup: Result<bool, _> = conn.query_row(TABLE_EXISTS_SQL, [table], |row| row.get(0));
        match lookup {
            Ok(found) => assert!(found, "missing workflow trace table {table}"),
            Err(err) => panic!("read sqlite_master for {table}: {err}"),
        }
    }
}
#[test]
fn upsert_and_resume_thread_metadata() {
let path = temp_state_path("upsert_resume");
@@ -157,6 +181,102 @@ fn init_schema_migration() {
StateStore::open(Some(path.clone())).expect("open state store");
}
/// A database created from scratch by `StateStore::open` must already carry
/// the workflow trace schema checked by `assert_workflow_trace_schema`.
#[test]
fn fresh_schema_includes_workflow_trace_tables() {
    let db_path = temp_state_path("fresh_schema_includes_workflow_trace_tables");

    // Only the side effect of opening matters; the store is released right away, before the
    // database file is inspected through a separate raw connection.
    drop(StateStore::open(Some(db_path.clone())).expect("open state store"));

    let raw = Connection::open(&db_path).expect("open state db");
    assert_workflow_trace_schema(&raw);
}
/// A database that is already at `user_version` 1 must be upgraded in place by
/// `StateStore::open`: the thread row seeded beforehand stays readable and the
/// workflow trace schema is present afterwards.
#[test]
fn v1_schema_migrates_workflow_trace_tables() {
    // Hand-written version-1 fixture: five tables, one seeded thread row, and `user_version`
    // pinned to 1.
    const V1_FIXTURE_SQL: &str = r#"
CREATE TABLE threads (
id TEXT PRIMARY KEY,
rollout_path TEXT,
preview TEXT NOT NULL,
ephemeral INTEGER NOT NULL,
model_provider TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
status TEXT NOT NULL,
path TEXT,
cwd TEXT NOT NULL,
cli_version TEXT NOT NULL,
source TEXT NOT NULL,
title TEXT,
sandbox_policy TEXT,
approval_mode TEXT,
archived INTEGER NOT NULL DEFAULT 0,
archived_at INTEGER,
git_sha TEXT,
git_branch TEXT,
git_origin_url TEXT,
memory_mode TEXT,
current_leaf_id INTEGER
);
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
thread_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
item_json TEXT,
created_at INTEGER NOT NULL,
parent_entry_id INTEGER
);
CREATE TABLE checkpoints (
thread_id TEXT NOT NULL,
checkpoint_id TEXT NOT NULL,
state_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY(thread_id, checkpoint_id)
);
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
progress INTEGER,
detail TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE thread_dynamic_tools (
thread_id TEXT NOT NULL,
position INTEGER NOT NULL,
name TEXT NOT NULL,
description TEXT,
input_schema TEXT NOT NULL,
PRIMARY KEY (thread_id, position)
);
INSERT INTO threads (
id, preview, ephemeral, model_provider, created_at, updated_at, status, cwd, cli_version, source, archived
)
VALUES (
'thread-test-1', 'hello', false, 'deepseek', 0, 0, 'running', '/tmp/project', '0.0.0-test', 'interactive', false
);
PRAGMA user_version = 1;
"#;

    let db_path = temp_state_path("v1_schema_migrates_workflow_trace_tables");

    // Seed the fixture in its own scope so the seeding connection is dropped before the store
    // opens the same file.
    {
        let seed = Connection::open(&db_path).expect("open state db");
        seed.execute_batch(V1_FIXTURE_SQL).expect("create v1 schema");
    }

    // Opening the store is what triggers the migration under test.
    let migrated = StateStore::open(Some(db_path.clone())).expect("open state store");
    let survivor = migrated
        .get_thread("thread-test-1")
        .expect("read thread")
        .expect("thread survives migration");
    assert_eq!(survivor.preview, "hello");

    let raw = Connection::open(&db_path).expect("open state db");
    assert_workflow_trace_schema(&raw);
}
#[test]
fn init_schema_migration_same_second_messages() {
let path = temp_state_path("init_schema_migration_same_second_messages");
+4
View File
@@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
release-safe. The foundation now includes serializable branch, leaf, and
control-node result records toward the #2668 TraceStore contract. Thanks
@AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
workflow, branch, leaf, control-node, and teacher-candidate runs. The
migration only creates the persistence schema (tables and indexes); workflow
execution and replay remain deferred until the runtime semantics are safe
(#2668).
- Added an official VS Code extension Phase 0 scaffold with terminal launch,
local runtime attach checks, status bar state, and a read-only Agent View
preview backed by recent runtime thread summaries. This answers the VS Code