//! Persistent state management for conversation threads, messages, and jobs. //! //! The [`StateStore`] is the primary entry point, backed by a SQLite database and an //! append-only JSONL session index file. It provides CRUD operations for: //! //! - **Threads** — conversation metadata, archival, and session indexing. //! - **Messages** — append-only message storage with tree-structured branching. //! - **Checkpoints** — named state snapshots for restoring conversation progress. //! - **Jobs** — background task tracking with status and progress. //! - **Dynamic tools** — per-thread tool registrations. use std::collections::HashMap; use std::fs::{self, OpenOptions}; use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use chrono::Utc; use rusqlite::{Connection, OptionalExtension, params}; use serde::{Deserialize, Serialize}; use serde_json::Value; /// Lifecycle status of a conversation thread. /// /// Serialized as lowercase snake_case strings (e.g. `"running"`, `"archived"`). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum ThreadStatus { /// Thread is actively being worked on. Running, /// Thread exists but has no active work in progress. Idle, /// Thread has finished its task successfully. Completed, /// Thread encountered an unrecoverable error. Failed, /// Thread has been temporarily paused by the user. Paused, /// Thread has been archived and is hidden from default listings. Archived, } /// Indicates how a session was initiated. /// /// Serialized as lowercase snake_case strings. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum SessionSource { /// Started by a user interacting with the CLI. Interactive, /// Resumed from a previously persisted session. Resume, /// Created by forking an existing conversation at a specific message. Fork, /// Initiated programmatically via the API. 
Api, /// Source is unknown or unspecified. Unknown, } /// Metadata for a persisted conversation thread. /// /// Each thread represents a single conversation session and stores its /// configuration, git context, and current status. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ThreadMetadata { /// Unique identifier for this thread. pub id: String, /// Optional filesystem path to the rollout (JSONL transcript) file. pub rollout_path: Option, /// Short preview or summary of the thread content. pub preview: String, /// Whether this thread is ephemeral (not persisted long-term). pub ephemeral: bool, /// Identifier of the model provider used for this thread (e.g. `"openai"`). pub model_provider: String, /// Unix timestamp (seconds) when the thread was created. pub created_at: i64, /// Unix timestamp (seconds) of the most recent update to the thread. pub updated_at: i64, /// Current lifecycle status of the thread. pub status: ThreadStatus, /// Optional filesystem path associated with the thread working context. pub path: Option, /// Working directory that was active when the thread was created. pub cwd: PathBuf, /// Version of the CLI that created this thread. pub cli_version: String, /// How this session was initiated. pub source: SessionSource, /// User-assigned display name for the thread. pub name: Option, /// Serialized sandbox policy applied to this thread, if any. pub sandbox_policy: Option, /// Approval mode configured for tool calls in this thread. pub approval_mode: Option, /// Whether the thread has been archived. pub archived: bool, /// Unix timestamp (seconds) when the thread was archived, or `None` if not archived. pub archived_at: Option, /// Git commit SHA of the working tree when the thread was created. pub git_sha: Option, /// Git branch checked out when the thread was created. pub git_branch: Option, /// URL of the git remote origin, if available. pub git_origin_url: Option, /// Memory mode configured for this thread (e.g. 
`"local"`, `"remote"`). pub memory_mode: Option, /// ID of the current leaf message in the conversation tree. pub current_leaf_id: Option, } /// A dynamically registered tool associated with a thread. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DynamicToolRecord { /// Ordinal position of this tool in the thread tool list. pub position: i64, /// Unique name identifying the tool. pub name: String, /// Human-readable description of what the tool does. pub description: Option, /// JSON Schema describing the tool input parameters. pub input_schema: Value, } /// A single message entry in a conversation thread. /// /// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id), /// enabling conversation branching and forking. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MessageRecord { /// Auto-incremented unique identifier for this message. pub id: i64, /// ID of the thread this message belongs to. pub thread_id: String, /// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`). pub role: String, /// Text content of the message. pub content: String, /// Optional structured item payload (tool calls, tool results, etc.). pub item: Option, /// Unix timestamp (seconds) when the message was created. pub created_at: i64, /// ID of the parent message, forming a tree structure. `None` for root messages. pub parent_entry_id: Option, } /// A named checkpoint capturing the state of a thread at a point in time. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CheckpointRecord { /// ID of the thread this checkpoint belongs to. pub thread_id: String, /// Unique identifier for this checkpoint within its thread. pub checkpoint_id: String, /// Serialized state snapshot stored as a JSON value. pub state: Value, /// Unix timestamp (seconds) when the checkpoint was created or last updated. pub created_at: i64, } /// Status of a background job. /// /// Serialized as lowercase snake_case strings. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    /// Job is waiting to be executed.
    Queued,
    /// Job is currently executing.
    Running,
    /// Job has finished successfully.
    Completed,
    /// Job has failed with an error.
    Failed,
    /// Job was cancelled before completion.
    Cancelled,
}

/// Persisted state of a background job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    /// Unique identifier for the job.
    pub id: String,
    /// Human-readable name describing the job.
    pub name: String,
    /// Current lifecycle status of the job.
    pub status: JobStateStatus,
    /// Completion progress as a percentage (0--100), if available.
    // NOTE(review): the element type was missing from the reviewed text; `u8` is
    // assumed from the documented 0--100 range (the column is INTEGER) — confirm.
    pub progress: Option<u8>,
    /// Optional detail message providing additional status information.
    pub detail: Option<String>,
    /// Unix timestamp (seconds) when the job was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent status update.
    pub updated_at: i64,
}

/// Persisted lifecycle status for a thread goal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadGoalStatus {
    /// Goal is active and should continue receiving work.
    Active,
    /// Goal is paused by the user.
    Paused,
    /// Goal is blocked and cannot make meaningful progress.
    Blocked,
    /// Goal stopped because account/service usage limits were reached.
    UsageLimited,
    /// Goal stopped because its explicit token budget was reached.
    BudgetLimited,
    /// Goal has been completed.
    Complete,
}

/// Persisted goal state attached to a thread.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadGoalRecord {
    /// Thread this goal belongs to.
    pub thread_id: String,
    /// Stable identifier for this goal revision.
    pub goal_id: String,
    /// User-visible objective.
    pub objective: String,
    /// Current lifecycle status.
    pub status: ThreadGoalStatus,
    /// Optional token budget requested by the user.
    // NOTE(review): element type restored as `i64` to match `tokens_used` and the
    // INTEGER `token_budget` column — confirm.
    pub token_budget: Option<i64>,
    /// Tokens consumed while pursuing the goal.
    pub tokens_used: i64,
    /// Elapsed wall-clock work time in seconds.
    pub time_used_seconds: i64,
    /// Unix timestamp (seconds) when the goal was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) when the goal was last updated.
    pub updated_at: i64,
}

/// Filters for listing conversation threads.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// Whether to include archived threads in the results.
    pub include_archived: bool,
    /// Maximum number of threads to return. Defaults to 50.
    // NOTE(review): element type restored as `usize`; `list_threads` converts it
    // with `i64::try_from`, so it is some non-`i64` integer — confirm.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Excludes archived threads and caps the listing at 50 entries.
    fn default() -> Self {
        Self {
            include_archived: false,
            limit: Some(50),
        }
    }
}

/// One record of the JSONL session index file.
// NOTE(review): `rollout_path` is restored as `Option<PathBuf>` because
// `upsert_thread` hands `ThreadMetadata::rollout_path` to `append_thread_name`;
// the code that builds this struct is not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    thread_id: String,
    thread_name: Option<String>,
    updated_at: i64,
    rollout_path: Option<PathBuf>,
}

/// Persistent storage for conversation threads, messages, checkpoints, and jobs.
///
/// Backed by a SQLite database and an append-only JSONL session index file.
/// The database schema is automatically initialized and migrated on [`open`](Self::open).
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of `session_index.jsonl`, a sibling of the database file.
    session_index_path: PathBuf,
}

impl StateStore {
/// Open (or create) a state store at the given database path.
///
/// If `path` is `None`, the default location (`~/.codewhale/state.db`, with
/// `~/.deepseek/state.db` as a legacy fallback) is used.
/// The database schema is created automatically if it does not exist.
pub fn open(path: Option) -> Result { let db_path = path.unwrap_or_else(default_state_db_path); let session_index_path = db_path .parent() .unwrap_or_else(|| Path::new(".")) .join("session_index.jsonl"); if let Some(parent) = db_path.parent() { fs::create_dir_all(parent).with_context(|| { format!("failed to create state directory {}", parent.display()) })?; } let store = Self { db_path, session_index_path, }; store.init_schema()?; Ok(store) } /// Returns the filesystem path of the underlying SQLite database. pub fn db_path(&self) -> &Path { &self.db_path } fn conn(&self) -> Result { Connection::open(&self.db_path) .with_context(|| format!("failed to open state db {}", self.db_path.display())) } fn init_schema(&self) -> Result<()> { let conn = self.conn()?; let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?; if user_version == 0 { conn.execute_batch( r#" BEGIN; CREATE TABLE IF NOT EXISTS threads ( id TEXT PRIMARY KEY, rollout_path TEXT, preview TEXT NOT NULL, ephemeral INTEGER NOT NULL, model_provider TEXT NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, status TEXT NOT NULL, path TEXT, cwd TEXT NOT NULL, cli_version TEXT NOT NULL, source TEXT NOT NULL, title TEXT, sandbox_policy TEXT, approval_mode TEXT, archived INTEGER NOT NULL DEFAULT 0, archived_at INTEGER, git_sha TEXT, git_branch TEXT, git_origin_url TEXT, memory_mode TEXT ); CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC); CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC); CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC); CREATE TABLE IF NOT EXISTS thread_dynamic_tools ( thread_id TEXT NOT NULL, position INTEGER NOT NULL, name TEXT NOT NULL, description TEXT, input_schema TEXT NOT NULL, PRIMARY KEY (thread_id, position), FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY 
AUTOINCREMENT, thread_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, item_json TEXT, created_at INTEGER NOT NULL, FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC); CREATE TABLE IF NOT EXISTS checkpoints ( thread_id TEXT NOT NULL, checkpoint_id TEXT NOT NULL, state_json TEXT NOT NULL, created_at INTEGER NOT NULL, PRIMARY KEY(thread_id, checkpoint_id), FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC); CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, name TEXT NOT NULL, status TEXT NOT NULL, progress INTEGER, detail TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC); -- Add parent_entry_id column, and set to last message before current message ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL; UPDATE messages SET parent_entry_id = ( SELECT m2.id FROM messages m2 WHERE m2.thread_id = messages.thread_id AND ( m2.created_at < messages.created_at OR ( m2.created_at = messages.created_at AND m2.id < messages.id ) ) ORDER BY m2.created_at DESC, m2.id DESC LIMIT 1 ); CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id); -- Add current_leaf_id column, and set to last message in thread ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL; UPDATE threads SET current_leaf_id = ( SELECT m.id FROM messages m WHERE m.thread_id = threads.id ORDER BY m.id DESC LIMIT 1 ); PRAGMA user_version = 1; COMMIT; "#, ) .context("failed to initialize thread schema")?; user_version = 1; } if user_version < 2 { conn.execute_batch( r#" BEGIN; CREATE TABLE IF NOT EXISTS workflow_runs ( id TEXT PRIMARY KEY, workflow_id TEXT NOT NULL, goal TEXT NOT NULL, status TEXT NOT NULL, input_hash TEXT, started_at INTEGER NOT NULL, 
completed_at INTEGER, metadata_json TEXT NOT NULL DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at ON workflow_runs(status, started_at DESC); CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at ON workflow_runs(workflow_id, started_at DESC); CREATE TABLE IF NOT EXISTS branch_runs ( id TEXT PRIMARY KEY, workflow_run_id TEXT NOT NULL, branch_id TEXT NOT NULL, node_id TEXT NOT NULL, status TEXT NOT NULL, started_at INTEGER NOT NULL, completed_at INTEGER, result_json TEXT NOT NULL DEFAULT '{}', FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id ON branch_runs(workflow_run_id); CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id ON branch_runs(branch_id); CREATE TABLE IF NOT EXISTS leaf_runs ( id TEXT PRIMARY KEY, workflow_run_id TEXT NOT NULL, branch_run_id TEXT, leaf_id TEXT NOT NULL, task_id TEXT NOT NULL, input_hash TEXT, status TEXT NOT NULL, output_json TEXT NOT NULL DEFAULT '{}', artifacts_json TEXT NOT NULL DEFAULT '[]', started_at INTEGER NOT NULL, completed_at INTEGER, FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE, FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL ); CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id ON leaf_runs(workflow_run_id); CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup ON leaf_runs(workflow_run_id, leaf_id, input_hash); CREATE TABLE IF NOT EXISTS control_node_runs ( id TEXT PRIMARY KEY, workflow_run_id TEXT NOT NULL, node_id TEXT NOT NULL, kind TEXT NOT NULL, status TEXT NOT NULL, selected_children_json TEXT NOT NULL DEFAULT '[]', result_json TEXT NOT NULL DEFAULT '{}', started_at INTEGER NOT NULL, completed_at INTEGER, FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id ON control_node_runs(workflow_run_id); CREATE INDEX IF NOT EXISTS 
idx_control_node_runs_node_id ON control_node_runs(node_id); CREATE TABLE IF NOT EXISTS teacher_candidates ( id TEXT PRIMARY KEY, workflow_run_id TEXT NOT NULL, control_node_run_id TEXT NOT NULL, candidate_id TEXT NOT NULL, branch_run_id TEXT, score REAL, passed INTEGER, rationale_json TEXT NOT NULL DEFAULT '{}', created_at INTEGER NOT NULL, FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE, FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE, FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL ); CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id ON teacher_candidates(workflow_run_id); CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id ON teacher_candidates(control_node_run_id); PRAGMA user_version = 2; COMMIT; "#, ) .context("failed to initialize workflow trace schema")?; user_version = 2; } if user_version < 3 { conn.execute_batch( r#" BEGIN; CREATE TABLE IF NOT EXISTS thread_goals ( thread_id TEXT PRIMARY KEY NOT NULL, goal_id TEXT NOT NULL, objective TEXT NOT NULL, status TEXT NOT NULL CHECK(status IN ( 'active', 'paused', 'blocked', 'usage_limited', 'budget_limited', 'complete' )), token_budget INTEGER, tokens_used INTEGER NOT NULL DEFAULT 0, time_used_seconds INTEGER NOT NULL DEFAULT 0, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE ); PRAGMA user_version = 3; COMMIT; "#, ) .context("failed to initialize thread goal schema")?; } Ok(()) } /// Insert or update thread metadata. /// /// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message) /// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that. 
pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
    let conn = self.conn()?;
    // `ThreadMetadata::name` is stored in the `title` column. On conflict every
    // listed column, including `created_at`, is overwritten with the new value.
    conn.execute(
        r#"
        INSERT INTO threads (
            id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at,
            status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode,
            archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode
        ) VALUES (
            ?1, ?2, ?3, ?4, ?5, ?6, ?7,
            ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15,
            ?16, ?17, ?18, ?19, ?20, ?21
        )
        ON CONFLICT(id) DO UPDATE SET
            rollout_path=excluded.rollout_path,
            preview=excluded.preview,
            ephemeral=excluded.ephemeral,
            model_provider=excluded.model_provider,
            created_at=excluded.created_at,
            updated_at=excluded.updated_at,
            status=excluded.status,
            path=excluded.path,
            cwd=excluded.cwd,
            cli_version=excluded.cli_version,
            source=excluded.source,
            title=excluded.title,
            sandbox_policy=excluded.sandbox_policy,
            approval_mode=excluded.approval_mode,
            archived=excluded.archived,
            archived_at=excluded.archived_at,
            git_sha=excluded.git_sha,
            git_branch=excluded.git_branch,
            git_origin_url=excluded.git_origin_url,
            memory_mode=excluded.memory_mode
        "#,
        params![
            thread.id,
            path_to_opt_string(thread.rollout_path.as_deref()),
            thread.preview,
            bool_to_i64(thread.ephemeral),
            thread.model_provider,
            thread.created_at,
            thread.updated_at,
            thread_status_to_str(&thread.status),
            path_to_opt_string(thread.path.as_deref()),
            thread.cwd.display().to_string(),
            thread.cli_version,
            session_source_to_str(&thread.source),
            thread.name,
            thread.sandbox_policy,
            thread.approval_mode,
            bool_to_i64(thread.archived),
            thread.archived_at,
            thread.git_sha,
            thread.git_branch,
            thread.git_origin_url,
            thread.memory_mode,
        ],
    )
    .context("failed to upsert thread metadata")?;

    // Presumably records the (possibly changed) name in the session index —
    // `append_thread_name` is defined outside this view; confirm.
    self.append_thread_name(
        &thread.id,
        thread.name.clone(),
        thread.updated_at,
        thread.rollout_path.clone(),
    )?;
    Ok(())
}

/// Retrieve a single thread by its ID.
///
/// Returns `None` if no thread with the given ID exists.
pub fn get_thread(&self, id: &str) -> Result> { let conn = self.conn()?; conn.query_row( r#" SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE id = ?1 "#, params![id], row_to_thread, ) .optional() .context("failed to read thread") } /// List threads ordered by most recently updated. /// /// Use [`ThreadListFilters`] to control whether archived threads are included /// and the maximum number of results returned. pub fn list_threads(&self, filters: ThreadListFilters) -> Result> { let conn = self.conn()?; let sql = if filters.include_archived { "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1" } else { "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1" }; let mut stmt = conn.prepare(sql).context("failed to prepare list query")?; let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50); let mut rows = stmt .query(params![limit]) .context("failed to query threads")?; let mut out = Vec::new(); while let Some(row) = rows.next().context("failed to iterate thread rows")? { out.push(row_to_thread(row)?); } Ok(out) } /// Archive a thread, setting its status to [`ThreadStatus::Archived`] and /// recording the current timestamp. 
pub fn mark_archived(&self, id: &str) -> Result<()> {
    let conn = self.conn()?;
    conn.execute(
        "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
        params![
            id,
            Utc::now().timestamp(),
            thread_status_to_str(&ThreadStatus::Archived)
        ],
    )
    .context("failed to archive thread")?;
    Ok(())
}

/// Unarchive a thread, removing the archived flag and clearing `archived_at`.
// NOTE(review): `mark_archived` also sets `status` to the archived value, but
// this statement does not touch `status`, so an unarchived thread keeps the
// archived status — confirm whether that is intended.
pub fn mark_unarchived(&self, id: &str) -> Result<()> {
    let conn = self.conn()?;
    conn.execute(
        "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
        params![id],
    )
    .context("failed to unarchive thread")?;
    Ok(())
}

/// Permanently delete a thread and all of its associated data
/// (messages, checkpoints, dynamic tools) via cascading foreign keys.
///
/// Only the `threads` row is deleted here; removal of dependent rows relies on
/// the schema's `ON DELETE CASCADE` clauses.
pub fn delete_thread(&self, id: &str) -> Result<()> {
    let conn = self.conn()?;
    conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
        .context("failed to delete thread")?;
    Ok(())
}

/// Set the memory mode for a thread.
///
/// Pass `None` to clear the memory mode.
pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
    let conn = self.conn()?;
    conn.execute(
        "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
        params![id, mode],
    )
    .context("failed to update thread memory mode")?;
    Ok(())
}

/// Get the memory mode configured for a thread.
///
/// Returns `None` if the thread does not exist or has no memory mode set.
pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
    let conn = self.conn()?;
    conn.query_row(
        "SELECT memory_mode FROM threads WHERE id = ?1",
        params![id],
        |row| row.get::<_, Option<String>>(0),
    )
    .optional()
    .context("failed to read thread memory mode")
    // Outer `Option`: row missing; inner `Option`: column is NULL.
    .map(Option::flatten)
}

/// Insert or replace the persisted goal for a thread.
///
/// Fails with a "thread … not found" error when `goal.thread_id` does not refer
/// to an existing thread. On conflict every goal column, including `created_at`,
/// is overwritten.
pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
    let conn = self.conn()?;
    // Check for the owning thread explicitly so the caller gets a clear error.
    let exists: Option<i64> = conn
        .query_row(
            "SELECT 1 FROM threads WHERE id = ?1",
            params![goal.thread_id],
            |row| row.get(0),
        )
        .optional()
        .context("failed to verify thread before saving goal")?;
    if exists.is_none() {
        anyhow::bail!("thread {} not found", goal.thread_id);
    }

    conn.execute(
        r#"
        INSERT INTO thread_goals (
            thread_id, goal_id, objective, status, token_budget,
            tokens_used, time_used_seconds, created_at, updated_at
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
        ON CONFLICT(thread_id) DO UPDATE SET
            goal_id=excluded.goal_id,
            objective=excluded.objective,
            status=excluded.status,
            token_budget=excluded.token_budget,
            tokens_used=excluded.tokens_used,
            time_used_seconds=excluded.time_used_seconds,
            created_at=excluded.created_at,
            updated_at=excluded.updated_at
        "#,
        params![
            goal.thread_id,
            goal.goal_id,
            goal.objective,
            thread_goal_status_to_str(&goal.status),
            goal.token_budget,
            goal.tokens_used,
            goal.time_used_seconds,
            goal.created_at,
            goal.updated_at,
        ],
    )
    .context("failed to upsert thread goal")?;
    Ok(())
}

/// Retrieve the persisted goal for a thread.
///
/// Returns `None` when the thread has no goal row.
pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
    let conn = self.conn()?;
    conn.query_row(
        r#"
        SELECT
            thread_id, goal_id, objective, status, token_budget,
            tokens_used, time_used_seconds, created_at, updated_at
        FROM thread_goals
        WHERE thread_id = ?1
        "#,
        params![thread_id],
        row_to_thread_goal,
    )
    .optional()
    .context("failed to read thread goal")
}

/// Delete the persisted goal for a thread.
///
/// Returns `true` when a goal row was removed, `false` when there was none.
pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
    let conn = self.conn()?;
    let changed = conn
        .execute(
            "DELETE FROM thread_goals WHERE thread_id = ?1",
            params![thread_id],
        )
        .context("failed to delete thread goal")?;
    Ok(changed > 0)
}

/// List all leaf messages in a thread.
///
/// A leaf message is one that has no other message referencing it as a parent.
/// In a branching conversation tree, there may be multiple leaf messages.
///
/// The query specifies no ordering for the returned messages.
pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
    let conn = self.conn()?;
    // Anti-join: keep `m1` rows for which no `m2` names them as its parent.
    let mut stmt = conn
        .prepare(
            r#"
            SELECT
                m1.id, m1.thread_id, m1.role, m1.content, m1.item_json,
                m1.created_at, m1.parent_entry_id
            FROM messages m1
            LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
            WHERE m1.thread_id = ?1 AND m2.id IS NULL
            "#,
        )
        .context("failed to prepare message listing query")?;
    let mut rows = stmt
        .query(params![thread_id])
        .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
    let mut out = Vec::new();
    while let Some(row) = rows.next().context("failed to iterate message rows")? {
        // `item_json` is nullable; parse it only when present.
        let item_json: Option<String> = row.get(4).context("failed to read item json")?;
        let item = item_json
            .as_deref()
            .map(serde_json::from_str)
            .transpose()
            .with_context(|| {
                format!("failed to parse message item json in thread {thread_id}")
            })?;
        out.push(MessageRecord {
            id: row.get(0).context("failed to read message id")?,
            thread_id: row.get(1).context("failed to read message thread id")?,
            role: row.get(2).context("failed to read message role")?,
            content: row.get(3).context("failed to read message content")?,
            item,
            created_at: row.get(5).context("failed to read message timestamp")?,
            parent_entry_id: row.get(6).context("failed to read parent entry id")?,
        });
    }
    Ok(out)
}

/// Update the current leaf message pointer for a thread.
///
/// This controls which branch of the conversation tree is considered active
/// when listing messages via [`list_messages`](Self::list_messages).
// NOTE(review): `current_leaf_id` is bound as text while the column is declared
// INTEGER; presumably this relies on SQLite column affinity — confirm.
pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
    let conn = self.conn()?;
    conn.execute(
        "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
        params![current_leaf_id, thread_id],
    )
    .context("failed to update thread current leaf id")?;
    Ok(())
}

/// Replace the dynamic tools for a thread.
/// /// All existing dynamic tools for the thread are deleted and replaced with the /// provided list. The operation is performed within a transaction. pub fn persist_dynamic_tools( &self, thread_id: &str, tools: &[DynamicToolRecord], ) -> Result<()> { let mut conn = self.conn()?; let tx = conn .transaction() .context("failed to begin dynamic tools transaction")?; tx.execute( "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1", params![thread_id], ) .context("failed to clear dynamic tools")?; for tool in tools { tx.execute( "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)", params![ thread_id, tool.position, tool.name, tool.description, tool.input_schema.to_string() ], ) .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?; } tx.commit().context("failed to commit dynamic tools")?; Ok(()) } /// Retrieve all dynamic tools registered for a thread, ordered by position. pub fn get_dynamic_tools(&self, thread_id: &str) -> Result> { let conn = self.conn()?; let mut stmt = conn .prepare( "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC", ) .context("failed to prepare get dynamic tools query")?; let mut rows = stmt .query(params![thread_id]) .context("failed to query dynamic tools")?; let mut out = Vec::new(); while let Some(row) = rows.next().context("failed to iterate dynamic tools")? 
{ let input_schema_raw: String = row.get(3).context("failed to read tool input schema")?; let input_schema: Value = serde_json::from_str(&input_schema_raw).with_context(|| { format!("failed to parse input schema for dynamic tool in thread {thread_id}") })?; out.push(DynamicToolRecord { position: row.get(0).context("failed to read tool position")?, name: row.get(1).context("failed to read tool name")?, description: row.get(2).context("failed to read tool description")?, input_schema, }); } Ok(out) } /// Append a new message to a thread. /// /// The message is linked to the thread's current leaf as its parent, and the /// thread's `current_leaf_id` is updated to the new message. Returns the ID /// of the newly created message. pub fn append_message( &self, thread_id: &str, role: &str, content: &str, item: Option, ) -> Result { let mut conn = self.conn()?; let created_at = Utc::now().timestamp(); let item_json = item .as_ref() .map(serde_json::to_string) .transpose() .context("failed to serialize message item payload")?; let tx = conn .transaction() .context("failed to begin append message transaction")?; let current_leaf_id: Option = tx .query_row( "SELECT current_leaf_id FROM threads WHERE id = ?1", params![thread_id], |row| row.get(0), ) .with_context(|| { format!("failed to query thread current leaf id for thread {thread_id}") })?; let next_leaf_id: i64 = tx.query_row( r#" INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id) SELECT ?1, ?2, ?3, ?4, ?5, ?6 RETURNING id "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0) ).with_context(|| format!("failed to append message for thread {thread_id}"))?; tx.execute( r#" UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2; "#, params![next_leaf_id, thread_id], ) .with_context(|| { format!("failed to update thread current leaf id for thread {thread_id}") })?; tx.commit() .context("failed to commit append message transaction")?; Ok(next_leaf_id) 
}

    /// List messages in the current conversation branch, walking backwards from
    /// the thread's `current_leaf_id`.
    ///
    /// Messages are returned in chronological order (oldest first). The `limit`
    /// parameter caps how many messages are returned, counting the leaf itself;
    /// it defaults to 500, and a limit of `0` yields an empty list.
    ///
    /// # Errors
    ///
    /// Fails if the connection cannot be obtained, the query cannot be prepared
    /// or executed, a column cannot be read, or a stored `item_json` payload is
    /// not valid JSON.
    pub fn list_messages(
        &self,
        thread_id: &str,
        limit: Option<usize>,
    ) -> Result<Vec<MessageRecord>> {
        let conn = self.conn()?;
        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
        // The anchor row of the recursive query (the leaf, depth 0) is emitted
        // regardless of the depth bound, so a zero limit must short-circuit here
        // instead of returning one message.
        if limit <= 0 {
            return Ok(Vec::new());
        }
        let mut stmt = conn
            .prepare(
                r#"
                WITH RECURSIVE
                leaf_id AS (
                    SELECT current_leaf_id FROM threads WHERE id = ?1
                ),
                ancestors AS (
                    SELECT id, thread_id, role, content, item_json, created_at,
                           parent_entry_id, 0 AS depth
                    FROM messages
                    WHERE id = (SELECT current_leaf_id FROM leaf_id)
                    UNION ALL
                    SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at,
                           m.parent_entry_id, a.depth + 1
                    FROM messages m
                    JOIN ancestors a ON m.id = a.parent_entry_id
                    WHERE a.depth < ?2
                )
                SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id
                FROM ancestors
                ORDER BY depth DESC
                "#,
            )
            .context("failed to prepare message listing query")?;
        // Depth is zero-based, so the deepest row allowed is `limit - 1`.
        let mut rows = stmt
            .query(params![thread_id, limit - 1])
            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
        let mut out = Vec::new();
        while let Some(row) = rows.next().context("failed to iterate message rows")? {
            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
            // A NULL column stays `None`; a present but malformed payload is an error.
            let item = item_json
                .as_deref()
                .map(serde_json::from_str)
                .transpose()
                .with_context(|| {
                    format!("failed to parse message item json in thread {thread_id}")
                })?;
            out.push(MessageRecord {
                id: row.get(0).context("failed to read message id")?,
                thread_id: row.get(1).context("failed to read message thread id")?,
                role: row.get(2).context("failed to read message role")?,
                content: row.get(3).context("failed to read message content")?,
                item,
                created_at: row.get(5).context("failed to read message timestamp")?,
                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
            });
        }
        Ok(out)
    }

    /// Fork the conversation at a specific message.
    ///
    /// Creates a new message whose parent is `message_id` and updates the thread's
    /// `current_leaf_id` to the new message. Returns the ID of the new message.
    /// This enables branching conversations from any point in the history.
    ///
    /// The lookup, insert, and leaf update run in a single transaction.
    ///
    /// # Errors
    ///
    /// Fails if `item` cannot be serialized, if no message with `message_id`
    /// exists, or if any statement or the commit fails.
    pub fn fork_at_message(
        &self,
        message_id: &str,
        role: &str,
        content: &str,
        item: Option<Value>,
    ) -> Result<i64> {
        let mut conn = self.conn()?;
        let created_at = Utc::now().timestamp();
        let item_json = item
            .as_ref()
            .map(serde_json::to_string)
            .transpose()
            .context("failed to serialize message item payload")?;
        let tx = conn
            .transaction()
            .context("failed to begin fork message transaction")?;
        // The new message is attached to the same thread as its parent.
        let thread_id: String = tx
            .query_row(
                "SELECT thread_id FROM messages WHERE id = ?1",
                params![message_id],
                |row| row.get(0),
            )
            .with_context(|| format!("failed to query thread id for message {message_id}"))?;
        let next_leaf_id: i64 = tx
            .query_row(
                r#"
                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
                SELECT ?1, ?2, ?3, ?4, ?5, ?6
                RETURNING id
                "#,
                params![thread_id, role, content, item_json, created_at, message_id],
                |row| row.get(0),
            )
            .with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
        tx.execute(
            r#"
            UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2;
            "#,
            params![next_leaf_id, thread_id],
        )
        .with_context(|| {
            format!(
                "failed to update thread current leaf id for thread {:?}",
                thread_id
            )
        })?;
        tx.commit()
            .context("failed to commit fork message transaction")?;
        Ok(next_leaf_id)
    }

    /// Delete all messages belonging to a thread and reset its `current_leaf_id`.
    ///
    /// Both statements run in one transaction. Returns the number of messages
    /// deleted.
    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
        let mut conn = self.conn()?;
        let tx = conn
            .transaction()
            .context("failed to begin clear messages transaction")?;
        // Detach the leaf pointer before removing the rows it may reference.
        tx.execute(
            r#"
            UPDATE threads SET current_leaf_id = NULL WHERE id = ?1;
            "#,
            params![thread_id],
        )
        .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
        let result = tx
            .execute(
                r#"
                DELETE FROM messages WHERE thread_id = ?1
                "#,
                params![thread_id],
            )
            .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
        tx.commit()
            .context("failed to commit clear messages transaction")?;
        Ok(result)
    }

    /// Save (or update) a named checkpoint for a thread.
    ///
    /// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
    /// its state and timestamp are overwritten.
    ///
    /// # Errors
    ///
    /// Fails if `state` cannot be encoded as JSON or the upsert fails.
    pub fn save_checkpoint(
        &self,
        thread_id: &str,
        checkpoint_id: &str,
        state: &Value,
    ) -> Result<()> {
        let conn = self.conn()?;
        let state_json =
            serde_json::to_string(state).context("failed to encode checkpoint state")?;
        conn.execute(
            r#"
            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
            VALUES (?1, ?2, ?3, ?4)
            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
                state_json = excluded.state_json,
                created_at = excluded.created_at
            "#,
            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
        )
        .with_context(|| {
            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
        })?;
        Ok(())
    }

    /// Load a checkpoint for a thread.
    ///
    /// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
    /// loads the most recently created checkpoint for the thread. Returns `None`
    /// if no matching checkpoint exists.
    ///
    /// A stored state that is not valid JSON is returned as [`Value::Null`]
    /// rather than as an error.
    pub fn load_checkpoint(
        &self,
        thread_id: &str,
        checkpoint_id: Option<&str>,
    ) -> Result<Option<CheckpointRecord>> {
        /// Map a `(thread_id, checkpoint_id, state_json, created_at)` row to a record.
        fn read_checkpoint(row: &rusqlite::Row<'_>) -> rusqlite::Result<CheckpointRecord> {
            let state_json: String = row.get(2)?;
            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
            Ok(CheckpointRecord {
                thread_id: row.get(0)?,
                checkpoint_id: row.get(1)?,
                state,
                created_at: row.get(3)?,
            })
        }

        let conn = self.conn()?;
        match checkpoint_id {
            Some(checkpoint_id) => conn
                .query_row(
                    "SELECT thread_id, checkpoint_id, state_json, created_at \
                     FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
                    params![thread_id, checkpoint_id],
                    read_checkpoint,
                )
                .optional()
                .with_context(|| {
                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
                }),
            None => conn
                .query_row(
                    "SELECT thread_id, checkpoint_id, state_json, created_at \
                     FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
                    params![thread_id],
                    read_checkpoint,
                )
                .optional()
                .with_context(|| {
                    format!("failed to load latest checkpoint for thread {thread_id}")
                }),
        }
    }

    /// List checkpoints for a thread, ordered by creation time (newest first).
    ///
    /// The `limit` parameter caps the number of results and defaults to 100.
