From 71c45cf92f9e1991938916b0a5d781457771e016 Mon Sep 17 00:00:00 2001 From: CodeWhale Agent Date: Fri, 12 Jun 2026 14:43:42 -0700 Subject: [PATCH] Harvest PR #2808: thread undo/retry and snapshot restore endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The release branch independently evolved the sessions API from #2808 (POST /v1/sessions creates from a thread, /v1/sessions/{id}/resume-thread resumes), but the turn-rewind and snapshot-restore endpoints never landed. Port them onto the current thread model: - POST /v1/threads/{id}/undo — fork at the Nth-from-last user message and return the dropped user text for input pre-population. - POST /v1/threads/{id}/patch-undo — restore the newest differing tool:/pre-turn: workspace snapshot (same target selection as the TUI's patch_undo), then fork the conversation; reports the file rollback result alongside the forked thread. - POST /v1/threads/{id}/retry — fork and immediately start a turn re-using the dropped user text (or an override prompt), adapted to the extended StartTurnRequest (dynamic_tools, environment_id). - POST /v1/snapshots/{id}/restore — restore a workspace snapshot by id. fork_at_user_message and its tests were already present; this adds the HTTP surface plus endpoint tests for undo/patch-undo/retry/restore. Harvested from PR #2808 by @bengao168 Co-authored-by: Ben Gao Co-authored-by: Hunter Bown Co-Authored-By: Claude Fable 5 --- crates/tui/src/runtime_api.rs | 444 ++++++++++++++++++++++++++++++++++ 1 file changed, 444 insertions(+) diff --git a/crates/tui/src/runtime_api.rs b/crates/tui/src/runtime_api.rs index 54ba1040..c1f4f0d0 100644 --- a/crates/tui/src/runtime_api.rs +++ b/crates/tui/src/runtime_api.rs @@ -589,6 +589,9 @@ pub fn build_router(state: RuntimeApiState) -> Router { .route("/v1/threads/{id}", get(get_thread).patch(update_thread)) .route("/v1/threads/{id}/resume", post(resume_thread)) .route("/v1/threads/{id}/fork", post(fork_thread)) + .route("/v1/threads/{id}/undo", post(undo_thread_turn)) + .route("/v1/threads/{id}/patch-undo", post(patch_undo_thread_turn)) + .route("/v1/threads/{id}/retry", post(retry_thread_turn)) .route("/v1/threads/{id}/turns", post(start_thread_turn)) .route( "/v1/threads/{id}/turns/{turn_id}/steer", @@ -628,6 +631,7 @@ pub fn build_router(state: RuntimeApiState) -> Router { .route("/v1/automations/{id}/runs", get(list_automation_runs)) .route("/v1/usage", get(get_usage)) .route("/v1/snapshots", get(list_snapshots)) + .route("/v1/snapshots/{id}/restore", post(restore_snapshot)) .route_layer(middleware::from_fn_with_state( state.clone(), require_runtime_token, @@ -1653,6 +1657,238 @@ async fn fork_thread( Ok((StatusCode::CREATED, Json(thread))) } +#[derive(Debug, Deserialize)] +struct UndoTurnRequest { + /// How many turns back to undo (default 0 = last turn only). + #[serde(default)] + depth: Option, +} + +#[derive(Debug, Serialize)] +struct UndoTurnResponse { + /// The new forked thread (with the last N turns removed). + thread: ThreadRecord, + /// The original user message text from the first dropped turn, + /// so the GUI can pre-populate the input box. + original_user_text: Option, +} + +async fn undo_thread_turn( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json), ApiError> { + let depth = req.depth.unwrap_or(0); + let (forked_thread, original_user_text) = state + .runtime_threads + .fork_at_user_message(&id, depth) + .await + .map_err(map_thread_err)?; + Ok(( + StatusCode::CREATED, + Json(UndoTurnResponse { + thread: forked_thread, + original_user_text, + }), + )) +} + +/// Result of the snapshot-based file rollback step of patch-undo, reported +/// alongside the new forked thread. +#[derive(Debug, Serialize)] +struct PatchUndoResult { + /// Whether files were restored from a snapshot. + files_restored: bool, + /// Human-readable summary of what was restored (diff stat). + summary: Option, + /// The label of the restored snapshot (e.g. "tool:apply_patch" or "pre-turn:3"). + snapshot_label: Option, +} + +#[derive(Debug, Serialize)] +struct PatchUndoResponse { + /// Result of the snapshot-based file rollback step. + patch_result: PatchUndoResult, + /// The new forked thread (with the last turn removed). + thread: ThreadRecord, + /// The original user text from the removed turn (for re-editing). + original_user_text: Option, +} + +async fn patch_undo_thread_turn( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json), ApiError> { + let depth = req.depth.unwrap_or(0); + + // Step 1: Try snapshot-based file rollback (patch_undo). + let thread = state + .runtime_threads + .get_thread(&id) + .await + .map_err(map_thread_err)?; + let patch_result = patch_undo_workspace_files(&thread.workspace); + + // Step 2: Remove the last conversation turn (undo_conversation). + let (forked_thread, original_user_text) = state + .runtime_threads + .fork_at_user_message(&id, depth) + .await + .map_err(map_thread_err)?; + + Ok(( + StatusCode::CREATED, + Json(PatchUndoResponse { + patch_result, + thread: forked_thread, + original_user_text, + }), + )) +} + +/// Restore the newest `tool:` or `pre-turn:` snapshot that differs from the +/// current workspace — same target selection as the TUI's `patch_undo`. +fn patch_undo_workspace_files(workspace: &FsPath) -> PatchUndoResult { + let repo = match crate::snapshot::SnapshotRepo::open_or_init(workspace) { + Ok(repo) => repo, + Err(e) => { + return PatchUndoResult { + files_restored: false, + summary: Some(format!("Snapshot repo unavailable: {e}")), + snapshot_label: None, + }; + } + }; + let snapshots = match repo.list(20) { + Ok(snapshots) => snapshots, + Err(e) => { + return PatchUndoResult { + files_restored: false, + summary: Some(format!("Failed to list snapshots: {e}")), + snapshot_label: None, + }; + } + }; + let target = snapshots + .iter() + .filter(|s| s.label.starts_with("tool:") || s.label.starts_with("pre-turn:")) + .find(|s| matches!(repo.work_tree_matches_snapshot(&s.id), Ok(false) | Err(_))); + let Some(target) = target else { + return PatchUndoResult { + files_restored: false, + summary: Some( + "No older tool or pre-turn snapshots differ from the current workspace." + .to_string(), + ), + snapshot_label: None, + }; + }; + if let Err(e) = repo.restore(&target.id) { + return PatchUndoResult { + files_restored: false, + summary: Some(format!("Restore failed: {e}")), + snapshot_label: None, + }; + } + + // Compute a diff stat for the summary. + use crate::dependencies::{ExternalTool as _, Git}; + let diff_stat = Git::command().and_then(|mut git| { + git.args(["diff", "--stat"]) + .current_dir(workspace) + .output() + .ok() + .and_then(|o| { + let s = String::from_utf8_lossy(&o.stdout).trim().to_string(); + if s.is_empty() { None } else { Some(s) } + }) + }); + + let short = &target.id.as_str()[..target.id.as_str().len().min(8)]; + let summary = match diff_stat { + Some(ref stat) => format!( + "Restored snapshot '{}' ({}). Files affected:\n{stat}", + target.label, short + ), + None => format!( + "Restored snapshot '{}' ({}). No diff changes detected.", + target.label, short + ), + }; + PatchUndoResult { + files_restored: true, + summary: Some(summary), + snapshot_label: Some(target.label.clone()), + } +} + +#[derive(Debug, Deserialize)] +struct RetryTurnRequest { + /// How many turns back to retry (default 0 = last turn only). + #[serde(default)] + depth: Option, + /// Override the user message text. If omitted, the original text + /// from the dropped turn is re-used. + #[serde(default)] + prompt: Option, +} + +#[derive(Debug, Serialize)] +struct RetryTurnResponse { + /// The new forked thread (with the last N turns removed). + thread: ThreadRecord, + /// The turn created by the retry. + turn: TurnRecord, +} + +async fn retry_thread_turn( + State(state): State, + Path(id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json), ApiError> { + let depth = req.depth.unwrap_or(0); + let (forked_thread, original_user_text) = state + .runtime_threads + .fork_at_user_message(&id, depth) + .await + .map_err(map_thread_err)?; + + let retry_prompt = req.prompt.or(original_user_text).unwrap_or_default(); + if retry_prompt.trim().is_empty() { + return Err(ApiError::bad_request( + "No user message to retry — the dropped turn had no user text", + )); + } + + let turn = state + .runtime_threads + .start_turn( + &forked_thread.id, + StartTurnRequest { + prompt: retry_prompt, + input_summary: None, + model: None, + mode: None, + allow_shell: None, + trust_mode: None, + auto_approve: None, + dynamic_tools: Vec::new(), + environment_id: None, + }, + ) + .await + .map_err(map_thread_err)?; + + Ok(( + StatusCode::CREATED, + Json(RetryTurnResponse { + thread: forked_thread, + turn, + }), + )) +} + async fn start_thread_turn( State(state): State, Path(id): Path, @@ -2306,6 +2542,24 @@ async fn list_snapshots( )?)) } +async fn restore_snapshot( + State(state): State, + Path(id): Path, +) -> Result, ApiError> { + restore_snapshot_for_workspace(&state.workspace, &id)?; + Ok(Json(json!({ + "restored": id, + }))) +} + +fn restore_snapshot_for_workspace(workspace: &FsPath, id: &str) -> Result<(), ApiError> { + let repo = crate::snapshot::SnapshotRepo::open_or_init(workspace) + .map_err(|e| ApiError::internal(format!("Snapshot repo init failed: {e}")))?; + let snapshot_id = crate::snapshot::SnapshotId(id.to_string()); + repo.restore(&snapshot_id) + .map_err(|e| ApiError::internal(format!("Snapshot restore failed: {e}"))) +} + fn snapshot_entries_for_workspace( workspace: &FsPath, query: SnapshotsQuery, @@ -4296,6 +4550,196 @@ mod tests { Ok(()) } + /// Create a thread over HTTP and seed it with one user/assistant turn. + /// Shared setup for the undo/patch-undo/retry endpoint tests. + async fn create_seeded_thread( + addr: &SocketAddr, + runtime_threads: &SharedRuntimeThreadManager, + root: &FsPath, + user_text: &str, + ) -> Result { + let client = crate::tls::reqwest_client(); + let created: serde_json::Value = client + .post(format!("http://{addr}/v1/threads")) + .json(&json!({ + "model": "deepseek-v4-pro", + "mode": "agent", + "workspace": root.join("workspace") + })) + .send() + .await? + .error_for_status()? + .json() + .await?; + let thread_id = created["id"] + .as_str() + .context("missing thread id")? + .to_string(); + + runtime_threads + .seed_thread_from_messages( + &thread_id, + &[ + Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: user_text.to_string(), + cache_control: None, + }], + }, + Message { + role: "assistant".to_string(), + content: vec![ContentBlock::Text { + text: "Done — anything else?".to_string(), + cache_control: None, + }], + }, + ], + ) + .await?; + Ok(thread_id) + } + + #[tokio::test] + async fn undo_endpoint_forks_thread_and_returns_original_user_text() -> Result<()> { + let root = std::env::temp_dir().join(format!("deepseek-undo-endpoint-{}", Uuid::new_v4())); + let sessions_dir = root.join("sessions"); + let Some((addr, runtime_threads, handle)) = + spawn_test_server_with_root(root.clone(), sessions_dir).await? + else { + return Ok(()); + }; + let thread_id = + create_seeded_thread(&addr, &runtime_threads, &root, "Please undo this turn").await?; + let client = crate::tls::reqwest_client(); + + let resp = client + .post(format!("http://{addr}/v1/threads/{thread_id}/undo")) + .json(&json!({})) + .send() + .await?; + assert_eq!(resp.status(), StatusCode::CREATED); + let undone: serde_json::Value = resp.json().await?; + assert_eq!(undone["original_user_text"], "Please undo this turn"); + let forked_id = undone["thread"]["id"] + .as_str() + .context("missing forked thread id")?; + assert_ne!(forked_id, thread_id, "undo must fork, not mutate in place"); + + // The forked thread has the undone turn removed. + let detail: serde_json::Value = client + .get(format!("http://{addr}/v1/threads/{forked_id}")) + .send() + .await? + .error_for_status()? + .json() + .await?; + assert_eq!(detail["turns"].as_array().map_or(usize::MAX, Vec::len), 0); + + handle.abort(); + Ok(()) + } + + #[tokio::test] + async fn undo_endpoint_404s_for_missing_thread() -> Result<()> { + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; + let client = crate::tls::reqwest_client(); + let resp = client + .post(format!("http://{addr}/v1/threads/thr_missing/undo")) + .json(&json!({})) + .send() + .await?; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + handle.abort(); + Ok(()) + } + + #[tokio::test] + async fn patch_undo_endpoint_forks_and_reports_file_rollback_state() -> Result<()> { + let root = + std::env::temp_dir().join(format!("deepseek-patch-undo-endpoint-{}", Uuid::new_v4())); + let sessions_dir = root.join("sessions"); + let Some((addr, runtime_threads, handle)) = + spawn_test_server_with_root(root.clone(), sessions_dir).await? + else { + return Ok(()); + }; + let thread_id = + create_seeded_thread(&addr, &runtime_threads, &root, "Roll back the patch").await?; + let client = crate::tls::reqwest_client(); + + let resp = client + .post(format!("http://{addr}/v1/threads/{thread_id}/patch-undo")) + .json(&json!({})) + .send() + .await?; + assert_eq!(resp.status(), StatusCode::CREATED); + let undone: serde_json::Value = resp.json().await?; + // The fresh workspace has no tool/pre-turn snapshots to roll back to, + // so the file-restore step reports failure while the conversation + // undo still forks the thread. + assert_eq!(undone["patch_result"]["files_restored"], false); + assert!(undone["patch_result"]["summary"].is_string()); + assert_eq!(undone["original_user_text"], "Roll back the patch"); + assert_ne!(undone["thread"]["id"].as_str(), Some(thread_id.as_str())); + + handle.abort(); + Ok(()) + } + + #[tokio::test] + async fn retry_endpoint_reuses_dropped_user_text_to_start_a_turn() -> Result<()> { + let root = std::env::temp_dir().join(format!("deepseek-retry-endpoint-{}", Uuid::new_v4())); + let sessions_dir = root.join("sessions"); + let Some((addr, runtime_threads, handle)) = + spawn_test_server_with_root(root.clone(), sessions_dir).await? + else { + return Ok(()); + }; + let thread_id = + create_seeded_thread(&addr, &runtime_threads, &root, "Retry this request").await?; + let client = crate::tls::reqwest_client(); + + let resp = client + .post(format!("http://{addr}/v1/threads/{thread_id}/retry")) + .json(&json!({})) + .send() + .await?; + assert_eq!(resp.status(), StatusCode::CREATED); + let retried: serde_json::Value = resp.json().await?; + let forked_id = retried["thread"]["id"] + .as_str() + .context("missing forked thread id")?; + assert_ne!(forked_id, thread_id); + assert_eq!(retried["turn"]["thread_id"], forked_id); + + handle.abort(); + Ok(()) + } + + #[test] + fn restore_snapshot_endpoint_helper_restores_workspace_files() -> Result<()> { + let _lock = lock_test_env(); + let root = tempfile::tempdir()?; + let home = root.path().join("home"); + fs::create_dir_all(&home)?; + let _home = EnvVarGuard::set("HOME", &home); + + let workspace = root.path().join("workspace"); + fs::create_dir_all(&workspace)?; + let repo = crate::snapshot::SnapshotRepo::open_or_init(&workspace)?; + fs::write(workspace.join("a.txt"), "v1")?; + let snapshot_id = repo.snapshot("pre-turn:1")?; + fs::write(workspace.join("a.txt"), "v2")?; + + restore_snapshot_for_workspace(&workspace, snapshot_id.as_str()) + .expect("snapshot restore should succeed"); + assert_eq!(fs::read_to_string(workspace.join("a.txt"))?, "v1"); + Ok(()) + } + #[tokio::test] async fn session_create_from_thread_rejects_active_turn() -> Result<()> { let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {