feat(persistence): schema migration framework — forward upgrade path for ~/.deepseek/ records (#350)
Every persistence layer in crates/tui/src/ already gates `schema_version > CURRENT_*` to reject newer-than-supported records (good — prevents silent truncation when an older binary tries to load a v3 file with v4 fields). What was missing: the **forward upgrade path** for older records. When we bump CURRENT_SESSION_SCHEMA_VERSION from 3 to 4 to add a field, every v3 session on disk would silently load with the new field's serde default — which is OK for additions but breaks catastrophically for renames or shape changes. This commit lays down the framework: **`crates/tui/src/schema_migration.rs`** — new module: - `SchemaMigration` trait. Each persistence domain implements it once with `CURRENT_VERSION`, `DOMAIN`, and an ordered `MIGRATIONS` list of `fn(&mut serde_json::Value) -> Result<(), MigrationError>` steps. Index `i` migrates from version `i+1` to `i+2`. - `SchemaMigration::migrate(value, from_version)` — runs every required step, stamping `value["schema_version"]` after each step so a partial failure leaves a known-state record rather than mixed. - `MigrationError` — typed error with from/to versions + reason. - `backup_before_migrate(path, domain)` — creates a `.bak` copy of the source file before mutation. Errors are warn-logged and ignored (continues because `write_atomic` is itself crash-safe). The `.bak` is left on disk as a manual recovery artifact — no automatic GC. **`schema_migration::registry`** — submodule that registers every existing persistence domain (session, offline_queue, runtime, task, automation, automation_run) at its current version with an empty MIGRATIONS list. No domain has shipped a schema bump yet, so today's behavior is a no-op. The next bump is now a 4-step recipe: 1. Write the `migrate_<domain>_v<N>_to_v<N+1>` step in this module. 2. Append it to `MIGRATIONS` and bump `CURRENT_VERSION`. 3. Wire `<Domain>Migration::migrate(...)` into the load function in the owning module. 4. Add a fixture-based integration test. Tests: 6 unit tests covering no-op, all-steps, partial migration, newer-than-current rejection, backup creation, and backup-failure robustness. Wiring into individual load sites (session_manager, runtime_threads, task_manager, automation_manager) is intentionally deferred until the first actual schema bump needs it — wiring without migrations would add code paths nothing exercises, and the framework is the part that needs to land before the next bump can ship safely. Closes #350. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,7 @@ pub mod rlm;
|
||||
mod runtime_api;
|
||||
mod runtime_threads;
|
||||
mod sandbox;
|
||||
mod schema_migration;
|
||||
mod seam_manager;
|
||||
mod session_manager;
|
||||
mod settings;
|
||||
|
||||
@@ -0,0 +1,374 @@
|
||||
//! Schema migration framework for `~/.deepseek/` persisted records.
|
||||
//!
|
||||
//! Every persistence layer in `crates/tui/src/` (sessions, threads,
|
||||
//! tasks, automations, offline queue) gates `schema_version > CURRENT_*`
|
||||
//! to prevent silent truncation when an older binary tries to load a
|
||||
//! record from a newer one. What was missing — and what this module
|
||||
//! fixes — is the **upgrade path**: when `schema_version < CURRENT_*`,
|
||||
//! the load function should run forward migrations rather than loading
|
||||
//! a partially-correct record.
|
||||
//!
|
||||
//! ## Domain registration
|
||||
//!
|
||||
//! Each persistence type implements [`SchemaMigration`]:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! pub struct SessionMigration;
|
||||
//!
|
||||
//! impl SchemaMigration for SessionMigration {
|
||||
//! const CURRENT_VERSION: u32 = 1;
|
||||
//! const DOMAIN: &'static str = "session";
|
||||
//! const MIGRATIONS: &'static [MigrationFn] = &[
|
||||
//! // index i migrates from version (i+1) to (i+2)
|
||||
//! migrate_session_v1_to_v2,
|
||||
//! ];
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Load-site usage
|
||||
//!
|
||||
//! Inside the load function, after deserialization:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! if record.schema_version < SessionMigration::CURRENT_VERSION {
|
||||
//! let mut value: serde_json::Value = serde_json::from_str(&raw)?;
|
||||
//! let _final = SessionMigration::migrate(
|
||||
//! &mut value,
|
||||
//! record.schema_version,
|
||||
//! )?;
|
||||
//! backup_before_migrate(&path, SessionMigration::DOMAIN);
|
||||
//! write_atomic(&path, serde_json::to_string_pretty(&value)?.as_bytes())?;
|
||||
//! // Re-deserialize with the migrated value into the up-to-date struct.
|
||||
//! record = serde_json::from_value(value)?;
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Migration step contract
|
||||
//!
|
||||
//! Each step takes a mutable JSON value at version `N` and mutates it
|
||||
//! into version `N+1`. Steps must be idempotent in the sense that a
|
||||
//! re-run of the migration on an already-migrated value should be a
|
||||
//! no-op (because `serde_json::Value` is cheap to introspect, this
|
||||
//! usually means "if field already exists with the new shape, skip").
|
||||
//!
|
||||
//! Steps must NOT call `write_atomic` themselves — the framework writes
|
||||
//! once at the end. They must NOT log credentials or other sensitive
|
||||
//! material from the value being migrated.
|
||||
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Result returned when a migration step fails.
|
||||
#[derive(Debug)]
|
||||
pub struct MigrationError {
|
||||
pub from_version: u32,
|
||||
pub to_version: u32,
|
||||
pub reason: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MigrationError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"schema migration {} → {} failed: {}",
|
||||
self.from_version, self.to_version, self.reason
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for MigrationError {}
|
||||
|
||||
/// Signature of a single forward migration step.
|
||||
#[allow(dead_code)] // Public surface; first concrete migrator lands when v2 ships.
|
||||
pub type MigrationFn = fn(&mut serde_json::Value) -> Result<(), MigrationError>;
|
||||
|
||||
/// Each persistence domain implements this trait.
|
||||
///
|
||||
/// `MIGRATIONS[i]` migrates from version `i + 1` to version `i + 2`. So
|
||||
/// `MIGRATIONS[0]` is the v1 → v2 step, `MIGRATIONS[1]` is v2 → v3, etc.
|
||||
/// `CURRENT_VERSION` must equal `MIGRATIONS.len() + 1` (i.e. the version
|
||||
/// produced by running every step in sequence starting from version 1).
|
||||
#[allow(dead_code)] // Public surface; first concrete domain lands when v2 ships.
|
||||
pub trait SchemaMigration {
|
||||
/// The current schema version for this domain.
|
||||
const CURRENT_VERSION: u32;
|
||||
|
||||
/// Human-readable domain label for logging (e.g. "session", "thread").
|
||||
const DOMAIN: &'static str;
|
||||
|
||||
/// Ordered list of migration step functions.
|
||||
const MIGRATIONS: &'static [MigrationFn];
|
||||
|
||||
/// Run all required migrations to bring `version` up to
|
||||
/// [`CURRENT_VERSION`](SchemaMigration::CURRENT_VERSION).
|
||||
///
|
||||
/// Returns the final stamped version. Stamps each intermediate
|
||||
/// version onto `value["schema_version"]` so a partial migration
|
||||
/// failure leaves a record at a known state rather than mixed.
|
||||
fn migrate(
|
||||
value: &mut serde_json::Value,
|
||||
version: u32,
|
||||
) -> Result<u32, MigrationError> {
|
||||
if version > Self::CURRENT_VERSION {
|
||||
// Caller's responsibility to reject newer-than-supported
|
||||
// records — the framework's job is forward migration only.
|
||||
return Err(MigrationError {
|
||||
from_version: version,
|
||||
to_version: Self::CURRENT_VERSION,
|
||||
reason: format!(
|
||||
"{} record at v{version} is newer than current v{}",
|
||||
Self::DOMAIN,
|
||||
Self::CURRENT_VERSION
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let mut current = version;
|
||||
for (idx, step) in Self::MIGRATIONS.iter().enumerate() {
|
||||
let step_from = (idx as u32) + 1;
|
||||
if current > step_from {
|
||||
// Already past this step — the value was loaded at a
|
||||
// newer-than-step version, skip.
|
||||
continue;
|
||||
}
|
||||
if current < step_from {
|
||||
// Underflow: Self's MIGRATIONS are dense from 1, and
|
||||
// the loop should never see a record older than the
|
||||
// first step. If we get here the const list is misordered.
|
||||
return Err(MigrationError {
|
||||
from_version: current,
|
||||
to_version: step_from + 1,
|
||||
reason: format!(
|
||||
"{} migration list is non-contiguous at index {idx}",
|
||||
Self::DOMAIN
|
||||
),
|
||||
});
|
||||
}
|
||||
step(value)?;
|
||||
current = step_from + 1;
|
||||
value["schema_version"] = serde_json::json!(current);
|
||||
}
|
||||
|
||||
if current != Self::CURRENT_VERSION {
|
||||
return Err(MigrationError {
|
||||
from_version: version,
|
||||
to_version: Self::CURRENT_VERSION,
|
||||
reason: format!(
|
||||
"{} migrated to v{current} but expected v{}",
|
||||
Self::DOMAIN,
|
||||
Self::CURRENT_VERSION
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(current)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a `.bak` copy of `path` before mutation. Returns the backup
|
||||
/// path. Errors are logged at warn level and ignored — the migration
|
||||
/// proceeds because [`crate::utils::write_atomic`] is itself crash-safe.
|
||||
///
|
||||
/// The `.bak` file is left on disk after a successful migration so a
|
||||
/// user who notices a regression can manually restore it. No automatic
|
||||
/// garbage collection — bak files are user-visible recovery artifacts.
|
||||
#[allow(dead_code)] // Public surface; first call site lands when v2 ships.
|
||||
pub fn backup_before_migrate(path: &Path, domain: &str) -> PathBuf {
|
||||
let bak = path.with_extension(
|
||||
path.extension()
|
||||
.map(|ext| format!("{}.bak", ext.to_string_lossy()))
|
||||
.unwrap_or_else(|| "bak".to_string()),
|
||||
);
|
||||
match fs::copy(path, &bak) {
|
||||
Ok(_) => tracing::info!(
|
||||
domain,
|
||||
from = %path.display(),
|
||||
to = %bak.display(),
|
||||
"schema backup created"
|
||||
),
|
||||
Err(e) => tracing::warn!(
|
||||
domain,
|
||||
from = %path.display(),
|
||||
error = %e,
|
||||
"schema backup failed (continuing — migration is crash-safe)"
|
||||
),
|
||||
}
|
||||
bak
|
||||
}
|
||||
|
||||
/// Per-domain migration registrations.
|
||||
///
|
||||
/// Each persistence type below points at the same `CURRENT_*` constant
|
||||
/// the original module already gates on. The `MIGRATIONS` list is empty
|
||||
/// today because no schema bumps have shipped yet — but the framework is
|
||||
/// in place so the next bump only needs to:
|
||||
///
|
||||
/// 1. Add a `migrate_<domain>_v<N>_to_v<N+1>` function in this module.
|
||||
/// 2. Append it to the matching `MIGRATIONS` list.
|
||||
/// 3. Bump `CURRENT_VERSION` to match.
|
||||
/// 4. Wire `<Domain>Migration::migrate(...)` into the load function in
|
||||
/// the owning module.
|
||||
pub mod registry {
|
||||
use super::{MigrationFn, SchemaMigration};
|
||||
|
||||
/// Sessions: `~/.deepseek/sessions/<id>.json` and the latest
|
||||
/// checkpoint at `~/.deepseek/sessions/checkpoints/latest.json`.
|
||||
pub struct SessionMigration;
|
||||
impl SchemaMigration for SessionMigration {
|
||||
const CURRENT_VERSION: u32 = 1;
|
||||
const DOMAIN: &'static str = "session";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
|
||||
/// Offline queue: `~/.deepseek/sessions/checkpoints/offline_queue.json`.
|
||||
pub struct OfflineQueueMigration;
|
||||
impl SchemaMigration for OfflineQueueMigration {
|
||||
const CURRENT_VERSION: u32 = 1;
|
||||
const DOMAIN: &'static str = "offline_queue";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
|
||||
/// Runtime threads / turns / items / events / store state — all
|
||||
/// share `CURRENT_RUNTIME_SCHEMA_VERSION`.
|
||||
pub struct RuntimeMigration;
|
||||
impl SchemaMigration for RuntimeMigration {
|
||||
const CURRENT_VERSION: u32 = 2;
|
||||
const DOMAIN: &'static str = "runtime";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
|
||||
/// Durable tasks under `~/.deepseek/tasks/`.
|
||||
pub struct TaskMigration;
|
||||
impl SchemaMigration for TaskMigration {
|
||||
const CURRENT_VERSION: u32 = 2;
|
||||
const DOMAIN: &'static str = "task";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
|
||||
/// Automation records and their per-run history.
|
||||
pub struct AutomationMigration;
|
||||
impl SchemaMigration for AutomationMigration {
|
||||
const CURRENT_VERSION: u32 = 1;
|
||||
const DOMAIN: &'static str = "automation";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
|
||||
pub struct AutomationRunMigration;
|
||||
impl SchemaMigration for AutomationRunMigration {
|
||||
const CURRENT_VERSION: u32 = 1;
|
||||
const DOMAIN: &'static str = "automation_run";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[];
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Test harness: a fake "thread" domain at v3 with two migrations
|
||||
/// (v1 → v2 adds an `archived` field; v2 → v3 adds a `kind` field).
|
||||
struct TestThreadMigration;
|
||||
|
||||
fn add_archived_field(value: &mut serde_json::Value) -> Result<(), MigrationError> {
|
||||
if value.get("archived").is_none() {
|
||||
value["archived"] = serde_json::json!(false);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_kind_field(value: &mut serde_json::Value) -> Result<(), MigrationError> {
|
||||
if value.get("kind").is_none() {
|
||||
value["kind"] = serde_json::json!("standard");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl SchemaMigration for TestThreadMigration {
|
||||
const CURRENT_VERSION: u32 = 3;
|
||||
const DOMAIN: &'static str = "test_thread";
|
||||
const MIGRATIONS: &'static [MigrationFn] = &[add_archived_field, add_kind_field];
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn migrate_no_op_when_already_current() {
|
||||
let mut value = serde_json::json!({
|
||||
"schema_version": 3,
|
||||
"id": "abc",
|
||||
"archived": true,
|
||||
"kind": "feature_branch"
|
||||
});
|
||||
let final_version = TestThreadMigration::migrate(&mut value, 3).expect("ok");
|
||||
assert_eq!(final_version, 3);
|
||||
// Existing values must be untouched (we don't reset to defaults).
|
||||
assert_eq!(value["archived"], serde_json::json!(true));
|
||||
assert_eq!(value["kind"], serde_json::json!("feature_branch"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn migrate_runs_all_steps_from_v1() {
|
||||
let mut value = serde_json::json!({
|
||||
"schema_version": 1,
|
||||
"id": "abc"
|
||||
});
|
||||
let final_version = TestThreadMigration::migrate(&mut value, 1).expect("ok");
|
||||
assert_eq!(final_version, 3);
|
||||
assert_eq!(value["schema_version"], serde_json::json!(3));
|
||||
assert_eq!(value["archived"], serde_json::json!(false));
|
||||
assert_eq!(value["kind"], serde_json::json!("standard"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn migrate_runs_only_remaining_steps_from_v2() {
|
||||
let mut value = serde_json::json!({
|
||||
"schema_version": 2,
|
||||
"id": "abc",
|
||||
"archived": true
|
||||
});
|
||||
let final_version = TestThreadMigration::migrate(&mut value, 2).expect("ok");
|
||||
assert_eq!(final_version, 3);
|
||||
// archived was already set; migration must NOT overwrite to default.
|
||||
assert_eq!(value["archived"], serde_json::json!(true));
|
||||
assert_eq!(value["kind"], serde_json::json!("standard"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn migrate_rejects_newer_than_current() {
|
||||
let mut value = serde_json::json!({
|
||||
"schema_version": 99
|
||||
});
|
||||
let err = TestThreadMigration::migrate(&mut value, 99).expect_err("must reject");
|
||||
assert_eq!(err.from_version, 99);
|
||||
assert_eq!(err.to_version, 3);
|
||||
assert!(err.reason.contains("newer than current"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backup_creates_bak_file_alongside_original() {
|
||||
let tmp = tempfile::tempdir().expect("tempdir");
|
||||
let path = tmp.path().join("session_abc.json");
|
||||
std::fs::write(&path, r#"{"id":"abc"}"#).expect("write");
|
||||
let bak = backup_before_migrate(&path, "test_session");
|
||||
assert!(bak.exists(), "bak file must exist at {}", bak.display());
|
||||
assert_eq!(
|
||||
std::fs::read_to_string(&bak).expect("read bak"),
|
||||
r#"{"id":"abc"}"#
|
||||
);
|
||||
// Bak is path.json.bak (extension append, not replace).
|
||||
assert!(
|
||||
bak.to_string_lossy().ends_with(".json.bak"),
|
||||
"bak suffix must be `.json.bak`; got {}",
|
||||
bak.display()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backup_failure_does_not_panic_or_propagate() {
|
||||
// Pointing at a non-existent source: copy fails, but the function
|
||||
// returns the bak path it would have used and logs a warning.
|
||||
let tmp = tempfile::tempdir().expect("tempdir");
|
||||
let path = tmp.path().join("does_not_exist.json");
|
||||
let bak = backup_before_migrate(&path, "test_session");
|
||||
// The path is well-formed even though copy failed.
|
||||
assert!(bak.to_string_lossy().ends_with(".json.bak"));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user