pub fn list_checkpoints( &self, thread_id: &str, limit: Option, ) -> Result> { let conn = self.conn()?; let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100); let mut stmt = conn .prepare( "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2", ) .context("failed to prepare checkpoint list query")?; let mut rows = stmt .query(params![thread_id, limit]) .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?; let mut out = Vec::new(); while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? { let state_json: String = row.get(2).context("failed to read checkpoint state json")?; let state = serde_json::from_str(&state_json).unwrap_or(Value::Null); out.push(CheckpointRecord { thread_id: row.get(0).context("failed to read checkpoint thread id")?, checkpoint_id: row.get(1).context("failed to read checkpoint id")?, state, created_at: row.get(3).context("failed to read checkpoint timestamp")?, }); } Ok(out) } /// Delete a specific checkpoint from a thread. pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> { let conn = self.conn()?; conn.execute( "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2", params![thread_id, checkpoint_id], ) .with_context(|| { format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}") })?; Ok(()) } /// Insert or update a background job record. 
pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> { let conn = self.conn()?; conn.execute( r#" INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) ON CONFLICT(id) DO UPDATE SET name = excluded.name, status = excluded.status, progress = excluded.progress, detail = excluded.detail, created_at = excluded.created_at, updated_at = excluded.updated_at "#, params![ job.id, job.name, job_state_status_to_str(&job.status), job.progress.map(i64::from), job.detail, job.created_at, job.updated_at ], ) .with_context(|| format!("failed to upsert job {}", job.id))?; Ok(()) } /// Retrieve a single job by its ID. /// /// Returns `None` if no job with the given ID exists. pub fn get_job(&self, id: &str) -> Result> { let conn = self.conn()?; conn.query_row( "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1", params![id], |row| { let status_raw: String = row.get(2)?; let progress: Option = row.get(3)?; Ok(JobStateRecord { id: row.get(0)?, name: row.get(1)?, status: job_state_status_from_str(&status_raw), progress: progress.and_then(|v| u8::try_from(v).ok()), detail: row.get(4)?, created_at: row.get(5)?, updated_at: row.get(6)?, }) }, ) .optional() .with_context(|| format!("failed to read job {id}")) } /// List jobs ordered by most recently updated. /// /// The `limit` parameter caps the number of results and defaults to 100. pub fn list_jobs(&self, limit: Option) -> Result> { let conn = self.conn()?; let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100); let mut stmt = conn .prepare( "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1", ) .context("failed to prepare job list query")?; let mut rows = stmt .query(params![limit]) .context("failed to query persisted jobs")?; let mut out = Vec::new(); while let Some(row) = rows.next().context("failed to iterate persisted jobs")? 
{ let status_raw: String = row.get(2).context("failed to read job status")?; let progress: Option = row.get(3).context("failed to read job progress")?; out.push(JobStateRecord { id: row.get(0).context("failed to read job id")?, name: row.get(1).context("failed to read job name")?, status: job_state_status_from_str(&status_raw), progress: progress.and_then(|v| u8::try_from(v).ok()), detail: row.get(4).context("failed to read job detail")?, created_at: row.get(5).context("failed to read job created_at")?, updated_at: row.get(6).context("failed to read job updated_at")?, }); } Ok(out) } /// Permanently delete a job record. pub fn delete_job(&self, id: &str) -> Result<()> { let conn = self.conn()?; conn.execute("DELETE FROM jobs WHERE id = ?1", params![id]) .with_context(|| format!("failed to delete job {id}"))?; Ok(()) } /// Look up the rollout file path for a thread by its ID. pub fn find_rollout_path_by_id(&self, id: &str) -> Result> { let conn = self.conn()?; conn.query_row( "SELECT rollout_path FROM threads WHERE id = ?1", params![id], |row| row.get::<_, Option>(0), ) .optional() .context("failed to lookup rollout path") .map(|opt| opt.flatten().map(PathBuf::from)) } /// Append an entry to the JSONL session index file. /// /// The session index is an append-only log that maps thread IDs to their names, /// update timestamps, and rollout paths. It is used for fast name-based lookups /// without opening the SQLite database. 
pub fn append_thread_name(
        &self,
        thread_id: &str,
        thread_name: Option<String>,
        updated_at: i64,
        rollout_path: Option<PathBuf>,
    ) -> Result<()> {
        let index_path = &self.session_index_path;

        // Make sure the directory that holds the index exists before opening it.
        if let Some(dir) = index_path.parent() {
            fs::create_dir_all(dir).with_context(|| {
                format!(
                    "failed to create session index directory {}",
                    dir.display()
                )
            })?;
        }

        let encoded = serde_json::to_string(&SessionIndexEntry {
            thread_id: thread_id.to_string(),
            thread_name,
            updated_at,
            rollout_path,
        })
        .context("failed to serialize session index entry")?;

        // Created on first use; every entry is written as one line at the end.
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(index_path)
            .with_context(|| {
                format!("failed to open session index {}", index_path.display())
            })?;
        writeln!(file, "{encoded}").context("failed to append session index entry")?;
        Ok(())
    }

    /// Look up a thread's display name in the session index.
    ///
    /// Yields `None` when the thread has no index entry or its entry carries no
    /// name.
    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
        let index = self.session_index_map()?;
        let name = match index.get(thread_id) {
            Some(entry) => entry.thread_name.clone(),
            None => None,
        };
        Ok(name)
    }

    /// Resolve display names for several thread IDs with a single index read.
    ///
    /// Every requested ID appears in the result; IDs without an entry or
    /// without a name map to `None`.
    pub fn find_thread_names_by_ids(
        &self,
        ids: &[String],
    ) -> Result<HashMap<String, Option<String>>> {
        let index = self.session_index_map()?;
        let names = ids
            .iter()
            .map(|id| {
                let name = index.get(id).and_then(|entry| entry.thread_name.clone());
                (id.clone(), name)
            })
            .collect();
        Ok(names)
    }

    /// Find the rollout path for a thread by its display name (case-insensitive).
    ///
    /// If multiple threads share the same name, the most recently updated one is returned.
    /// Returns `None` if no matching thread is found.
pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
        let map = self.session_index_map()?;
        // Names are compared ignoring ASCII case only; among matches the entry
        // with the largest `updated_at` wins.
        let matched = map
            .values()
            .filter(|entry| {
                entry
                    .thread_name
                    .as_deref()
                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
            })
            .max_by_key(|entry| entry.updated_at);
        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
    }

    /// Read the session index into a map keyed by thread ID.
    ///
    /// Lines are applied in file order, so a later entry for a thread replaces
    /// an earlier one. Blank lines are skipped. A missing index file yields an
    /// empty map.
    ///
    /// # Errors
    ///
    /// Fails if the file exists but cannot be opened or read, or if a non-blank
    /// line is not a valid session index entry.
    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
        // Open directly instead of checking `exists()` first: that avoids a race
        // with concurrent deletion and keeps real I/O errors (for example
        // permission problems) visible instead of reporting an empty index.
        let file = match OpenOptions::new().read(true).open(&self.session_index_path) {
            Ok(file) => file,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                return Ok(HashMap::new());
            }
            Err(err) => {
                return Err(err).with_context(|| {
                    format!(
                        "failed to read session index {}",
                        self.session_index_path.display()
                    )
                });
            }
        };
        let reader = BufReader::new(file);
        let mut latest = HashMap::<String, SessionIndexEntry>::new();
        for line in reader.lines() {
            let line = line.context("failed to read session index line")?;
            if line.trim().is_empty() {
                continue;
            }
            let parsed: SessionIndexEntry =
                serde_json::from_str(&line).context("failed to parse session index entry")?;
            latest.insert(parsed.thread_id.clone(), parsed);
        }
        Ok(latest)
    }
}

