Harvest PR #2808: thread undo/retry and snapshot restore endpoints

The release branch independently evolved the sessions API from #2808
(POST /v1/sessions creates from a thread, /v1/sessions/{id}/resume-thread
resumes), but the turn-rewind and snapshot-restore endpoints never
landed. Port them onto the current thread model:

- POST /v1/threads/{id}/undo — fork at the Nth-from-last user message
  and return the dropped user text for input pre-population.
- POST /v1/threads/{id}/patch-undo — restore the newest differing
  tool:/pre-turn: workspace snapshot (same target selection as the
  TUI's patch_undo), then fork the conversation; reports the file
  rollback result alongside the forked thread.
- POST /v1/threads/{id}/retry — fork and immediately start a turn
  re-using the dropped user text (or an override prompt), adapted to
  the extended StartTurnRequest (dynamic_tools, environment_id).
- POST /v1/snapshots/{id}/restore — restore a workspace snapshot by id.

fork_at_user_message and its tests were already present; this adds the
HTTP surface plus endpoint tests for undo/patch-undo/retry/restore.

Harvested from PR #2808 by @bengao168

Co-authored-by: Ben Gao <bengao168@msn.com>
Co-authored-by: Hunter Bown <hmbown@gmail.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
CodeWhale Agent
2026-06-12 14:43:42 -07:00
parent 9b31621b19
commit 71c45cf92f
+444
View File
@@ -589,6 +589,9 @@ pub fn build_router(state: RuntimeApiState) -> Router {
.route("/v1/threads/{id}", get(get_thread).patch(update_thread))
.route("/v1/threads/{id}/resume", post(resume_thread))
.route("/v1/threads/{id}/fork", post(fork_thread))
.route("/v1/threads/{id}/undo", post(undo_thread_turn))
.route("/v1/threads/{id}/patch-undo", post(patch_undo_thread_turn))
.route("/v1/threads/{id}/retry", post(retry_thread_turn))
.route("/v1/threads/{id}/turns", post(start_thread_turn))
.route(
"/v1/threads/{id}/turns/{turn_id}/steer",
@@ -628,6 +631,7 @@ pub fn build_router(state: RuntimeApiState) -> Router {
.route("/v1/automations/{id}/runs", get(list_automation_runs))
.route("/v1/usage", get(get_usage))
.route("/v1/snapshots", get(list_snapshots))
.route("/v1/snapshots/{id}/restore", post(restore_snapshot))
.route_layer(middleware::from_fn_with_state(
state.clone(),
require_runtime_token,
@@ -1653,6 +1657,238 @@ async fn fork_thread(
Ok((StatusCode::CREATED, Json(thread)))
}
#[derive(Debug, Deserialize)]
struct UndoTurnRequest {
/// How many turns back to undo (default 0 = last turn only).
#[serde(default)]
depth: Option<usize>,
}
#[derive(Debug, Serialize)]
struct UndoTurnResponse {
/// The new forked thread (with the last N turns removed).
thread: ThreadRecord,
/// The original user message text from the first dropped turn,
/// so the GUI can pre-populate the input box.
original_user_text: Option<String>,
}
async fn undo_thread_turn(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<UndoTurnRequest>,
) -> Result<(StatusCode, Json<UndoTurnResponse>), ApiError> {
let depth = req.depth.unwrap_or(0);
let (forked_thread, original_user_text) = state
.runtime_threads
.fork_at_user_message(&id, depth)
.await
.map_err(map_thread_err)?;
Ok((
StatusCode::CREATED,
Json(UndoTurnResponse {
thread: forked_thread,
original_user_text,
}),
))
}
/// Result of the snapshot-based file rollback step of patch-undo, reported
/// alongside the new forked thread.
#[derive(Debug, Serialize)]
struct PatchUndoResult {
/// Whether files were restored from a snapshot.
files_restored: bool,
/// Human-readable summary of what was restored (diff stat).
summary: Option<String>,
/// The label of the restored snapshot (e.g. "tool:apply_patch" or "pre-turn:3").
snapshot_label: Option<String>,
}
#[derive(Debug, Serialize)]
struct PatchUndoResponse {
/// Result of the snapshot-based file rollback step.
patch_result: PatchUndoResult,
/// The new forked thread (with the last turn removed).
thread: ThreadRecord,
/// The original user text from the removed turn (for re-editing).
original_user_text: Option<String>,
}
async fn patch_undo_thread_turn(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<UndoTurnRequest>,
) -> Result<(StatusCode, Json<PatchUndoResponse>), ApiError> {
let depth = req.depth.unwrap_or(0);
// Step 1: Try snapshot-based file rollback (patch_undo).
let thread = state
.runtime_threads
.get_thread(&id)
.await
.map_err(map_thread_err)?;
let patch_result = patch_undo_workspace_files(&thread.workspace);
// Step 2: Remove the last conversation turn (undo_conversation).
let (forked_thread, original_user_text) = state
.runtime_threads
.fork_at_user_message(&id, depth)
.await
.map_err(map_thread_err)?;
Ok((
StatusCode::CREATED,
Json(PatchUndoResponse {
patch_result,
thread: forked_thread,
original_user_text,
}),
))
}
/// Restore the newest `tool:` or `pre-turn:` snapshot that differs from the
/// current workspace — same target selection as the TUI's `patch_undo`.
fn patch_undo_workspace_files(workspace: &FsPath) -> PatchUndoResult {
let repo = match crate::snapshot::SnapshotRepo::open_or_init(workspace) {
Ok(repo) => repo,
Err(e) => {
return PatchUndoResult {
files_restored: false,
summary: Some(format!("Snapshot repo unavailable: {e}")),
snapshot_label: None,
};
}
};
let snapshots = match repo.list(20) {
Ok(snapshots) => snapshots,
Err(e) => {
return PatchUndoResult {
files_restored: false,
summary: Some(format!("Failed to list snapshots: {e}")),
snapshot_label: None,
};
}
};
let target = snapshots
.iter()
.filter(|s| s.label.starts_with("tool:") || s.label.starts_with("pre-turn:"))
.find(|s| matches!(repo.work_tree_matches_snapshot(&s.id), Ok(false) | Err(_)));
let Some(target) = target else {
return PatchUndoResult {
files_restored: false,
summary: Some(
"No older tool or pre-turn snapshots differ from the current workspace."
.to_string(),
),
snapshot_label: None,
};
};
if let Err(e) = repo.restore(&target.id) {
return PatchUndoResult {
files_restored: false,
summary: Some(format!("Restore failed: {e}")),
snapshot_label: None,
};
}
// Compute a diff stat for the summary.
use crate::dependencies::{ExternalTool as _, Git};
let diff_stat = Git::command().and_then(|mut git| {
git.args(["diff", "--stat"])
.current_dir(workspace)
.output()
.ok()
.and_then(|o| {
let s = String::from_utf8_lossy(&o.stdout).trim().to_string();
if s.is_empty() { None } else { Some(s) }
})
});
let short = &target.id.as_str()[..target.id.as_str().len().min(8)];
let summary = match diff_stat {
Some(ref stat) => format!(
"Restored snapshot '{}' ({}). Files affected:\n{stat}",
target.label, short
),
None => format!(
"Restored snapshot '{}' ({}). No diff changes detected.",
target.label, short
),
};
PatchUndoResult {
files_restored: true,
summary: Some(summary),
snapshot_label: Some(target.label.clone()),
}
}
#[derive(Debug, Deserialize)]
struct RetryTurnRequest {
/// How many turns back to retry (default 0 = last turn only).
#[serde(default)]
depth: Option<usize>,
/// Override the user message text. If omitted, the original text
/// from the dropped turn is re-used.
#[serde(default)]
prompt: Option<String>,
}
#[derive(Debug, Serialize)]
struct RetryTurnResponse {
/// The new forked thread (with the last N turns removed).
thread: ThreadRecord,
/// The turn created by the retry.
turn: TurnRecord,
}
async fn retry_thread_turn(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<RetryTurnRequest>,
) -> Result<(StatusCode, Json<RetryTurnResponse>), ApiError> {
let depth = req.depth.unwrap_or(0);
let (forked_thread, original_user_text) = state
.runtime_threads
.fork_at_user_message(&id, depth)
.await
.map_err(map_thread_err)?;
let retry_prompt = req.prompt.or(original_user_text).unwrap_or_default();
if retry_prompt.trim().is_empty() {
return Err(ApiError::bad_request(
"No user message to retry — the dropped turn had no user text",
));
}
let turn = state
.runtime_threads
.start_turn(
&forked_thread.id,
StartTurnRequest {
prompt: retry_prompt,
input_summary: None,
model: None,
mode: None,
allow_shell: None,
trust_mode: None,
auto_approve: None,
dynamic_tools: Vec::new(),
environment_id: None,
},
)
.await
.map_err(map_thread_err)?;
Ok((
StatusCode::CREATED,
Json(RetryTurnResponse {
thread: forked_thread,
turn,
}),
))
}
async fn start_thread_turn(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
@@ -2306,6 +2542,24 @@ async fn list_snapshots(
)?))
}
async fn restore_snapshot(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<Value>, ApiError> {
restore_snapshot_for_workspace(&state.workspace, &id)?;
Ok(Json(json!({
"restored": id,
})))
}
fn restore_snapshot_for_workspace(workspace: &FsPath, id: &str) -> Result<(), ApiError> {
let repo = crate::snapshot::SnapshotRepo::open_or_init(workspace)
.map_err(|e| ApiError::internal(format!("Snapshot repo init failed: {e}")))?;
let snapshot_id = crate::snapshot::SnapshotId(id.to_string());
repo.restore(&snapshot_id)
.map_err(|e| ApiError::internal(format!("Snapshot restore failed: {e}")))
}
fn snapshot_entries_for_workspace(
workspace: &FsPath,
query: SnapshotsQuery,
@@ -4296,6 +4550,196 @@ mod tests {
Ok(())
}
/// Create a thread over HTTP and seed it with one user/assistant turn.
/// Shared setup for the undo/patch-undo/retry endpoint tests.
async fn create_seeded_thread(
addr: &SocketAddr,
runtime_threads: &SharedRuntimeThreadManager,
root: &FsPath,
user_text: &str,
) -> Result<String> {
let client = crate::tls::reqwest_client();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({
"model": "deepseek-v4-pro",
"mode": "agent",
"workspace": root.join("workspace")
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
runtime_threads
.seed_thread_from_messages(
&thread_id,
&[
Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: user_text.to_string(),
cache_control: None,
}],
},
Message {
role: "assistant".to_string(),
content: vec![ContentBlock::Text {
text: "Done — anything else?".to_string(),
cache_control: None,
}],
},
],
)
.await?;
Ok(thread_id)
}
#[tokio::test]
async fn undo_endpoint_forks_thread_and_returns_original_user_text() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-undo-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Please undo this turn").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/undo"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let undone: serde_json::Value = resp.json().await?;
assert_eq!(undone["original_user_text"], "Please undo this turn");
let forked_id = undone["thread"]["id"]
.as_str()
.context("missing forked thread id")?;
assert_ne!(forked_id, thread_id, "undo must fork, not mutate in place");
// The forked thread has the undone turn removed.
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/{forked_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["turns"].as_array().map_or(usize::MAX, Vec::len), 0);
handle.abort();
Ok(())
}
#[tokio::test]
async fn undo_endpoint_404s_for_missing_thread() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/thr_missing/undo"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
#[tokio::test]
async fn patch_undo_endpoint_forks_and_reports_file_rollback_state() -> Result<()> {
let root =
std::env::temp_dir().join(format!("deepseek-patch-undo-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Roll back the patch").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/patch-undo"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let undone: serde_json::Value = resp.json().await?;
// The fresh workspace has no tool/pre-turn snapshots to roll back to,
// so the file-restore step reports failure while the conversation
// undo still forks the thread.
assert_eq!(undone["patch_result"]["files_restored"], false);
assert!(undone["patch_result"]["summary"].is_string());
assert_eq!(undone["original_user_text"], "Roll back the patch");
assert_ne!(undone["thread"]["id"].as_str(), Some(thread_id.as_str()));
handle.abort();
Ok(())
}
#[tokio::test]
async fn retry_endpoint_reuses_dropped_user_text_to_start_a_turn() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-retry-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Retry this request").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/retry"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let retried: serde_json::Value = resp.json().await?;
let forked_id = retried["thread"]["id"]
.as_str()
.context("missing forked thread id")?;
assert_ne!(forked_id, thread_id);
assert_eq!(retried["turn"]["thread_id"], forked_id);
handle.abort();
Ok(())
}
#[test]
fn restore_snapshot_endpoint_helper_restores_workspace_files() -> Result<()> {
let _lock = lock_test_env();
let root = tempfile::tempdir()?;
let home = root.path().join("home");
fs::create_dir_all(&home)?;
let _home = EnvVarGuard::set("HOME", &home);
let workspace = root.path().join("workspace");
fs::create_dir_all(&workspace)?;
let repo = crate::snapshot::SnapshotRepo::open_or_init(&workspace)?;
fs::write(workspace.join("a.txt"), "v1")?;
let snapshot_id = repo.snapshot("pre-turn:1")?;
fs::write(workspace.join("a.txt"), "v2")?;
restore_snapshot_for_workspace(&workspace, snapshot_id.as_str())
.expect("snapshot restore should succeed");
assert_eq!(fs::read_to_string(workspace.join("a.txt"))?, "v1");
Ok(())
}
#[tokio::test]
async fn session_create_from_thread_rejects_active_turn() -> Result<()> {
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {