From 7bd610fcc0d4d4f138d29c97b909bed22f9fa61a Mon Sep 17 00:00:00 2001 From: Duducoco <1411420236@qq.com> Date: Tue, 12 May 2026 22:56:35 +0800 Subject: [PATCH] feat(engine): chunk parallel-safe tool execution - Split tool plans into parallel chunks separated by serial barriers - Preserve sequential handling for approvals and interactive tools - Add coverage for mixed execution batches --- crates/tui/src/core/engine.rs | 12 +- crates/tui/src/core/engine/dispatch.rs | 42 +- crates/tui/src/core/engine/tests.rs | 71 +- crates/tui/src/core/engine/turn_loop.rs | 838 ++++++++++++------------ 4 files changed, 543 insertions(+), 420 deletions(-) diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index aadef244..e65d0bcb 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -2061,12 +2061,14 @@ mod tool_setup; mod turn_loop; use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision}; +#[cfg(test)] +use self::dispatch::should_parallelize_tool_batch; use self::dispatch::{ - ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, ToolExecutionPlan, - caller_allowed_for_tool, caller_type_for_tool_use, final_tool_input, format_tool_error, - mcp_tool_approval_description, mcp_tool_is_parallel_safe, mcp_tool_is_read_only, - parse_parallel_tool_calls, parse_tool_input, should_force_update_plan_first, - should_parallelize_tool_batch, should_stop_after_plan_tool, + ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, + ToolExecutionBatch, ToolExecutionPlan, caller_allowed_for_tool, caller_type_for_tool_use, + final_tool_input, format_tool_error, mcp_tool_approval_description, mcp_tool_is_parallel_safe, + mcp_tool_is_read_only, parse_parallel_tool_calls, parse_tool_input, + plan_tool_execution_batches, should_force_update_plan_first, should_stop_after_plan_tool, }; use self::loop_guard::{AttemptDecision, LoopGuard, OutcomeDecision}; #[cfg(test)] diff --git a/crates/tui/src/core/engine/dispatch.rs b/crates/tui/src/core/engine/dispatch.rs index fb2e7950..335639c4 100644 --- a/crates/tui/src/core/engine/dispatch.rs +++ b/crates/tui/src/core/engine/dispatch.rs @@ -51,6 +51,11 @@ pub(super) struct ToolExecutionPlan { pub(super) guard_result: Option, } +pub(super) enum ToolExecutionBatch { + Parallel(Vec), + Serial(Box), +} + #[derive(Debug, serde::Serialize)] pub(super) struct ParallelToolResultEntry { pub(super) tool_name: String, @@ -265,11 +270,40 @@ pub(super) fn parse_parallel_tool_calls( // === Dispatch policy ================================================== +#[cfg(test)] pub(super) fn should_parallelize_tool_batch(plans: &[ToolExecutionPlan]) -> bool { - !plans.is_empty() - && plans.iter().all(|plan| { - plan.read_only && plan.supports_parallel && !plan.approval_required && !plan.interactive - }) + !plans.is_empty() && plans.iter().all(tool_plan_is_parallel_safe) +} + +pub(super) fn tool_plan_is_parallel_safe(plan: &ToolExecutionPlan) -> bool { + plan.read_only && plan.supports_parallel && !plan.approval_required && !plan.interactive +} + +pub(super) fn plan_tool_execution_batches( + plans: Vec, +) -> Vec { + let mut batches = Vec::new(); + let mut parallel_chunk = Vec::new(); + + for plan in plans { + if tool_plan_is_parallel_safe(&plan) { + parallel_chunk.push(plan); + continue; + } + + if !parallel_chunk.is_empty() { + batches.push(ToolExecutionBatch::Parallel(std::mem::take( + &mut parallel_chunk, + ))); + } + batches.push(ToolExecutionBatch::Serial(Box::new(plan))); + } + + if !parallel_chunk.is_empty() { + batches.push(ToolExecutionBatch::Parallel(parallel_chunk)); + } + + batches } pub(super) fn should_stop_after_plan_tool( diff --git a/crates/tui/src/core/engine/tests.rs b/crates/tui/src/core/engine/tests.rs index d247103f..c898c6ce 100644 --- a/crates/tui/src/core/engine/tests.rs +++ b/crates/tui/src/core/engine/tests.rs @@ -119,10 +119,26 @@ fn make_plan( supports_parallel: bool, approval_required: bool, interactive: bool, +) -> ToolExecutionPlan { + make_plan_at( + 0, + read_only, + supports_parallel, + approval_required, + interactive, + ) +} + +fn make_plan_at( + index: usize, + read_only: bool, + supports_parallel: bool, + approval_required: bool, + interactive: bool, ) -> ToolExecutionPlan { ToolExecutionPlan { - index: 0, - id: "tool-1".to_string(), + index, + id: format!("tool-{index}"), name: "grep_files".to_string(), input: json!({"pattern": "test"}), caller: None, @@ -208,6 +224,57 @@ fn parallel_batch_requires_read_only_parallel_tools() { assert!(!should_parallelize_tool_batch(&plans)); } +#[test] +fn tool_execution_batches_use_serial_barriers() { + let batches = plan_tool_execution_batches(vec![ + make_plan_at(0, true, true, false, false), + make_plan_at(1, true, true, false, false), + make_plan_at(2, false, false, true, false), + make_plan_at(3, true, true, false, false), + make_plan_at(4, true, false, false, false), + make_plan_at(5, true, true, false, false), + make_plan_at(6, true, true, false, false), + ]); + + assert_eq!(batches.len(), 5); + + match &batches[0] { + ToolExecutionBatch::Parallel(plans) => { + assert_eq!( + plans.iter().map(|plan| plan.index).collect::>(), + vec![0, 1] + ); + } + ToolExecutionBatch::Serial(_) => panic!("first batch should be parallel"), + } + match &batches[1] { + ToolExecutionBatch::Serial(plan) => assert_eq!(plan.index, 2), + ToolExecutionBatch::Parallel(_) => panic!("second batch should be serial"), + } + match &batches[2] { + ToolExecutionBatch::Parallel(plans) => { + assert_eq!( + plans.iter().map(|plan| plan.index).collect::>(), + vec![3] + ); + } + ToolExecutionBatch::Serial(_) => panic!("third batch should be parallel"), + } + match &batches[3] { + ToolExecutionBatch::Serial(plan) => assert_eq!(plan.index, 4), + ToolExecutionBatch::Parallel(_) => panic!("fourth batch should be serial"), + } + match &batches[4] { + ToolExecutionBatch::Parallel(plans) => { + assert_eq!( + plans.iter().map(|plan| plan.index).collect::>(), + vec![5, 6] + ); + } + ToolExecutionBatch::Serial(_) => panic!("fifth batch should be parallel"), + } +} + #[test] fn successful_update_plan_ends_plan_mode_turn_immediately() { assert!(should_stop_after_plan_tool( diff --git a/crates/tui/src/core/engine/turn_loop.rs b/crates/tui/src/core/engine/turn_loop.rs index 63e63b45..5cce31c0 100644 --- a/crates/tui/src/core/engine/turn_loop.rs +++ b/crates/tui/src/core/engine/turn_loop.rs @@ -1194,16 +1194,25 @@ impl Engine { } active_tool_names.extend(deferred_tools_hydrated_this_batch); - let parallel_allowed = should_parallelize_tool_batch(&plans); - if parallel_allowed && plans.len() > 1 { + let plan_count = plans.len(); + let batches = plan_tool_execution_batches(plans); + let parallel_chunks = batches + .iter() + .filter_map(|batch| match batch { + ToolExecutionBatch::Parallel(plans) if plans.len() > 1 => Some(plans.len()), + _ => None, + }) + .collect::>(); + if !parallel_chunks.is_empty() { + let parallel_tool_count: usize = parallel_chunks.iter().sum(); let _ = self .tx_event .send(Event::status(format!( - "Executing {} read-only tools in parallel", - plans.len() + "Executing {parallel_tool_count} read-only tools in {} parallel chunk(s)", + parallel_chunks.len() ))) .await; - } else if plans.len() > 1 { + } else if plan_count > 1 { let _ = self .tx_event .send(Event::status( @@ -1212,167 +1221,438 @@ impl Engine { .await; } - let mut outcomes: Vec> = Vec::with_capacity(plans.len()); - outcomes.resize_with(plans.len(), || None); + let mut outcomes: Vec> = Vec::with_capacity(plan_count); + outcomes.resize_with(plan_count, || None); - if parallel_allowed { - let mut tool_tasks = FuturesUnordered::new(); - for plan in plans { - if let Some(result) = plan.guard_result.clone() { - let result = Ok(result); - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: plan.id.clone(), - name: plan.name.clone(), - result: result.clone(), + for batch in batches { + let (parallel_allowed, plans) = match batch { + ToolExecutionBatch::Parallel(plans) => (true, plans), + ToolExecutionBatch::Serial(plan) => (false, vec![*plan]), + }; + + if parallel_allowed { + let mut tool_tasks = FuturesUnordered::new(); + for plan in plans { + if let Some(result) = plan.guard_result.clone() { + let result = Ok(result); + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: plan.id.clone(), + name: plan.name.clone(), + result: result.clone(), + }) + .await; + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: plan.id, + name: plan.name, + input: plan.input, + started_at: Instant::now(), + result, + }); + continue; + } + if let Some(err) = plan.blocked_error.clone() { + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: plan.id, + name: plan.name, + input: plan.input, + started_at: Instant::now(), + result: Err(err), + }); + continue; + } + let registry = tool_registry; + let lock = tool_exec_lock.clone(); + let mcp_pool = mcp_pool.clone(); + let tx_event = self.tx_event.clone(); + let session_id = self.session.id.clone(); + let started_at = Instant::now(); + + tool_tasks.push(async move { + let mut result = Engine::execute_tool_with_lock( + lock, + plan.supports_parallel, + plan.interactive, + tx_event.clone(), + plan.name.clone(), + plan.input.clone(), + registry, + mcp_pool, + None, + ) + .await; + + // #500: spill outsized output before fanout (mirror + // of the sequential path below). Emit a + // `tool.spillover` audit event so operators can + // correlate large-output episodes with disk usage. + if let Ok(tool_result) = result.as_mut() + && let Some(path) = + crate::tools::truncate::apply_spillover_with_artifact( + tool_result, + &plan.id, + &plan.name, + &session_id, + ) + { + emit_tool_audit(json!({ + "event": "tool.spillover", + "tool_id": plan.id.clone(), + "tool_name": plan.name.clone(), + "path": path.display().to_string(), + })); + } + + let _ = tx_event + .send(Event::ToolCallComplete { + id: plan.id.clone(), + name: plan.name.clone(), + result: result.clone(), + }) + .await; + + ToolExecOutcome { + index: plan.index, + id: plan.id, + name: plan.name, + input: plan.input, + started_at, + result, + } + }); + } + + while let Some(outcome) = tool_tasks.next().await { + let index = outcome.index; + outcomes[index] = Some(outcome); + } + } else { + for plan in plans { + let tool_id = plan.id.clone(); + let tool_name = plan.name.clone(); + let tool_input = plan.input.clone(); + let tool_caller = plan.caller.clone(); + + if let Some(result) = plan.guard_result.clone() { + let result = Ok(result); + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at: Instant::now(), + result, + }); + continue; + } + + if let Some(err) = plan.blocked_error.clone() { + let result = Err(err); + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at: Instant::now(), + result, + }); + continue; + } + + if tool_name == MULTI_TOOL_PARALLEL_NAME { + let started_at = Instant::now(); + let result = self + .execute_parallel_tool( + tool_input.clone(), + tool_registry, + tool_exec_lock.clone(), + ) + .await; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if tool_name == CODE_EXECUTION_TOOL_NAME { + let started_at = Instant::now(); + let result = + execute_code_execution_tool(&tool_input, &self.session.workspace) + .await; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if tool_name == JS_EXECUTION_TOOL_NAME { + let started_at = Instant::now(); + let result = + execute_js_execution_tool(&tool_input, &self.session.workspace) + .await; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if is_tool_search_tool(&tool_name) { + let started_at = Instant::now(); + let result = execute_tool_search( + &tool_name, + &tool_input, + &tool_catalog, + &mut active_tool_names, + ); + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if tool_name == REQUEST_USER_INPUT_NAME { + let started_at = Instant::now(); + let result = match UserInputRequest::from_value(&tool_input) { + Ok(request) => self + .await_user_input(&tool_id, request) + .await + .and_then(|response| { + ToolResult::json(&response) + .map_err(|e| ToolError::execution_failed(e.to_string())) + }), + Err(err) => Err(err), + }; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + // Handle approval flow: returns (result_override, context_override) + let (result_override, context_override): ( + Option>, + Option, + ) = if plan.approval_required { + emit_tool_audit(json!({ + "event": "tool.approval_required", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + })); + let approval_key = crate::tools::approval_cache::build_approval_key( + &tool_name, + &tool_input, + ) + .0; + let _ = self + .tx_event + .send(Event::ApprovalRequired { + id: tool_id.clone(), + tool_name: tool_name.clone(), + description: plan.approval_description.clone(), + approval_key, + }) + .await; + + match self.await_tool_approval(&tool_id).await { + Ok(ApprovalResult::Approved) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "approved", + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + (None, None) + } + Ok(ApprovalResult::Denied) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "denied", + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + ( + Some(Err(ToolError::permission_denied(format!( + "Tool '{tool_name}' denied by user" + )))), + None, + ) + } + Ok(ApprovalResult::RetryWithPolicy(policy)) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "retry_with_policy", + "policy": format!("{policy:?}"), + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + let elevated_context = tool_registry.map(|r| { + r.context().clone().with_elevated_sandbox_policy(policy) + }); + (None, elevated_context) + } + Err(err) => (Some(Err(err)), None), + } + } else { + (None, None) + }; + + // Per-tool snapshot for surgical undo (#384): capture workspace + // state before file-modifying tools execute so `/undo` can + // revert the most recent write_file/edit_file/apply_patch. + if result_override.is_none() + && matches!( + tool_name.as_str(), + "write_file" | "edit_file" | "apply_patch" + ) + { + let ws = self.session.workspace.clone(); + let tid = tool_id.clone(); + let cap = self.config.snapshots_max_workspace_bytes; + let _ = tokio::task::spawn_blocking(move || { + crate::core::turn::pre_tool_snapshot(&ws, &tid, cap) }) .await; - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: plan.id, - name: plan.name, - input: plan.input, - started_at: Instant::now(), - result, - }); - continue; - } - if let Some(err) = plan.blocked_error.clone() { - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: plan.id, - name: plan.name, - input: plan.input, - started_at: Instant::now(), - result: Err(err), - }); - continue; - } - let registry = tool_registry; - let lock = tool_exec_lock.clone(); - let mcp_pool = mcp_pool.clone(); - let tx_event = self.tx_event.clone(); - let session_id = self.session.id.clone(); - let started_at = Instant::now(); + } - tool_tasks.push(async move { - let mut result = Engine::execute_tool_with_lock( - lock, - plan.supports_parallel, - plan.interactive, - tx_event.clone(), - plan.name.clone(), - plan.input.clone(), - registry, - mcp_pool, - None, - ) - .await; + let started_at = Instant::now(); + let mut result = if let Some(result_override) = result_override { + result_override + } else { + Self::execute_tool_with_lock( + tool_exec_lock.clone(), + plan.supports_parallel, + plan.interactive, + self.tx_event.clone(), + tool_name.clone(), + tool_input.clone(), + tool_registry, + mcp_pool.clone(), + context_override, + ) + .await + }; - // #500: spill outsized output before fanout (mirror - // of the sequential path below). Emit a - // `tool.spillover` audit event so operators can - // correlate large-output episodes with disk usage. + // #500: spill outsized tool outputs to disk before the + // result fans out to the model context and the UI cell. + // Both consumers see the same artifact reference block + + // metadata pointing at the session-owned full file. + // Emit a discrete `tool.spillover` audit event so + // operators can correlate large-output episodes with + // disk-usage growth in `~/.deepseek/tool_outputs/`. if let Ok(tool_result) = result.as_mut() && let Some(path) = crate::tools::truncate::apply_spillover_with_artifact( tool_result, - &plan.id, - &plan.name, - &session_id, + &tool_id, + &tool_name, + &self.session.id, ) { emit_tool_audit(json!({ "event": "tool.spillover", - "tool_id": plan.id.clone(), - "tool_name": plan.name.clone(), + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), "path": path.display().to_string(), })); } - let _ = tx_event - .send(Event::ToolCallComplete { - id: plan.id.clone(), - name: plan.name.clone(), - result: result.clone(), - }) - .await; - - ToolExecOutcome { - index: plan.index, - id: plan.id, - name: plan.name, - input: plan.input, - started_at, - result, - } - }); - } - - while let Some(outcome) = tool_tasks.next().await { - let index = outcome.index; - outcomes[index] = Some(outcome); - } - } else { - for plan in plans { - let tool_id = plan.id.clone(); - let tool_name = plan.name.clone(); - let tool_input = plan.input.clone(); - let tool_caller = plan.caller.clone(); - - if let Some(result) = plan.guard_result.clone() { - let result = Ok(result); - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at: Instant::now(), - result, - }); - continue; - } - - if let Some(err) = plan.blocked_error.clone() { - let result = Err(err); - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at: Instant::now(), - result, - }); - continue; - } - - if tool_name == MULTI_TOOL_PARALLEL_NAME { - let started_at = Instant::now(); - let result = self - .execute_parallel_tool( - tool_input.clone(), - tool_registry, - tool_exec_lock.clone(), - ) - .await; - let _ = self .tx_event .send(Event::ToolCallComplete { @@ -1390,267 +1670,7 @@ impl Engine { started_at, result, }); - continue; } - - if tool_name == CODE_EXECUTION_TOOL_NAME { - let started_at = Instant::now(); - let result = - execute_code_execution_tool(&tool_input, &self.session.workspace).await; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if tool_name == JS_EXECUTION_TOOL_NAME { - let started_at = Instant::now(); - let result = - execute_js_execution_tool(&tool_input, &self.session.workspace).await; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if is_tool_search_tool(&tool_name) { - let started_at = Instant::now(); - let result = execute_tool_search( - &tool_name, - &tool_input, - &tool_catalog, - &mut active_tool_names, - ); - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if tool_name == REQUEST_USER_INPUT_NAME { - let started_at = Instant::now(); - let result = match UserInputRequest::from_value(&tool_input) { - Ok(request) => self.await_user_input(&tool_id, request).await.and_then( - |response| { - ToolResult::json(&response) - .map_err(|e| ToolError::execution_failed(e.to_string())) - }, - ), - Err(err) => Err(err), - }; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - // Handle approval flow: returns (result_override, context_override) - let (result_override, context_override): ( - Option>, - Option, - ) = if plan.approval_required { - emit_tool_audit(json!({ - "event": "tool.approval_required", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - })); - let approval_key = crate::tools::approval_cache::build_approval_key( - &tool_name, - &tool_input, - ) - .0; - let _ = self - .tx_event - .send(Event::ApprovalRequired { - id: tool_id.clone(), - tool_name: tool_name.clone(), - description: plan.approval_description.clone(), - approval_key, - }) - .await; - - match self.await_tool_approval(&tool_id).await { - Ok(ApprovalResult::Approved) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "approved", - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - (None, None) - } - Ok(ApprovalResult::Denied) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "denied", - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - ( - Some(Err(ToolError::permission_denied(format!( - "Tool '{tool_name}' denied by user" - )))), - None, - ) - } - Ok(ApprovalResult::RetryWithPolicy(policy)) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "retry_with_policy", - "policy": format!("{policy:?}"), - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - let elevated_context = tool_registry.map(|r| { - r.context().clone().with_elevated_sandbox_policy(policy) - }); - (None, elevated_context) - } - Err(err) => (Some(Err(err)), None), - } - } else { - (None, None) - }; - - // Per-tool snapshot for surgical undo (#384): capture workspace - // state before file-modifying tools execute so `/undo` can - // revert the most recent write_file/edit_file/apply_patch. - if result_override.is_none() - && matches!( - tool_name.as_str(), - "write_file" | "edit_file" | "apply_patch" - ) - { - let ws = self.session.workspace.clone(); - let tid = tool_id.clone(); - let cap = self.config.snapshots_max_workspace_bytes; - let _ = tokio::task::spawn_blocking(move || { - crate::core::turn::pre_tool_snapshot(&ws, &tid, cap) - }) - .await; - } - - let started_at = Instant::now(); - let mut result = if let Some(result_override) = result_override { - result_override - } else { - Self::execute_tool_with_lock( - tool_exec_lock.clone(), - plan.supports_parallel, - plan.interactive, - self.tx_event.clone(), - tool_name.clone(), - tool_input.clone(), - tool_registry, - mcp_pool.clone(), - context_override, - ) - .await - }; - - // #500: spill outsized tool outputs to disk before the - // result fans out to the model context and the UI cell. - // Both consumers see the same artifact reference block + - // metadata pointing at the session-owned full file. - // Emit a discrete `tool.spillover` audit event so - // operators can correlate large-output episodes with - // disk-usage growth in `~/.deepseek/tool_outputs/`. - if let Ok(tool_result) = result.as_mut() - && let Some(path) = crate::tools::truncate::apply_spillover_with_artifact( - tool_result, - &tool_id, - &tool_name, - &self.session.id, - ) - { - emit_tool_audit(json!({ - "event": "tool.spillover", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "path": path.display().to_string(), - })); - } - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); } }