/// Pick the default location of the state database under the home directory.
///
/// Falls back to the current directory (`"."`) when no home directory is known.
fn default_state_db_path() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
    // Prefer the CodeWhale directory, falling back to legacy DeepSeek path
    // so existing installs don't lose their session history.
    let primary = home.join(".codewhale").join("state.db");
    let legacy = home.join(".deepseek").join("state.db");
    if !primary.exists() && legacy.exists() {
        legacy
    } else {
        primary
    }
}

/// Encode a boolean as `1` (true) or `0` (false).
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}

/// Decode an integer flag: any non-zero value is `true`.
fn i64_to_bool(value: i64) -> bool {
    value != 0
}

/// Stored string form of a [`ThreadStatus`].
fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
    match status {
        ThreadStatus::Running => "running",
        ThreadStatus::Idle => "idle",
        ThreadStatus::Completed => "completed",
        ThreadStatus::Failed => "failed",
        ThreadStatus::Paused => "paused",
        ThreadStatus::Archived => "archived",
    }
}

/// Parse a stored thread status; unrecognized values map to [`ThreadStatus::Idle`].
fn thread_status_from_str(value: &str) -> ThreadStatus {
    match value {
        "running" => ThreadStatus::Running,
        "idle" => ThreadStatus::Idle,
        "completed" => ThreadStatus::Completed,
        "failed" => ThreadStatus::Failed,
        "paused" => ThreadStatus::Paused,
        "archived" => ThreadStatus::Archived,
        _ => ThreadStatus::Idle,
    }
}

/// Stored string form of a [`SessionSource`].
fn session_source_to_str(source: &SessionSource) -> &'static str {
    match source {
        SessionSource::Interactive => "interactive",
        SessionSource::Resume => "resume",
        SessionSource::Fork => "fork",
        SessionSource::Api => "api",
        SessionSource::Unknown => "unknown",
    }
}

