1760 lines
67 KiB
Rust
1760 lines
67 KiB
Rust
//! Persistent state management for conversation threads, messages, and jobs.
//!
//! The [`StateStore`] is the primary entry point, backed by a SQLite database and an
//! append-only JSONL session index file. It provides CRUD operations for:
//!
//! - **Threads** — conversation metadata, archival, and session indexing.
//! - **Messages** — append-only message storage with tree-structured branching.
//! - **Checkpoints** — named state snapshots for restoring conversation progress.
//! - **Jobs** — background task tracking with status and progress.
//! - **Dynamic tools** — per-thread tool registrations.
//! - **Thread goals** — a per-thread objective with status and token/time usage.
|
|
|
|
use std::collections::HashMap;
|
|
use std::fs::{self, OpenOptions};
|
|
use std::io::{BufRead, BufReader, Write};
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use anyhow::{Context, Result};
|
|
use chrono::Utc;
|
|
use rusqlite::{Connection, OptionalExtension, params};
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_json::Value;
|
|
|
|
/// Lifecycle status of a conversation thread.
|
|
///
|
|
/// Serialized as lowercase snake_case strings (e.g. `"running"`, `"archived"`).
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum ThreadStatus {
|
|
/// Thread is actively being worked on.
|
|
Running,
|
|
/// Thread exists but has no active work in progress.
|
|
Idle,
|
|
/// Thread has finished its task successfully.
|
|
Completed,
|
|
/// Thread encountered an unrecoverable error.
|
|
Failed,
|
|
/// Thread has been temporarily paused by the user.
|
|
Paused,
|
|
/// Thread has been archived and is hidden from default listings.
|
|
Archived,
|
|
}
|
|
|
|
/// Indicates how a session was initiated.
|
|
///
|
|
/// Serialized as lowercase snake_case strings.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum SessionSource {
|
|
/// Started by a user interacting with the CLI.
|
|
Interactive,
|
|
/// Resumed from a previously persisted session.
|
|
Resume,
|
|
/// Created by forking an existing conversation at a specific message.
|
|
Fork,
|
|
/// Initiated programmatically via the API.
|
|
Api,
|
|
/// Source is unknown or unspecified.
|
|
Unknown,
|
|
}
|
|
|
|
/// Metadata for a persisted conversation thread.
|
|
///
|
|
/// Each thread represents a single conversation session and stores its
|
|
/// configuration, git context, and current status.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct ThreadMetadata {
|
|
/// Unique identifier for this thread.
|
|
pub id: String,
|
|
/// Optional filesystem path to the rollout (JSONL transcript) file.
|
|
pub rollout_path: Option<PathBuf>,
|
|
/// Short preview or summary of the thread content.
|
|
pub preview: String,
|
|
/// Whether this thread is ephemeral (not persisted long-term).
|
|
pub ephemeral: bool,
|
|
/// Identifier of the model provider used for this thread (e.g. `"openai"`).
|
|
pub model_provider: String,
|
|
/// Unix timestamp (seconds) when the thread was created.
|
|
pub created_at: i64,
|
|
/// Unix timestamp (seconds) of the most recent update to the thread.
|
|
pub updated_at: i64,
|
|
/// Current lifecycle status of the thread.
|
|
pub status: ThreadStatus,
|
|
/// Optional filesystem path associated with the thread working context.
|
|
pub path: Option<PathBuf>,
|
|
/// Working directory that was active when the thread was created.
|
|
pub cwd: PathBuf,
|
|
/// Version of the CLI that created this thread.
|
|
pub cli_version: String,
|
|
/// How this session was initiated.
|
|
pub source: SessionSource,
|
|
/// User-assigned display name for the thread.
|
|
pub name: Option<String>,
|
|
/// Serialized sandbox policy applied to this thread, if any.
|
|
pub sandbox_policy: Option<String>,
|
|
/// Approval mode configured for tool calls in this thread.
|
|
pub approval_mode: Option<String>,
|
|
/// Whether the thread has been archived.
|
|
pub archived: bool,
|
|
/// Unix timestamp (seconds) when the thread was archived, or `None` if not archived.
|
|
pub archived_at: Option<i64>,
|
|
/// Git commit SHA of the working tree when the thread was created.
|
|
pub git_sha: Option<String>,
|
|
/// Git branch checked out when the thread was created.
|
|
pub git_branch: Option<String>,
|
|
/// URL of the git remote origin, if available.
|
|
pub git_origin_url: Option<String>,
|
|
/// Memory mode configured for this thread (e.g. `"local"`, `"remote"`).
|
|
pub memory_mode: Option<String>,
|
|
/// ID of the current leaf message in the conversation tree.
|
|
pub current_leaf_id: Option<i64>,
|
|
}
|
|
|
|
/// A dynamically registered tool associated with a thread.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct DynamicToolRecord {
|
|
/// Ordinal position of this tool in the thread tool list.
|
|
pub position: i64,
|
|
/// Unique name identifying the tool.
|
|
pub name: String,
|
|
/// Human-readable description of what the tool does.
|
|
pub description: Option<String>,
|
|
/// JSON Schema describing the tool input parameters.
|
|
pub input_schema: Value,
|
|
}
|
|
|
|
/// A single message entry in a conversation thread.
|
|
///
|
|
/// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id),
|
|
/// enabling conversation branching and forking.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct MessageRecord {
|
|
/// Auto-incremented unique identifier for this message.
|
|
pub id: i64,
|
|
/// ID of the thread this message belongs to.
|
|
pub thread_id: String,
|
|
/// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`).
|
|
pub role: String,
|
|
/// Text content of the message.
|
|
pub content: String,
|
|
/// Optional structured item payload (tool calls, tool results, etc.).
|
|
pub item: Option<Value>,
|
|
/// Unix timestamp (seconds) when the message was created.
|
|
pub created_at: i64,
|
|
/// ID of the parent message, forming a tree structure. `None` for root messages.
|
|
pub parent_entry_id: Option<i64>,
|
|
}
|
|
|
|
/// A named checkpoint capturing the state of a thread at a point in time.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct CheckpointRecord {
|
|
/// ID of the thread this checkpoint belongs to.
|
|
pub thread_id: String,
|
|
/// Unique identifier for this checkpoint within its thread.
|
|
pub checkpoint_id: String,
|
|
/// Serialized state snapshot stored as a JSON value.
|
|
pub state: Value,
|
|
/// Unix timestamp (seconds) when the checkpoint was created or last updated.
|
|
pub created_at: i64,
|
|
}
|
|
|
|
/// Status of a background job.
|
|
///
|
|
/// Serialized as lowercase snake_case strings.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum JobStateStatus {
|
|
/// Job is waiting to be executed.
|
|
Queued,
|
|
/// Job is currently executing.
|
|
Running,
|
|
/// Job has finished successfully.
|
|
Completed,
|
|
/// Job has failed with an error.
|
|
Failed,
|
|
/// Job was cancelled before completion.
|
|
Cancelled,
|
|
}
|
|
|
|
/// Persisted state of a background job.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct JobStateRecord {
|
|
/// Unique identifier for the job.
|
|
pub id: String,
|
|
/// Human-readable name describing the job.
|
|
pub name: String,
|
|
/// Current lifecycle status of the job.
|
|
pub status: JobStateStatus,
|
|
/// Completion progress as a percentage (0--100), if available.
|
|
pub progress: Option<u8>,
|
|
/// Optional detail message providing additional status information.
|
|
pub detail: Option<String>,
|
|
/// Unix timestamp (seconds) when the job was created.
|
|
pub created_at: i64,
|
|
/// Unix timestamp (seconds) of the most recent status update.
|
|
pub updated_at: i64,
|
|
}
|
|
|
|
/// Persisted lifecycle status for a thread goal.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum ThreadGoalStatus {
|
|
/// Goal is active and should continue receiving work.
|
|
Active,
|
|
/// Goal is paused by the user.
|
|
Paused,
|
|
/// Goal is blocked and cannot make meaningful progress.
|
|
Blocked,
|
|
/// Goal stopped because account/service usage limits were reached.
|
|
UsageLimited,
|
|
/// Goal stopped because its explicit token budget was reached.
|
|
BudgetLimited,
|
|
/// Goal has been completed.
|
|
Complete,
|
|
}
|
|
|
|
/// Persisted goal state attached to a thread.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
pub struct ThreadGoalRecord {
|
|
/// Thread this goal belongs to.
|
|
pub thread_id: String,
|
|
/// Stable identifier for this goal revision.
|
|
pub goal_id: String,
|
|
/// User-visible objective.
|
|
pub objective: String,
|
|
/// Current lifecycle status.
|
|
pub status: ThreadGoalStatus,
|
|
/// Optional token budget requested by the user.
|
|
pub token_budget: Option<i64>,
|
|
/// Tokens consumed while pursuing the goal.
|
|
pub tokens_used: i64,
|
|
/// Elapsed wall-clock work time in seconds.
|
|
pub time_used_seconds: i64,
|
|
/// Unix timestamp (seconds) when the goal was created.
|
|
pub created_at: i64,
|
|
/// Unix timestamp (seconds) when the goal was last updated.
|
|
pub updated_at: i64,
|
|
}
|
|
|
|
/// Options that narrow a thread listing.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// When `true`, archived threads are part of the result.
    pub include_archived: bool,
    /// Upper bound on the number of threads returned. Defaults to 50.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Archived threads excluded, at most 50 results.
    fn default() -> Self {
        let include_archived = false;
        let limit = Some(50);
        Self {
            include_archived,
            limit,
        }
    }
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
struct SessionIndexEntry {
|
|
thread_id: String,
|
|
thread_name: Option<String>,
|
|
updated_at: i64,
|
|
rollout_path: Option<PathBuf>,
|
|
}
|
|
|
|
/// Durable store for conversation threads, messages, checkpoints, and jobs.
///
/// Data lives in a SQLite database plus an append-only JSONL session index file.
/// [`open`](Self::open) creates the schema and applies pending migrations automatically.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of the JSONL session index file.
    session_index_path: PathBuf,
}
|
|
|
|
impl StateStore {
|
|
/// Open (or create) a state store at the given database path.
|
|
///
|
|
/// If `path` is `None`, the default location (`~/.codewhale/state.db`, with
|
|
/// `~/.deepseek/state.db` as a legacy fallback) is used.
|
|
/// The database schema is created automatically if it does not exist.
|
|
pub fn open(path: Option<PathBuf>) -> Result<Self> {
|
|
let db_path = path.unwrap_or_else(default_state_db_path);
|
|
let session_index_path = db_path
|
|
.parent()
|
|
.unwrap_or_else(|| Path::new("."))
|
|
.join("session_index.jsonl");
|
|
if let Some(parent) = db_path.parent() {
|
|
fs::create_dir_all(parent).with_context(|| {
|
|
format!("failed to create state directory {}", parent.display())
|
|
})?;
|
|
}
|
|
let store = Self {
|
|
db_path,
|
|
session_index_path,
|
|
};
|
|
store.init_schema()?;
|
|
Ok(store)
|
|
}
|
|
|
|
/// Returns the filesystem path of the underlying SQLite database.
|
|
pub fn db_path(&self) -> &Path {
|
|
&self.db_path
|
|
}
|
|
|
|
fn conn(&self) -> Result<Connection> {
|
|
Connection::open(&self.db_path)
|
|
.with_context(|| format!("failed to open state db {}", self.db_path.display()))
|
|
}
|
|
|
|
/// Create the database schema and apply any migrations that are still pending.
///
/// The schema version is read from `PRAGMA user_version`. Each migration runs as one
/// batch wrapped in `BEGIN`/`COMMIT` and bumps `user_version` itself, so migrations
/// at or below the stored version are skipped.
fn init_schema(&self) -> Result<()> {
    // Version 1: threads, dynamic tools, messages, checkpoints, jobs, plus the
    // message-tree columns (`parent_entry_id`, `current_leaf_id`) and their backfill.
    const THREAD_SCHEMA_V1: &str = r#"
        BEGIN;
        CREATE TABLE IF NOT EXISTS threads (
            id TEXT PRIMARY KEY,
            rollout_path TEXT,
            preview TEXT NOT NULL,
            ephemeral INTEGER NOT NULL,
            model_provider TEXT NOT NULL,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            status TEXT NOT NULL,
            path TEXT,
            cwd TEXT NOT NULL,
            cli_version TEXT NOT NULL,
            source TEXT NOT NULL,
            title TEXT,
            sandbox_policy TEXT,
            approval_mode TEXT,
            archived INTEGER NOT NULL DEFAULT 0,
            archived_at INTEGER,
            git_sha TEXT,
            git_branch TEXT,
            git_origin_url TEXT,
            memory_mode TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
        CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
        CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);

        CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
            thread_id TEXT NOT NULL,
            position INTEGER NOT NULL,
            name TEXT NOT NULL,
            description TEXT,
            input_schema TEXT NOT NULL,
            PRIMARY KEY (thread_id, position),
            FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            thread_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT NOT NULL,
            item_json TEXT,
            created_at INTEGER NOT NULL,
            FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);

        CREATE TABLE IF NOT EXISTS checkpoints (
            thread_id TEXT NOT NULL,
            checkpoint_id TEXT NOT NULL,
            state_json TEXT NOT NULL,
            created_at INTEGER NOT NULL,
            PRIMARY KEY(thread_id, checkpoint_id),
            FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);

        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            status TEXT NOT NULL,
            progress INTEGER,
            detail TEXT,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);

        -- Add parent_entry_id column, and set to last message before current message
        ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
        UPDATE messages
        SET parent_entry_id = (
            SELECT m2.id
            FROM messages m2
            WHERE m2.thread_id = messages.thread_id
            AND (
                m2.created_at < messages.created_at
                OR (
                    m2.created_at = messages.created_at
                    AND m2.id < messages.id
                )
            )
            ORDER BY m2.created_at DESC, m2.id DESC
            LIMIT 1
        );
        CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);

        -- Add current_leaf_id column, and set to last message in thread
        ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
        UPDATE threads
        SET current_leaf_id = (
            SELECT m.id
            FROM messages m
            WHERE m.thread_id = threads.id
            ORDER BY m.id DESC
            LIMIT 1
        );

        PRAGMA user_version = 1;
        COMMIT;
    "#;

    // Version 2: workflow trace tables (runs, branches, leaves, control nodes,
    // teacher candidates).
    const WORKFLOW_TRACE_SCHEMA_V2: &str = r#"
        BEGIN;
        CREATE TABLE IF NOT EXISTS workflow_runs (
            id TEXT PRIMARY KEY,
            workflow_id TEXT NOT NULL,
            goal TEXT NOT NULL,
            status TEXT NOT NULL,
            input_hash TEXT,
            started_at INTEGER NOT NULL,
            completed_at INTEGER,
            metadata_json TEXT NOT NULL DEFAULT '{}'
        );
        CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
            ON workflow_runs(status, started_at DESC);
        CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
            ON workflow_runs(workflow_id, started_at DESC);

        CREATE TABLE IF NOT EXISTS branch_runs (
            id TEXT PRIMARY KEY,
            workflow_run_id TEXT NOT NULL,
            branch_id TEXT NOT NULL,
            node_id TEXT NOT NULL,
            status TEXT NOT NULL,
            started_at INTEGER NOT NULL,
            completed_at INTEGER,
            result_json TEXT NOT NULL DEFAULT '{}',
            FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
            ON branch_runs(workflow_run_id);
        CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
            ON branch_runs(branch_id);

        CREATE TABLE IF NOT EXISTS leaf_runs (
            id TEXT PRIMARY KEY,
            workflow_run_id TEXT NOT NULL,
            branch_run_id TEXT,
            leaf_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            input_hash TEXT,
            status TEXT NOT NULL,
            output_json TEXT NOT NULL DEFAULT '{}',
            artifacts_json TEXT NOT NULL DEFAULT '[]',
            started_at INTEGER NOT NULL,
            completed_at INTEGER,
            FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
            FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
        );
        CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
            ON leaf_runs(workflow_run_id);
        CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
            ON leaf_runs(workflow_run_id, leaf_id, input_hash);

        CREATE TABLE IF NOT EXISTS control_node_runs (
            id TEXT PRIMARY KEY,
            workflow_run_id TEXT NOT NULL,
            node_id TEXT NOT NULL,
            kind TEXT NOT NULL,
            status TEXT NOT NULL,
            selected_children_json TEXT NOT NULL DEFAULT '[]',
            result_json TEXT NOT NULL DEFAULT '{}',
            started_at INTEGER NOT NULL,
            completed_at INTEGER,
            FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
            ON control_node_runs(workflow_run_id);
        CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
            ON control_node_runs(node_id);

        CREATE TABLE IF NOT EXISTS teacher_candidates (
            id TEXT PRIMARY KEY,
            workflow_run_id TEXT NOT NULL,
            control_node_run_id TEXT NOT NULL,
            candidate_id TEXT NOT NULL,
            branch_run_id TEXT,
            score REAL,
            passed INTEGER,
            rationale_json TEXT NOT NULL DEFAULT '{}',
            created_at INTEGER NOT NULL,
            FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
            FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
            FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
        );
        CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
            ON teacher_candidates(workflow_run_id);
        CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
            ON teacher_candidates(control_node_run_id);

        PRAGMA user_version = 2;
        COMMIT;
    "#;

    // Version 3: one goal row per thread.
    const THREAD_GOAL_SCHEMA_V3: &str = r#"
        BEGIN;
        CREATE TABLE IF NOT EXISTS thread_goals (
            thread_id TEXT PRIMARY KEY NOT NULL,
            goal_id TEXT NOT NULL,
            objective TEXT NOT NULL,
            status TEXT NOT NULL CHECK(status IN (
                'active',
                'paused',
                'blocked',
                'usage_limited',
                'budget_limited',
                'complete'
            )),
            token_budget INTEGER,
            tokens_used INTEGER NOT NULL DEFAULT 0,
            time_used_seconds INTEGER NOT NULL DEFAULT 0,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
        );

        PRAGMA user_version = 3;
        COMMIT;
    "#;

    // (version the migration produces, SQL batch, error context on failure),
    // listed in the order they must be applied.
    let migrations: [(u32, &'static str, &'static str); 3] = [
        (1, THREAD_SCHEMA_V1, "failed to initialize thread schema"),
        (
            2,
            WORKFLOW_TRACE_SCHEMA_V2,
            "failed to initialize workflow trace schema",
        ),
        (
            3,
            THREAD_GOAL_SCHEMA_V3,
            "failed to initialize thread goal schema",
        ),
    ];

    let conn = self.conn()?;
    let stored_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;

    for &(target_version, sql, failure) in migrations.iter() {
        // A migration is pending exactly when the stored version is below its target.
        if stored_version < target_version {
            conn.execute_batch(sql).context(failure)?;
        }
    }
    Ok(())
}
|
|
|
|
/// Insert or update thread metadata.
|
|
///
|
|
/// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message)
|
|
/// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that.
|
|
pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
r#"
|
|
INSERT INTO threads (
|
|
id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
|
|
cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
|
|
git_sha, git_branch, git_origin_url, memory_mode
|
|
) VALUES (
|
|
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
|
|
?11, ?12, ?13, ?14, ?15, ?16, ?17,
|
|
?18, ?19, ?20, ?21
|
|
)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
rollout_path=excluded.rollout_path,
|
|
preview=excluded.preview,
|
|
ephemeral=excluded.ephemeral,
|
|
model_provider=excluded.model_provider,
|
|
created_at=excluded.created_at,
|
|
updated_at=excluded.updated_at,
|
|
status=excluded.status,
|
|
path=excluded.path,
|
|
cwd=excluded.cwd,
|
|
cli_version=excluded.cli_version,
|
|
source=excluded.source,
|
|
title=excluded.title,
|
|
sandbox_policy=excluded.sandbox_policy,
|
|
approval_mode=excluded.approval_mode,
|
|
archived=excluded.archived,
|
|
archived_at=excluded.archived_at,
|
|
git_sha=excluded.git_sha,
|
|
git_branch=excluded.git_branch,
|
|
git_origin_url=excluded.git_origin_url,
|
|
memory_mode=excluded.memory_mode
|
|
"#,
|
|
params![
|
|
thread.id,
|
|
path_to_opt_string(thread.rollout_path.as_deref()),
|
|
thread.preview,
|
|
bool_to_i64(thread.ephemeral),
|
|
thread.model_provider,
|
|
thread.created_at,
|
|
thread.updated_at,
|
|
thread_status_to_str(&thread.status),
|
|
path_to_opt_string(thread.path.as_deref()),
|
|
thread.cwd.display().to_string(),
|
|
thread.cli_version,
|
|
session_source_to_str(&thread.source),
|
|
thread.name,
|
|
thread.sandbox_policy,
|
|
thread.approval_mode,
|
|
bool_to_i64(thread.archived),
|
|
thread.archived_at,
|
|
thread.git_sha,
|
|
thread.git_branch,
|
|
thread.git_origin_url,
|
|
thread.memory_mode,
|
|
],
|
|
)
|
|
.context("failed to upsert thread metadata")?;
|
|
|
|
self.append_thread_name(
|
|
&thread.id,
|
|
thread.name.clone(),
|
|
thread.updated_at,
|
|
thread.rollout_path.clone(),
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Retrieve a single thread by its ID.
|
|
///
|
|
/// Returns `None` if no thread with the given ID exists.
|
|
pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
|
|
let conn = self.conn()?;
|
|
conn.query_row(
|
|
r#"
|
|
SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
|
|
cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
|
|
git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
|
|
FROM threads
|
|
WHERE id = ?1
|
|
"#,
|
|
params![id],
|
|
row_to_thread,
|
|
)
|
|
.optional()
|
|
.context("failed to read thread")
|
|
}
|
|
|
|
/// List threads ordered by most recently updated.
|
|
///
|
|
/// Use [`ThreadListFilters`] to control whether archived threads are included
|
|
/// and the maximum number of results returned.
|
|
pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
|
|
let conn = self.conn()?;
|
|
let sql = if filters.include_archived {
|
|
"SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
|
|
} else {
|
|
"SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
|
|
};
|
|
|
|
let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
|
|
let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
|
|
let mut rows = stmt
|
|
.query(params![limit])
|
|
.context("failed to query threads")?;
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate thread rows")? {
|
|
out.push(row_to_thread(row)?);
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Archive a thread, setting its status to [`ThreadStatus::Archived`] and
|
|
/// recording the current timestamp.
|
|
pub fn mark_archived(&self, id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
"UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
|
|
params![
|
|
id,
|
|
Utc::now().timestamp(),
|
|
thread_status_to_str(&ThreadStatus::Archived)
|
|
],
|
|
)
|
|
.context("failed to archive thread")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Unarchive a thread, removing the archived flag and clearing `archived_at`.
|
|
pub fn mark_unarchived(&self, id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
"UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
|
|
params![id],
|
|
)
|
|
.context("failed to unarchive thread")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Permanently delete a thread and all of its associated data
|
|
/// (messages, checkpoints, dynamic tools) via cascading foreign keys.
|
|
pub fn delete_thread(&self, id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
|
|
.context("failed to delete thread")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Set the memory mode for a thread.
|
|
///
|
|
/// Pass `None` to clear the memory mode.
|
|
pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
"UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
|
|
params![id, mode],
|
|
)
|
|
.context("failed to update thread memory mode")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Get the memory mode configured for a thread.
|
|
///
|
|
/// Returns `None` if the thread does not exist or has no memory mode set.
|
|
pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
|
|
let conn = self.conn()?;
|
|
conn.query_row(
|
|
"SELECT memory_mode FROM threads WHERE id = ?1",
|
|
params![id],
|
|
|row| row.get::<_, Option<String>>(0),
|
|
)
|
|
.optional()
|
|
.context("failed to read thread memory mode")
|
|
.map(Option::flatten)
|
|
}
|
|
|
|
/// Insert or replace the persisted goal for a thread.
|
|
pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
let exists: Option<i64> = conn
|
|
.query_row(
|
|
"SELECT 1 FROM threads WHERE id = ?1",
|
|
params![goal.thread_id],
|
|
|row| row.get(0),
|
|
)
|
|
.optional()
|
|
.context("failed to verify thread before saving goal")?;
|
|
if exists.is_none() {
|
|
anyhow::bail!("thread {} not found", goal.thread_id);
|
|
}
|
|
|
|
conn.execute(
|
|
r#"
|
|
INSERT INTO thread_goals (
|
|
thread_id, goal_id, objective, status, token_budget, tokens_used,
|
|
time_used_seconds, created_at, updated_at
|
|
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
|
|
ON CONFLICT(thread_id) DO UPDATE SET
|
|
goal_id=excluded.goal_id,
|
|
objective=excluded.objective,
|
|
status=excluded.status,
|
|
token_budget=excluded.token_budget,
|
|
tokens_used=excluded.tokens_used,
|
|
time_used_seconds=excluded.time_used_seconds,
|
|
created_at=excluded.created_at,
|
|
updated_at=excluded.updated_at
|
|
"#,
|
|
params![
|
|
goal.thread_id,
|
|
goal.goal_id,
|
|
goal.objective,
|
|
thread_goal_status_to_str(&goal.status),
|
|
goal.token_budget,
|
|
goal.tokens_used,
|
|
goal.time_used_seconds,
|
|
goal.created_at,
|
|
goal.updated_at,
|
|
],
|
|
)
|
|
.context("failed to upsert thread goal")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Retrieve the persisted goal for a thread.
|
|
pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
|
|
let conn = self.conn()?;
|
|
conn.query_row(
|
|
r#"
|
|
SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
|
|
time_used_seconds, created_at, updated_at
|
|
FROM thread_goals
|
|
WHERE thread_id = ?1
|
|
"#,
|
|
params![thread_id],
|
|
row_to_thread_goal,
|
|
)
|
|
.optional()
|
|
.context("failed to read thread goal")
|
|
}
|
|
|
|
/// Delete the persisted goal for a thread.
|
|
pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
|
|
let conn = self.conn()?;
|
|
let changed = conn
|
|
.execute(
|
|
"DELETE FROM thread_goals WHERE thread_id = ?1",
|
|
params![thread_id],
|
|
)
|
|
.context("failed to delete thread goal")?;
|
|
Ok(changed > 0)
|
|
}
|
|
|
|
/// List all leaf messages in a thread.
|
|
///
|
|
/// A leaf message is one that has no other message referencing it as a parent.
|
|
/// In a branching conversation tree, there may be multiple leaf messages.
|
|
pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
|
|
let conn = self.conn()?;
|
|
let mut stmt = conn
|
|
.prepare(
|
|
r#"
|
|
SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
|
|
FROM messages m1
|
|
LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
|
|
WHERE m1.thread_id = ?1 AND m2.id IS NULL
|
|
"#,
|
|
)
|
|
.context("failed to prepare message listing query")?;
|
|
let mut rows = stmt
|
|
.query(params![thread_id])
|
|
.with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate message rows")? {
|
|
let item_json: Option<String> = row.get(4).context("failed to read item json")?;
|
|
let item = item_json
|
|
.as_deref()
|
|
.map(serde_json::from_str)
|
|
.transpose()
|
|
.with_context(|| {
|
|
format!("failed to parse message item json in thread {thread_id}")
|
|
})?;
|
|
out.push(MessageRecord {
|
|
id: row.get(0).context("failed to read message id")?,
|
|
thread_id: row.get(1).context("failed to read message thread id")?,
|
|
role: row.get(2).context("failed to read message role")?,
|
|
content: row.get(3).context("failed to read message content")?,
|
|
item,
|
|
created_at: row.get(5).context("failed to read message timestamp")?,
|
|
parent_entry_id: row.get(6).context("failed to read parent entry id")?,
|
|
});
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Update the current leaf message pointer for a thread.
|
|
///
|
|
/// This controls which branch of the conversation tree is considered active
|
|
/// when listing messages via [`list_messages`](Self::list_messages).
|
|
pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
"UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
|
|
params![current_leaf_id, thread_id],
|
|
)
|
|
.context("failed to update thread current leaf id")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Replace the dynamic tools for a thread.
|
|
///
|
|
/// All existing dynamic tools for the thread are deleted and replaced with the
|
|
/// provided list. The operation is performed within a transaction.
|
|
pub fn persist_dynamic_tools(
|
|
&self,
|
|
thread_id: &str,
|
|
tools: &[DynamicToolRecord],
|
|
) -> Result<()> {
|
|
let mut conn = self.conn()?;
|
|
let tx = conn
|
|
.transaction()
|
|
.context("failed to begin dynamic tools transaction")?;
|
|
tx.execute(
|
|
"DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
|
|
params![thread_id],
|
|
)
|
|
.context("failed to clear dynamic tools")?;
|
|
for tool in tools {
|
|
tx.execute(
|
|
"INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
|
|
params![
|
|
thread_id,
|
|
tool.position,
|
|
tool.name,
|
|
tool.description,
|
|
tool.input_schema.to_string()
|
|
],
|
|
)
|
|
.with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
|
|
}
|
|
tx.commit().context("failed to commit dynamic tools")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Retrieve all dynamic tools registered for a thread, ordered by position.
|
|
pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
|
|
let conn = self.conn()?;
|
|
let mut stmt = conn
|
|
.prepare(
|
|
"SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
|
|
)
|
|
.context("failed to prepare get dynamic tools query")?;
|
|
let mut rows = stmt
|
|
.query(params![thread_id])
|
|
.context("failed to query dynamic tools")?;
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
|
|
let input_schema_raw: String =
|
|
row.get(3).context("failed to read tool input schema")?;
|
|
let input_schema: Value =
|
|
serde_json::from_str(&input_schema_raw).with_context(|| {
|
|
format!("failed to parse input schema for dynamic tool in thread {thread_id}")
|
|
})?;
|
|
out.push(DynamicToolRecord {
|
|
position: row.get(0).context("failed to read tool position")?,
|
|
name: row.get(1).context("failed to read tool name")?,
|
|
description: row.get(2).context("failed to read tool description")?,
|
|
input_schema,
|
|
});
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Append a new message to a thread.
|
|
///
|
|
/// The message is linked to the thread's current leaf as its parent, and the
|
|
/// thread's `current_leaf_id` is updated to the new message. Returns the ID
|
|
/// of the newly created message.
|
|
pub fn append_message(
|
|
&self,
|
|
thread_id: &str,
|
|
role: &str,
|
|
content: &str,
|
|
item: Option<Value>,
|
|
) -> Result<i64> {
|
|
let mut conn = self.conn()?;
|
|
let created_at = Utc::now().timestamp();
|
|
let item_json = item
|
|
.as_ref()
|
|
.map(serde_json::to_string)
|
|
.transpose()
|
|
.context("failed to serialize message item payload")?;
|
|
|
|
let tx = conn
|
|
.transaction()
|
|
.context("failed to begin append message transaction")?;
|
|
|
|
let current_leaf_id: Option<i64> = tx
|
|
.query_row(
|
|
"SELECT current_leaf_id FROM threads WHERE id = ?1",
|
|
params![thread_id],
|
|
|row| row.get(0),
|
|
)
|
|
.with_context(|| {
|
|
format!("failed to query thread current leaf id for thread {thread_id}")
|
|
})?;
|
|
|
|
let next_leaf_id: i64 = tx.query_row(
|
|
r#"
|
|
INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
|
|
SELECT ?1, ?2, ?3, ?4, ?5, ?6
|
|
RETURNING id
|
|
"#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
|
|
).with_context(|| format!("failed to append message for thread {thread_id}"))?;
|
|
|
|
tx.execute(
|
|
r#"
|
|
UPDATE threads
|
|
SET current_leaf_id = ?1
|
|
WHERE id = ?2;
|
|
"#,
|
|
params![next_leaf_id, thread_id],
|
|
)
|
|
.with_context(|| {
|
|
format!("failed to update thread current leaf id for thread {thread_id}")
|
|
})?;
|
|
|
|
tx.commit()
|
|
.context("failed to commit append message transaction")?;
|
|
|
|
Ok(next_leaf_id)
|
|
}
|
|
|
|
/// List messages in the current conversation branch, walking backwards from
|
|
/// the thread's `current_leaf_id`.
|
|
///
|
|
/// Messages are returned in chronological order (oldest first). The `limit`
|
|
/// parameter caps how many ancestor messages are traversed; it defaults to 500.
|
|
pub fn list_messages(
|
|
&self,
|
|
thread_id: &str,
|
|
limit: Option<usize>,
|
|
) -> Result<Vec<MessageRecord>> {
|
|
let conn = self.conn()?;
|
|
let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
|
|
let mut stmt = conn
|
|
.prepare(
|
|
r#"
|
|
WITH RECURSIVE
|
|
leaf_id AS (
|
|
SELECT current_leaf_id FROM threads WHERE id = ?1
|
|
),
|
|
ancestors AS (
|
|
SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
|
|
FROM messages
|
|
WHERE id = (SELECT current_leaf_id FROM leaf_id)
|
|
|
|
UNION ALL
|
|
|
|
SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
|
|
FROM messages m
|
|
JOIN ancestors a ON m.id = a.parent_entry_id
|
|
WHERE a.depth < ?2
|
|
)
|
|
SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
|
|
ORDER BY depth DESC
|
|
"#
|
|
)
|
|
.context("failed to prepare message listing query")?;
|
|
let mut rows = stmt
|
|
.query(params![thread_id, limit - 1])
|
|
.with_context(|| format!("failed to list messages for thread {thread_id}"))?;
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate message rows")? {
|
|
let item_json: Option<String> = row.get(4).context("failed to read item json")?;
|
|
let item = item_json
|
|
.as_deref()
|
|
.map(serde_json::from_str)
|
|
.transpose()
|
|
.with_context(|| {
|
|
format!("failed to parse message item json in thread {thread_id}")
|
|
})?;
|
|
out.push(MessageRecord {
|
|
id: row.get(0).context("failed to read message id")?,
|
|
thread_id: row.get(1).context("failed to read message thread id")?,
|
|
role: row.get(2).context("failed to read message role")?,
|
|
content: row.get(3).context("failed to read message content")?,
|
|
item,
|
|
created_at: row.get(5).context("failed to read message timestamp")?,
|
|
parent_entry_id: row.get(6).context("failed to read parent entry id")?,
|
|
});
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Fork the conversation at a specific message.
|
|
///
|
|
/// Creates a new message whose parent is `message_id` and updates the thread's
|
|
/// `current_leaf_id` to the new message. Returns the ID of the new message.
|
|
/// This enables branching conversations from any point in the history.
|
|
pub fn fork_at_message(
|
|
&self,
|
|
message_id: &str,
|
|
role: &str,
|
|
content: &str,
|
|
item: Option<Value>,
|
|
) -> Result<i64> {
|
|
let mut conn = self.conn()?;
|
|
let created_at = Utc::now().timestamp();
|
|
let item_json = item
|
|
.as_ref()
|
|
.map(serde_json::to_string)
|
|
.transpose()
|
|
.context("failed to serialize message item payload")?;
|
|
|
|
let tx = conn
|
|
.transaction()
|
|
.context("failed to begin fork message transaction")?;
|
|
|
|
let thread_id: String = tx
|
|
.query_row(
|
|
"SELECT thread_id FROM messages WHERE id = ?1",
|
|
params![message_id],
|
|
|row| row.get(0),
|
|
)
|
|
.with_context(|| format!("failed to query thread id for message {message_id}"))?;
|
|
|
|
let next_leaf_id: i64 = tx.query_row(
|
|
r#"
|
|
INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
|
|
SELECT ?1, ?2, ?3, ?4, ?5, ?6
|
|
RETURNING id
|
|
"#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
|
|
).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
|
|
|
|
tx.execute(
|
|
r#"
|
|
UPDATE threads
|
|
SET current_leaf_id = ?1
|
|
WHERE id = ?2;
|
|
"#,
|
|
params![next_leaf_id, thread_id],
|
|
)
|
|
.with_context(|| {
|
|
format!(
|
|
"failed to update thread current leaf id for thread {:?}",
|
|
thread_id
|
|
)
|
|
})?;
|
|
|
|
tx.commit()
|
|
.context("failed to commit fork message transaction")?;
|
|
|
|
Ok(next_leaf_id)
|
|
}
|
|
|
|
/// Delete all messages belonging to a thread and reset its `current_leaf_id`.
|
|
///
|
|
/// Returns the number of messages deleted.
|
|
pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
|
|
let mut conn = self.conn()?;
|
|
let tx = conn
|
|
.transaction()
|
|
.context("failed to begin clear messages transaction")?;
|
|
|
|
tx.execute(
|
|
r#"
|
|
UPDATE threads
|
|
SET current_leaf_id = NULL
|
|
WHERE id = ?1;
|
|
"#,
|
|
params![thread_id],
|
|
)
|
|
.with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
|
|
let result = tx
|
|
.execute(
|
|
r#"
|
|
DELETE FROM messages WHERE thread_id = ?1
|
|
"#,
|
|
params![thread_id],
|
|
)
|
|
.with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
|
|
tx.commit()
|
|
.context("failed to commit clear messages transaction")?;
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Save (or update) a named checkpoint for a thread.
|
|
///
|
|
/// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
|
|
/// its state and timestamp are overwritten.
|
|
pub fn save_checkpoint(
|
|
&self,
|
|
thread_id: &str,
|
|
checkpoint_id: &str,
|
|
state: &Value,
|
|
) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
let state_json =
|
|
serde_json::to_string(state).context("failed to encode checkpoint state")?;
|
|
conn.execute(
|
|
r#"
|
|
INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
|
|
VALUES (?1, ?2, ?3, ?4)
|
|
ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
|
|
state_json = excluded.state_json,
|
|
created_at = excluded.created_at
|
|
"#,
|
|
params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
|
|
)
|
|
.with_context(|| {
|
|
format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
|
|
})?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Load a checkpoint for a thread.
|
|
///
|
|
/// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
|
|
/// loads the most recently created checkpoint for the thread. Returns `None`
|
|
/// if no matching checkpoint exists.
|
|
pub fn load_checkpoint(
|
|
&self,
|
|
thread_id: &str,
|
|
checkpoint_id: Option<&str>,
|
|
) -> Result<Option<CheckpointRecord>> {
|
|
let conn = self.conn()?;
|
|
if let Some(checkpoint_id) = checkpoint_id {
|
|
let row = conn
|
|
.query_row(
|
|
"SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
|
|
params![thread_id, checkpoint_id],
|
|
|row| {
|
|
let state_json: String = row.get(2)?;
|
|
let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
|
|
Ok(CheckpointRecord {
|
|
thread_id: row.get(0)?,
|
|
checkpoint_id: row.get(1)?,
|
|
state,
|
|
created_at: row.get(3)?,
|
|
})
|
|
},
|
|
)
|
|
.optional()
|
|
.with_context(|| {
|
|
format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
|
|
})?;
|
|
return Ok(row);
|
|
}
|
|
|
|
conn.query_row(
|
|
"SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
|
|
params![thread_id],
|
|
|row| {
|
|
let state_json: String = row.get(2)?;
|
|
let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
|
|
Ok(CheckpointRecord {
|
|
thread_id: row.get(0)?,
|
|
checkpoint_id: row.get(1)?,
|
|
state,
|
|
created_at: row.get(3)?,
|
|
})
|
|
},
|
|
)
|
|
.optional()
|
|
.with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
|
|
}
|
|
|
|
/// List checkpoints for a thread, ordered by creation time (newest first).
|
|
///
|
|
/// The `limit` parameter caps the number of results and defaults to 100.
|
|
pub fn list_checkpoints(
|
|
&self,
|
|
thread_id: &str,
|
|
limit: Option<usize>,
|
|
) -> Result<Vec<CheckpointRecord>> {
|
|
let conn = self.conn()?;
|
|
let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
|
|
let mut stmt = conn
|
|
.prepare(
|
|
"SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
|
|
)
|
|
.context("failed to prepare checkpoint list query")?;
|
|
let mut rows = stmt
|
|
.query(params![thread_id, limit])
|
|
.with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
|
|
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
|
|
let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
|
|
let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
|
|
out.push(CheckpointRecord {
|
|
thread_id: row.get(0).context("failed to read checkpoint thread id")?,
|
|
checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
|
|
state,
|
|
created_at: row.get(3).context("failed to read checkpoint timestamp")?,
|
|
});
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Delete a specific checkpoint from a thread.
|
|
pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
"DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
|
|
params![thread_id, checkpoint_id],
|
|
)
|
|
.with_context(|| {
|
|
format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
|
|
})?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Insert or update a background job record.
|
|
pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute(
|
|
r#"
|
|
INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
name = excluded.name,
|
|
status = excluded.status,
|
|
progress = excluded.progress,
|
|
detail = excluded.detail,
|
|
created_at = excluded.created_at,
|
|
updated_at = excluded.updated_at
|
|
"#,
|
|
params![
|
|
job.id,
|
|
job.name,
|
|
job_state_status_to_str(&job.status),
|
|
job.progress.map(i64::from),
|
|
job.detail,
|
|
job.created_at,
|
|
job.updated_at
|
|
],
|
|
)
|
|
.with_context(|| format!("failed to upsert job {}", job.id))?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Retrieve a single job by its ID.
|
|
///
|
|
/// Returns `None` if no job with the given ID exists.
|
|
pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
|
|
let conn = self.conn()?;
|
|
conn.query_row(
|
|
"SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
|
|
params![id],
|
|
|row| {
|
|
let status_raw: String = row.get(2)?;
|
|
let progress: Option<i64> = row.get(3)?;
|
|
Ok(JobStateRecord {
|
|
id: row.get(0)?,
|
|
name: row.get(1)?,
|
|
status: job_state_status_from_str(&status_raw),
|
|
progress: progress.and_then(|v| u8::try_from(v).ok()),
|
|
detail: row.get(4)?,
|
|
created_at: row.get(5)?,
|
|
updated_at: row.get(6)?,
|
|
})
|
|
},
|
|
)
|
|
.optional()
|
|
.with_context(|| format!("failed to read job {id}"))
|
|
}
|
|
|
|
/// List jobs ordered by most recently updated.
|
|
///
|
|
/// The `limit` parameter caps the number of results and defaults to 100.
|
|
pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
|
|
let conn = self.conn()?;
|
|
let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
|
|
let mut stmt = conn
|
|
.prepare(
|
|
"SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
|
|
)
|
|
.context("failed to prepare job list query")?;
|
|
let mut rows = stmt
|
|
.query(params![limit])
|
|
.context("failed to query persisted jobs")?;
|
|
let mut out = Vec::new();
|
|
while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
|
|
let status_raw: String = row.get(2).context("failed to read job status")?;
|
|
let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
|
|
out.push(JobStateRecord {
|
|
id: row.get(0).context("failed to read job id")?,
|
|
name: row.get(1).context("failed to read job name")?,
|
|
status: job_state_status_from_str(&status_raw),
|
|
progress: progress.and_then(|v| u8::try_from(v).ok()),
|
|
detail: row.get(4).context("failed to read job detail")?,
|
|
created_at: row.get(5).context("failed to read job created_at")?,
|
|
updated_at: row.get(6).context("failed to read job updated_at")?,
|
|
});
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Permanently delete a job record.
|
|
pub fn delete_job(&self, id: &str) -> Result<()> {
|
|
let conn = self.conn()?;
|
|
conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
|
|
.with_context(|| format!("failed to delete job {id}"))?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Look up the rollout file path for a thread by its ID.
|
|
pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
|
|
let conn = self.conn()?;
|
|
conn.query_row(
|
|
"SELECT rollout_path FROM threads WHERE id = ?1",
|
|
params![id],
|
|
|row| row.get::<_, Option<String>>(0),
|
|
)
|
|
.optional()
|
|
.context("failed to lookup rollout path")
|
|
.map(|opt| opt.flatten().map(PathBuf::from))
|
|
}
|
|
|
|
/// Append an entry to the JSONL session index file.
|
|
///
|
|
/// The session index is an append-only log that maps thread IDs to their names,
|
|
/// update timestamps, and rollout paths. It is used for fast name-based lookups
|
|
/// without opening the SQLite database.
|
|
pub fn append_thread_name(
|
|
&self,
|
|
thread_id: &str,
|
|
thread_name: Option<String>,
|
|
updated_at: i64,
|
|
rollout_path: Option<PathBuf>,
|
|
) -> Result<()> {
|
|
if let Some(parent) = self.session_index_path.parent() {
|
|
fs::create_dir_all(parent).with_context(|| {
|
|
format!(
|
|
"failed to create session index directory {}",
|
|
parent.display()
|
|
)
|
|
})?;
|
|
}
|
|
let entry = SessionIndexEntry {
|
|
thread_id: thread_id.to_string(),
|
|
thread_name,
|
|
updated_at,
|
|
rollout_path,
|
|
};
|
|
let encoded =
|
|
serde_json::to_string(&entry).context("failed to serialize session index entry")?;
|
|
let mut file = OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(&self.session_index_path)
|
|
.with_context(|| {
|
|
format!(
|
|
"failed to open session index {}",
|
|
self.session_index_path.display()
|
|
)
|
|
})?;
|
|
writeln!(file, "{encoded}").context("failed to append session index entry")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Find the display name for a thread by its ID, using the session index.
|
|
///
|
|
/// Returns `None` if the thread is not in the index or has no name.
|
|
pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
|
|
let map = self.session_index_map()?;
|
|
Ok(map
|
|
.get(thread_id)
|
|
.and_then(|entry| entry.thread_name.clone()))
|
|
}
|
|
|
|
/// Look up display names for multiple thread IDs at once.
|
|
///
|
|
/// Returns a map from thread ID to its name (which may be `None`).
|
|
pub fn find_thread_names_by_ids(
|
|
&self,
|
|
ids: &[String],
|
|
) -> Result<HashMap<String, Option<String>>> {
|
|
let map = self.session_index_map()?;
|
|
let mut out = HashMap::new();
|
|
for id in ids {
|
|
let name = map.get(id).and_then(|entry| entry.thread_name.clone());
|
|
out.insert(id.clone(), name);
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
/// Find the rollout path for a thread by its display name (case-insensitive).
|
|
///
|
|
/// If multiple threads share the same name, the most recently updated one is returned.
|
|
/// Returns `None` if no matching thread is found.
|
|
pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
|
|
let map = self.session_index_map()?;
|
|
let matched = map
|
|
.values()
|
|
.filter(|entry| {
|
|
entry
|
|
.thread_name
|
|
.as_deref()
|
|
.is_some_and(|n| n.eq_ignore_ascii_case(name))
|
|
})
|
|
.max_by_key(|entry| entry.updated_at);
|
|
Ok(matched.and_then(|entry| entry.rollout_path.clone()))
|
|
}
|
|
|
|
fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
|
|
if !self.session_index_path.exists() {
|
|
return Ok(HashMap::new());
|
|
}
|
|
let file = OpenOptions::new()
|
|
.read(true)
|
|
.open(&self.session_index_path)
|
|
.with_context(|| {
|
|
format!(
|
|
"failed to read session index {}",
|
|
self.session_index_path.display()
|
|
)
|
|
})?;
|
|
let reader = BufReader::new(file);
|
|
let mut latest = HashMap::<String, SessionIndexEntry>::new();
|
|
for line in reader.lines() {
|
|
let line = line.context("failed to read session index line")?;
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
let parsed: SessionIndexEntry =
|
|
serde_json::from_str(&line).context("failed to parse session index entry")?;
|
|
latest.insert(parsed.thread_id.clone(), parsed);
|
|
}
|
|
Ok(latest)
|
|
}
|
|
}
|
|
|
|
fn default_state_db_path() -> PathBuf {
|
|
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
|
|
// Prefer the CodeWhale directory, falling back to legacy DeepSeek path
|
|
// so existing installs don't lose their session history.
|
|
let primary = home.join(".codewhale").join("state.db");
|
|
if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
|
|
primary
|
|
} else {
|
|
home.join(".deepseek").join("state.db")
|
|
}
|
|
}
|
|
|
|
/// Encode a flag as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
|
|
|
|
/// Decode an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
|
|
|
|
fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
|
|
match status {
|
|
ThreadStatus::Running => "running",
|
|
ThreadStatus::Idle => "idle",
|
|
ThreadStatus::Completed => "completed",
|
|
ThreadStatus::Failed => "failed",
|
|
ThreadStatus::Paused => "paused",
|
|
ThreadStatus::Archived => "archived",
|
|
}
|
|
}
|
|
|
|
fn thread_status_from_str(value: &str) -> ThreadStatus {
|
|
match value {
|
|
"running" => ThreadStatus::Running,
|
|
"idle" => ThreadStatus::Idle,
|
|
"completed" => ThreadStatus::Completed,
|
|
"failed" => ThreadStatus::Failed,
|
|
"paused" => ThreadStatus::Paused,
|
|
"archived" => ThreadStatus::Archived,
|
|
_ => ThreadStatus::Idle,
|
|
}
|
|
}
|
|
|
|
fn session_source_to_str(source: &SessionSource) -> &'static str {
|
|
match source {
|
|
SessionSource::Interactive => "interactive",
|
|
SessionSource::Resume => "resume",
|
|
SessionSource::Fork => "fork",
|
|
SessionSource::Api => "api",
|
|
SessionSource::Unknown => "unknown",
|
|
}
|
|
}
|
|
|
|
fn session_source_from_str(value: &str) -> SessionSource {
|
|
match value {
|
|
"interactive" => SessionSource::Interactive,
|
|
"resume" => SessionSource::Resume,
|
|
"fork" => SessionSource::Fork,
|
|
"api" => SessionSource::Api,
|
|
_ => SessionSource::Unknown,
|
|
}
|
|
}
|
|
|
|
/// Render an optional path as an optional `String`.
///
/// The text comes from [`Path::display`]; `None` stays `None`.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let rendered = path?.display().to_string();
    Some(rendered)
}
|
|
|
|
fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
|
|
match status {
|
|
JobStateStatus::Queued => "queued",
|
|
JobStateStatus::Running => "running",
|
|
JobStateStatus::Completed => "completed",
|
|
JobStateStatus::Failed => "failed",
|
|
JobStateStatus::Cancelled => "cancelled",
|
|
}
|
|
}
|
|
|
|
fn job_state_status_from_str(value: &str) -> JobStateStatus {
|
|
match value {
|
|
"queued" => JobStateStatus::Queued,
|
|
"running" => JobStateStatus::Running,
|
|
"completed" => JobStateStatus::Completed,
|
|
"failed" => JobStateStatus::Failed,
|
|
"cancelled" => JobStateStatus::Cancelled,
|
|
_ => JobStateStatus::Queued,
|
|
}
|
|
}
|
|
|
|
fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
|
|
match status {
|
|
ThreadGoalStatus::Active => "active",
|
|
ThreadGoalStatus::Paused => "paused",
|
|
ThreadGoalStatus::Blocked => "blocked",
|
|
ThreadGoalStatus::UsageLimited => "usage_limited",
|
|
ThreadGoalStatus::BudgetLimited => "budget_limited",
|
|
ThreadGoalStatus::Complete => "complete",
|
|
}
|
|
}
|
|
|
|
fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
|
|
match value {
|
|
"active" => ThreadGoalStatus::Active,
|
|
"paused" => ThreadGoalStatus::Paused,
|
|
"blocked" => ThreadGoalStatus::Blocked,
|
|
"usage_limited" => ThreadGoalStatus::UsageLimited,
|
|
"budget_limited" => ThreadGoalStatus::BudgetLimited,
|
|
"complete" => ThreadGoalStatus::Complete,
|
|
_ => ThreadGoalStatus::Active,
|
|
}
|
|
}
|
|
|
|
fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
|
|
let status_raw: String = row.get(7)?;
|
|
let source_raw: String = row.get(11)?;
|
|
let rollout_path: Option<String> = row.get(1)?;
|
|
let path: Option<String> = row.get(8)?;
|
|
Ok(ThreadMetadata {
|
|
id: row.get(0)?,
|
|
rollout_path: rollout_path.map(PathBuf::from),
|
|
preview: row.get(2)?,
|
|
ephemeral: i64_to_bool(row.get(3)?),
|
|
model_provider: row.get(4)?,
|
|
created_at: row.get(5)?,
|
|
updated_at: row.get(6)?,
|
|
status: thread_status_from_str(&status_raw),
|
|
path: path.map(PathBuf::from),
|
|
cwd: PathBuf::from(row.get::<_, String>(9)?),
|
|
cli_version: row.get(10)?,
|
|
source: session_source_from_str(&source_raw),
|
|
name: row.get(12)?,
|
|
sandbox_policy: row.get(13)?,
|
|
approval_mode: row.get(14)?,
|
|
archived: i64_to_bool(row.get(15)?),
|
|
archived_at: row.get(16)?,
|
|
git_sha: row.get(17)?,
|
|
git_branch: row.get(18)?,
|
|
git_origin_url: row.get(19)?,
|
|
memory_mode: row.get(20)?,
|
|
current_leaf_id: row.get(21)?,
|
|
})
|
|
}
|
|
|
|
fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
|
|
let status_raw: String = row.get(3)?;
|
|
Ok(ThreadGoalRecord {
|
|
thread_id: row.get(0)?,
|
|
goal_id: row.get(1)?,
|
|
objective: row.get(2)?,
|
|
status: thread_goal_status_from_str(&status_raw),
|
|
token_budget: row.get(4)?,
|
|
tokens_used: row.get(5)?,
|
|
time_used_seconds: row.get(6)?,
|
|
created_at: row.get(7)?,
|
|
updated_at: row.get(8)?,
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Opens a store whose database lives in a freshly created directory under
    /// the system temp dir. The directory name combines `name`, the process ID,
    /// and the current time in nanoseconds.
    fn temp_state_store(name: &str) -> StateStore {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time")
            .as_nanos();
        let pid = std::process::id();
        let root = std::env::temp_dir().join(format!("codewhale-state-{name}-{pid}-{nanos}"));
        fs::create_dir_all(&root).expect("create temp state dir");

        let db_path = root.join("state.db");
        StateStore::open(Some(db_path)).expect("open state store")
    }

    /// Minimal running, interactive thread with every optional field unset.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_string(),
            preview: "test thread".to_string(),
            model_provider: "deepseek".to_string(),
            cli_version: "0.0.0-test".to_string(),
            cwd: PathBuf::from("/tmp/codewhale"),
            status: ThreadStatus::Running,
            source: SessionSource::Interactive,
            created_at: 10,
            updated_at: 10,
            ephemeral: false,
            archived: false,
            rollout_path: None,
            path: None,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Active goal `goal-1` for `thread_id` with fixed usage counters.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_string(),
            goal_id: "goal-1".to_string(),
            objective: objective.to_string(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            created_at: 100,
            updated_at: 101,
        }
    }

    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        let store = temp_state_store("thread-goal-crud");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        // Create, then read back unchanged.
        let original = test_goal("thread-1", "Ship v0.8.59");
        store.upsert_thread_goal(&original).expect("upsert goal");
        let stored = store.get_thread_goal("thread-1").expect("read goal");
        assert_eq!(stored.as_ref(), Some(&original));

        // A second upsert for the same thread replaces the stored goal.
        let replacement = ThreadGoalRecord {
            goal_id: "goal-2".to_string(),
            status: ThreadGoalStatus::BudgetLimited,
            token_budget: None,
            updated_at: 202,
            ..test_goal("thread-1", "Ship v0.8.59 safely")
        };
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        let stored = store.get_thread_goal("thread-1").expect("read replacement");
        assert_eq!(stored, Some(replacement));

        // Deleting reports whether a goal was removed.
        let removed = store.delete_thread_goal("thread-1").expect("delete goal");
        assert!(removed);
        let after_delete = store.get_thread_goal("thread-1").expect("read empty");
        assert!(after_delete.is_none());
        let removed_again = store.delete_thread_goal("thread-1").expect("delete empty");
        assert!(!removed_again);
    }

    #[test]
    fn thread_goal_requires_existing_thread() {
        let store = temp_state_store("thread-goal-missing-thread");
        let orphan = test_goal("missing-thread", "nope");

        let err = store
            .upsert_thread_goal(&orphan)
            .expect_err("goal without a thread should fail");

        let message = err.to_string();
        assert!(message.contains("thread missing-thread not found"));
    }
}
|