/// Parse a stored session source; unrecognized values map to [`SessionSource::Unknown`].
fn session_source_from_str(value: &str) -> SessionSource {
    match value {
        "interactive" => SessionSource::Interactive,
        "resume" => SessionSource::Resume,
        "fork" => SessionSource::Fork,
        "api" => SessionSource::Api,
        _ => SessionSource::Unknown,
    }
}

/// Render an optional path as an owned string using its `display()` form.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    path.map(|p| p.display().to_string())
}

/// Stored string form of a [`JobStateStatus`].
fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
    match status {
        JobStateStatus::Queued => "queued",
        JobStateStatus::Running => "running",
        JobStateStatus::Completed => "completed",
        JobStateStatus::Failed => "failed",
        JobStateStatus::Cancelled => "cancelled",
    }
}

/// Parse a stored job status; unrecognized values map to [`JobStateStatus::Queued`].
fn job_state_status_from_str(value: &str) -> JobStateStatus {
    match value {
        "queued" => JobStateStatus::Queued,
        "running" => JobStateStatus::Running,
        "completed" => JobStateStatus::Completed,
        "failed" => JobStateStatus::Failed,
        "cancelled" => JobStateStatus::Cancelled,
        _ => JobStateStatus::Queued,
    }
}

/// Stored string form of a [`ThreadGoalStatus`].
fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
    match status {
        ThreadGoalStatus::Active => "active",
        ThreadGoalStatus::Paused => "paused",
        ThreadGoalStatus::Blocked => "blocked",
        ThreadGoalStatus::UsageLimited => "usage_limited",
        ThreadGoalStatus::BudgetLimited => "budget_limited",
        ThreadGoalStatus::Complete => "complete",
    }
}

/// Parse a stored goal status; unrecognized values map to [`ThreadGoalStatus::Active`].
fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
    match value {
        "active" => ThreadGoalStatus::Active,
        "paused" => ThreadGoalStatus::Paused,
        "blocked" => ThreadGoalStatus::Blocked,
        "usage_limited" => ThreadGoalStatus::UsageLimited,
        "budget_limited" => ThreadGoalStatus::BudgetLimited,
        "complete" => ThreadGoalStatus::Complete,
        _ => ThreadGoalStatus::Active,
    }
}

/// Build a [`ThreadMetadata`] from a row, reading columns by position (0 through 21).
///
/// The caller's `SELECT` must list the columns in the order the indices below
/// expect.
fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
    let status_raw: String = row.get(7)?;
    let source_raw: String = row.get(11)?;
    let rollout_path: Option<String> = row.get(1)?;
    let path: Option<String> = row.get(8)?;
    Ok(ThreadMetadata {
        id: row.get(0)?,
        rollout_path: rollout_path.map(PathBuf::from),
        preview: row.get(2)?,
        ephemeral: i64_to_bool(row.get(3)?),
        model_provider: row.get(4)?,
        created_at: row.get(5)?,
        updated_at: row.get(6)?,
        status: thread_status_from_str(&status_raw),
        path: path.map(PathBuf::from),
        cwd: PathBuf::from(row.get::<_, String>(9)?),
        cli_version: row.get(10)?,
        source: session_source_from_str(&source_raw),
        name: row.get(12)?,
        sandbox_policy: row.get(13)?,
        approval_mode: row.get(14)?,
        archived: i64_to_bool(row.get(15)?),
        archived_at: row.get(16)?,
        git_sha: row.get(17)?,
        git_branch: row.get(18)?,
        git_origin_url: row.get(19)?,
        memory_mode: row.get(20)?,
        current_leaf_id: row.get(21)?,
    })
}

/// Build a [`ThreadGoalRecord`] from a row, reading columns by position (0 through 8).
fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
    let status_raw: String = row.get(3)?;
    Ok(ThreadGoalRecord {
        thread_id: row.get(0)?,
        goal_id: row.get(1)?,
        objective: row.get(2)?,
        status: thread_goal_status_from_str(&status_raw),
        token_budget: row.get(4)?,
        tokens_used: row.get(5)?,
        time_used_seconds: row.get(6)?,
        created_at: row.get(7)?,
        updated_at: row.get(8)?,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Open a store in a fresh temp directory; the name, process ID, and a
    /// nanosecond timestamp keep directories from colliding between tests.
    fn temp_state_store(name: &str) -> StateStore {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "codewhale-state-{name}-{}-{suffix}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("create temp state dir");
        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
    }

    /// Minimal running thread fixture with the given ID.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_string(),
            rollout_path: None,
            preview: "test thread".to_string(),
            ephemeral: false,
            model_provider: "deepseek".to_string(),
            created_at: 10,
            updated_at: 10,
            status: ThreadStatus::Running,
            path: None,
            cwd: PathBuf::from("/tmp/codewhale"),
            cli_version: "0.0.0-test".to_string(),
            source: SessionSource::Interactive,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived: false,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Active goal fixture for `thread_id` with the given objective.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_string(),
            goal_id: "goal-1".to_string(),
            objective: objective.to_string(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            created_at: 100,
            updated_at: 101,
        }
    }

    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        let store = temp_state_store("thread-goal-crud");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        let goal = test_goal("thread-1", "Ship v0.8.59");
        store.upsert_thread_goal(&goal).expect("upsert goal");
        assert_eq!(
            store
                .get_thread_goal("thread-1")
                .expect("read goal")
                .as_ref(),
            Some(&goal)
        );

        let mut replacement = test_goal("thread-1", "Ship v0.8.59 safely");
        replacement.goal_id = "goal-2".to_string();
        replacement.status = ThreadGoalStatus::BudgetLimited;
        replacement.token_budget = None;
        replacement.updated_at = 202;
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        assert_eq!(
            store.get_thread_goal("thread-1").expect("read replacement"),
            Some(replacement)
        );

        assert!(store.delete_thread_goal("thread-1").expect("delete goal"));
        assert!(
            store
                .get_thread_goal("thread-1")
                .expect("read empty")
                .is_none()
        );
        assert!(!store.delete_thread_goal("thread-1").expect("delete empty"));
    }

    #[test]
    fn thread_goal_requires_existing_thread() {
        let store = temp_state_store("thread-goal-missing-thread");
        let err = store
            .upsert_thread_goal(&test_goal("missing-thread", "nope"))
            .expect_err("goal without a thread should fail");
        assert!(err.to_string().contains("thread missing-thread not found"));
    }
}