Merge remote-tracking branch 'origin/pr/1535' into work/v0.8.34
This commit is contained in:
@@ -2025,12 +2025,14 @@ mod tool_setup;
|
||||
mod turn_loop;
|
||||
|
||||
use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision};
|
||||
#[cfg(test)]
|
||||
use self::dispatch::should_parallelize_tool_batch;
|
||||
use self::dispatch::{
|
||||
ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, ToolExecutionPlan,
|
||||
caller_allowed_for_tool, caller_type_for_tool_use, final_tool_input, format_tool_error,
|
||||
mcp_tool_approval_description, mcp_tool_is_parallel_safe, mcp_tool_is_read_only,
|
||||
parse_parallel_tool_calls, parse_tool_input, should_force_update_plan_first,
|
||||
should_parallelize_tool_batch, should_stop_after_plan_tool,
|
||||
ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome,
|
||||
ToolExecutionBatch, ToolExecutionPlan, caller_allowed_for_tool, caller_type_for_tool_use,
|
||||
final_tool_input, format_tool_error, mcp_tool_approval_description, mcp_tool_is_parallel_safe,
|
||||
mcp_tool_is_read_only, parse_parallel_tool_calls, parse_tool_input,
|
||||
plan_tool_execution_batches, should_force_update_plan_first, should_stop_after_plan_tool,
|
||||
};
|
||||
use self::loop_guard::{AttemptDecision, LoopGuard, OutcomeDecision};
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -51,6 +51,11 @@ pub(super) struct ToolExecutionPlan {
|
||||
pub(super) guard_result: Option<ToolResult>,
|
||||
}
|
||||
|
||||
/// One scheduling unit produced by `plan_tool_execution_batches`.
///
/// A batch is either a run of consecutive plans that all passed
/// `tool_plan_is_parallel_safe`, or a single plan that did not.
pub(super) enum ToolExecutionBatch {
|
||||
/// Consecutive parallel-safe plans, kept in their original order.
/// May hold a single plan when only one parallel-safe plan sits
/// between two serial ones.
Parallel(Vec<ToolExecutionPlan>),
|
||||
/// A single plan that is not parallel-safe; it forms a batch of its own.
Serial(Box<ToolExecutionPlan>),
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub(super) struct ParallelToolResultEntry {
|
||||
pub(super) tool_name: String,
|
||||
@@ -265,11 +270,40 @@ pub(super) fn parse_parallel_tool_calls(
|
||||
|
||||
// === Dispatch policy ==================================================
|
||||
|
||||
/// Returns `true` when `plans` is non-empty and every plan in it is
/// read-only, supports parallel execution, needs no approval and is not
/// interactive.
///
/// Compiled only for tests (`#[cfg(test)]`).
// NOTE(review): this span comes from a rendered diff and shows two body
// expressions back to back — the inline closure form and the form that
// delegates to `tool_plan_is_parallel_safe`. Both express the same
// condition; confirm which one the checked-in file actually contains.
#[cfg(test)]
|
||||
pub(super) fn should_parallelize_tool_batch(plans: &[ToolExecutionPlan]) -> bool {
|
||||
!plans.is_empty()
|
||||
&& plans.iter().all(|plan| {
|
||||
plan.read_only && plan.supports_parallel && !plan.approval_required && !plan.interactive
|
||||
})
|
||||
!plans.is_empty() && plans.iter().all(tool_plan_is_parallel_safe)
|
||||
}
|
||||
|
||||
/// Per-plan predicate used to decide whether a tool call may join a
/// parallel batch.
///
/// Returns `true` only when all four conditions hold: the plan is
/// `read_only`, it `supports_parallel`, it does not have
/// `approval_required` set, and it is not `interactive`.
pub(super) fn tool_plan_is_parallel_safe(plan: &ToolExecutionPlan) -> bool {
|
||||
plan.read_only && plan.supports_parallel && !plan.approval_required && !plan.interactive
|
||||
}
|
||||
|
||||
/// Splits `plans` into an ordered list of execution batches.
///
/// Plans are visited in input order. Consecutive plans that satisfy
/// `tool_plan_is_parallel_safe` are collected into one
/// `ToolExecutionBatch::Parallel`. Every other plan first closes the
/// pending parallel run (if any) and is then emitted on its own as a
/// `ToolExecutionBatch::Serial`, so it acts as a barrier between the
/// parallel runs before and after it.
///
/// The relative order of plans is preserved across the returned batches.
/// An empty input yields an empty `Vec`.
pub(super) fn plan_tool_execution_batches(
|
||||
plans: Vec<ToolExecutionPlan>,
|
||||
) -> Vec<ToolExecutionBatch> {
|
||||
let mut batches = Vec::new();
|
||||
// Parallel-safe plans seen since the last serial plan (or the start).
let mut parallel_chunk = Vec::new();
|
||||
|
||||
for plan in plans {
|
||||
if tool_plan_is_parallel_safe(&plan) {
|
||||
parallel_chunk.push(plan);
|
||||
continue;
|
||||
}
|
||||
|
||||
// A non-parallel-safe plan: flush the pending run first so it is
// scheduled ahead of this plan. `mem::take` moves the chunk out and
// leaves an empty Vec behind for the next run.
if !parallel_chunk.is_empty() {
|
||||
batches.push(ToolExecutionBatch::Parallel(std::mem::take(
|
||||
&mut parallel_chunk,
|
||||
)));
|
||||
}
|
||||
batches.push(ToolExecutionBatch::Serial(Box::new(plan)));
|
||||
}
|
||||
|
||||
// Flush a trailing run of parallel-safe plans, if the input ended on one.
if !parallel_chunk.is_empty() {
|
||||
batches.push(ToolExecutionBatch::Parallel(parallel_chunk));
|
||||
}
|
||||
|
||||
batches
|
||||
}
|
||||
|
||||
pub(super) fn should_stop_after_plan_tool(
|
||||
|
||||
@@ -119,10 +119,26 @@ fn make_plan(
|
||||
supports_parallel: bool,
|
||||
approval_required: bool,
|
||||
interactive: bool,
|
||||
) -> ToolExecutionPlan {
|
||||
make_plan_at(
|
||||
0,
|
||||
read_only,
|
||||
supports_parallel,
|
||||
approval_required,
|
||||
interactive,
|
||||
)
|
||||
}
|
||||
|
||||
fn make_plan_at(
|
||||
index: usize,
|
||||
read_only: bool,
|
||||
supports_parallel: bool,
|
||||
approval_required: bool,
|
||||
interactive: bool,
|
||||
) -> ToolExecutionPlan {
|
||||
ToolExecutionPlan {
|
||||
index: 0,
|
||||
id: "tool-1".to_string(),
|
||||
index,
|
||||
id: format!("tool-{index}"),
|
||||
name: "grep_files".to_string(),
|
||||
input: json!({"pattern": "test"}),
|
||||
caller: None,
|
||||
@@ -208,6 +224,57 @@ fn parallel_batch_requires_read_only_parallel_tools() {
|
||||
assert!(!should_parallelize_tool_batch(&plans));
|
||||
}
|
||||
|
||||
// Checks that `plan_tool_execution_batches` keeps input order and treats
// every non-parallel-safe plan as a barrier between parallel runs.
//
// `make_plan_at` arguments are (index, read_only, supports_parallel,
// approval_required, interactive). Indices 2 and 4 are the only plans that
// fail the parallel-safe predicate, so the expected partition is:
//   Parallel[0, 1], Serial(2), Parallel[3], Serial(4), Parallel[5, 6].
#[test]
|
||||
fn tool_execution_batches_use_serial_barriers() {
|
||||
let batches = plan_tool_execution_batches(vec![
|
||||
make_plan_at(0, true, true, false, false),
|
||||
make_plan_at(1, true, true, false, false),
|
||||
// Not read-only, no parallel support, approval required -> serial.
make_plan_at(2, false, false, true, false),
|
||||
make_plan_at(3, true, true, false, false),
|
||||
// Read-only but without parallel support -> serial.
make_plan_at(4, true, false, false, false),
|
||||
make_plan_at(5, true, true, false, false),
|
||||
make_plan_at(6, true, true, false, false),
|
||||
]);
|
||||
|
||||
assert_eq!(batches.len(), 5);
|
||||
|
||||
match &batches[0] {
|
||||
ToolExecutionBatch::Parallel(plans) => {
|
||||
assert_eq!(
|
||||
plans.iter().map(|plan| plan.index).collect::<Vec<_>>(),
|
||||
vec![0, 1]
|
||||
);
|
||||
}
|
||||
ToolExecutionBatch::Serial(_) => panic!("first batch should be parallel"),
|
||||
}
|
||||
match &batches[1] {
|
||||
ToolExecutionBatch::Serial(plan) => assert_eq!(plan.index, 2),
|
||||
ToolExecutionBatch::Parallel(_) => panic!("second batch should be serial"),
|
||||
}
|
||||
// A lone parallel-safe plan between two serial plans still comes back
// as a `Parallel` batch, with one element.
match &batches[2] {
|
||||
ToolExecutionBatch::Parallel(plans) => {
|
||||
assert_eq!(
|
||||
plans.iter().map(|plan| plan.index).collect::<Vec<_>>(),
|
||||
vec![3]
|
||||
);
|
||||
}
|
||||
ToolExecutionBatch::Serial(_) => panic!("third batch should be parallel"),
|
||||
}
|
||||
match &batches[3] {
|
||||
ToolExecutionBatch::Serial(plan) => assert_eq!(plan.index, 4),
|
||||
ToolExecutionBatch::Parallel(_) => panic!("fourth batch should be serial"),
|
||||
}
|
||||
// The trailing parallel-safe run is flushed after the loop ends.
match &batches[4] {
|
||||
ToolExecutionBatch::Parallel(plans) => {
|
||||
assert_eq!(
|
||||
plans.iter().map(|plan| plan.index).collect::<Vec<_>>(),
|
||||
vec![5, 6]
|
||||
);
|
||||
}
|
||||
ToolExecutionBatch::Serial(_) => panic!("fifth batch should be parallel"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn successful_update_plan_ends_plan_mode_turn_immediately() {
|
||||
assert!(should_stop_after_plan_tool(
|
||||
|
||||
@@ -1194,16 +1194,25 @@ impl Engine {
|
||||
}
|
||||
active_tool_names.extend(deferred_tools_hydrated_this_batch);
|
||||
|
||||
let parallel_allowed = should_parallelize_tool_batch(&plans);
|
||||
if parallel_allowed && plans.len() > 1 {
|
||||
let plan_count = plans.len();
|
||||
let batches = plan_tool_execution_batches(plans);
|
||||
let parallel_chunks = batches
|
||||
.iter()
|
||||
.filter_map(|batch| match batch {
|
||||
ToolExecutionBatch::Parallel(plans) if plans.len() > 1 => Some(plans.len()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if !parallel_chunks.is_empty() {
|
||||
let parallel_tool_count: usize = parallel_chunks.iter().sum();
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(format!(
|
||||
"Executing {} read-only tools in parallel",
|
||||
plans.len()
|
||||
"Executing {parallel_tool_count} read-only tools in {} parallel chunk(s)",
|
||||
parallel_chunks.len()
|
||||
)))
|
||||
.await;
|
||||
} else if plans.len() > 1 {
|
||||
} else if plan_count > 1 {
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(
|
||||
@@ -1212,167 +1221,438 @@ impl Engine {
|
||||
.await;
|
||||
}
|
||||
|
||||
let mut outcomes: Vec<Option<ToolExecOutcome>> = Vec::with_capacity(plans.len());
|
||||
outcomes.resize_with(plans.len(), || None);
|
||||
let mut outcomes: Vec<Option<ToolExecOutcome>> = Vec::with_capacity(plan_count);
|
||||
outcomes.resize_with(plan_count, || None);
|
||||
|
||||
if parallel_allowed {
|
||||
let mut tool_tasks = FuturesUnordered::new();
|
||||
for plan in plans {
|
||||
if let Some(result) = plan.guard_result.clone() {
|
||||
let result = Ok(result);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: plan.id.clone(),
|
||||
name: plan.name.clone(),
|
||||
result: result.clone(),
|
||||
for batch in batches {
|
||||
let (parallel_allowed, plans) = match batch {
|
||||
ToolExecutionBatch::Parallel(plans) => (true, plans),
|
||||
ToolExecutionBatch::Serial(plan) => (false, vec![*plan]),
|
||||
};
|
||||
|
||||
if parallel_allowed {
|
||||
let mut tool_tasks = FuturesUnordered::new();
|
||||
for plan in plans {
|
||||
if let Some(result) = plan.guard_result.clone() {
|
||||
let result = Ok(result);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: plan.id.clone(),
|
||||
name: plan.name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if let Some(err) = plan.blocked_error.clone() {
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at: Instant::now(),
|
||||
result: Err(err),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
let registry = tool_registry;
|
||||
let lock = tool_exec_lock.clone();
|
||||
let mcp_pool = mcp_pool.clone();
|
||||
let tx_event = self.tx_event.clone();
|
||||
let session_id = self.session.id.clone();
|
||||
let started_at = Instant::now();
|
||||
|
||||
tool_tasks.push(async move {
|
||||
let mut result = Engine::execute_tool_with_lock(
|
||||
lock,
|
||||
plan.supports_parallel,
|
||||
plan.interactive,
|
||||
tx_event.clone(),
|
||||
plan.name.clone(),
|
||||
plan.input.clone(),
|
||||
registry,
|
||||
mcp_pool,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// #500: spill outsized output before fanout (mirror
|
||||
// of the sequential path below). Emit a
|
||||
// `tool.spillover` audit event so operators can
|
||||
// correlate large-output episodes with disk usage.
|
||||
if let Ok(tool_result) = result.as_mut()
|
||||
&& let Some(path) =
|
||||
crate::tools::truncate::apply_spillover_with_artifact(
|
||||
tool_result,
|
||||
&plan.id,
|
||||
&plan.name,
|
||||
&session_id,
|
||||
)
|
||||
{
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.spillover",
|
||||
"tool_id": plan.id.clone(),
|
||||
"tool_name": plan.name.clone(),
|
||||
"path": path.display().to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
let _ = tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: plan.id.clone(),
|
||||
name: plan.name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at,
|
||||
result,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(outcome) = tool_tasks.next().await {
|
||||
let index = outcome.index;
|
||||
outcomes[index] = Some(outcome);
|
||||
}
|
||||
} else {
|
||||
for plan in plans {
|
||||
let tool_id = plan.id.clone();
|
||||
let tool_name = plan.name.clone();
|
||||
let tool_input = plan.input.clone();
|
||||
let tool_caller = plan.caller.clone();
|
||||
|
||||
if let Some(result) = plan.guard_result.clone() {
|
||||
let result = Ok(result);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(err) = plan.blocked_error.clone() {
|
||||
let result = Err(err);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == MULTI_TOOL_PARALLEL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result = self
|
||||
.execute_parallel_tool(
|
||||
tool_input.clone(),
|
||||
tool_registry,
|
||||
tool_exec_lock.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == CODE_EXECUTION_TOOL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result =
|
||||
execute_code_execution_tool(&tool_input, &self.session.workspace)
|
||||
.await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == JS_EXECUTION_TOOL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result =
|
||||
execute_js_execution_tool(&tool_input, &self.session.workspace)
|
||||
.await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if is_tool_search_tool(&tool_name) {
|
||||
let started_at = Instant::now();
|
||||
let result = execute_tool_search(
|
||||
&tool_name,
|
||||
&tool_input,
|
||||
&tool_catalog,
|
||||
&mut active_tool_names,
|
||||
);
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == REQUEST_USER_INPUT_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result = match UserInputRequest::from_value(&tool_input) {
|
||||
Ok(request) => self
|
||||
.await_user_input(&tool_id, request)
|
||||
.await
|
||||
.and_then(|response| {
|
||||
ToolResult::json(&response)
|
||||
.map_err(|e| ToolError::execution_failed(e.to_string()))
|
||||
}),
|
||||
Err(err) => Err(err),
|
||||
};
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle approval flow: returns (result_override, context_override)
|
||||
let (result_override, context_override): (
|
||||
Option<Result<ToolResult, ToolError>>,
|
||||
Option<crate::tools::ToolContext>,
|
||||
) = if plan.approval_required {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_required",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
}));
|
||||
let approval_key = crate::tools::approval_cache::build_approval_key(
|
||||
&tool_name,
|
||||
&tool_input,
|
||||
)
|
||||
.0;
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ApprovalRequired {
|
||||
id: tool_id.clone(),
|
||||
tool_name: tool_name.clone(),
|
||||
description: plan.approval_description.clone(),
|
||||
approval_key,
|
||||
})
|
||||
.await;
|
||||
|
||||
match self.await_tool_approval(&tool_id).await {
|
||||
Ok(ApprovalResult::Approved) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "approved",
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
(None, None)
|
||||
}
|
||||
Ok(ApprovalResult::Denied) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "denied",
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
(
|
||||
Some(Err(ToolError::permission_denied(format!(
|
||||
"Tool '{tool_name}' denied by user"
|
||||
)))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
Ok(ApprovalResult::RetryWithPolicy(policy)) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "retry_with_policy",
|
||||
"policy": format!("{policy:?}"),
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
let elevated_context = tool_registry.map(|r| {
|
||||
r.context().clone().with_elevated_sandbox_policy(policy)
|
||||
});
|
||||
(None, elevated_context)
|
||||
}
|
||||
Err(err) => (Some(Err(err)), None),
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
// Per-tool snapshot for surgical undo (#384): capture workspace
|
||||
// state before file-modifying tools execute so `/undo` can
|
||||
// revert the most recent write_file/edit_file/apply_patch.
|
||||
if result_override.is_none()
|
||||
&& matches!(
|
||||
tool_name.as_str(),
|
||||
"write_file" | "edit_file" | "apply_patch"
|
||||
)
|
||||
{
|
||||
let ws = self.session.workspace.clone();
|
||||
let tid = tool_id.clone();
|
||||
let cap = self.config.snapshots_max_workspace_bytes;
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
crate::core::turn::pre_tool_snapshot(&ws, &tid, cap)
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if let Some(err) = plan.blocked_error.clone() {
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at: Instant::now(),
|
||||
result: Err(err),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
let registry = tool_registry;
|
||||
let lock = tool_exec_lock.clone();
|
||||
let mcp_pool = mcp_pool.clone();
|
||||
let tx_event = self.tx_event.clone();
|
||||
let session_id = self.session.id.clone();
|
||||
let started_at = Instant::now();
|
||||
}
|
||||
|
||||
tool_tasks.push(async move {
|
||||
let mut result = Engine::execute_tool_with_lock(
|
||||
lock,
|
||||
plan.supports_parallel,
|
||||
plan.interactive,
|
||||
tx_event.clone(),
|
||||
plan.name.clone(),
|
||||
plan.input.clone(),
|
||||
registry,
|
||||
mcp_pool,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let started_at = Instant::now();
|
||||
let mut result = if let Some(result_override) = result_override {
|
||||
result_override
|
||||
} else {
|
||||
Self::execute_tool_with_lock(
|
||||
tool_exec_lock.clone(),
|
||||
plan.supports_parallel,
|
||||
plan.interactive,
|
||||
self.tx_event.clone(),
|
||||
tool_name.clone(),
|
||||
tool_input.clone(),
|
||||
tool_registry,
|
||||
mcp_pool.clone(),
|
||||
context_override,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
// #500: spill outsized output before fanout (mirror
|
||||
// of the sequential path below). Emit a
|
||||
// `tool.spillover` audit event so operators can
|
||||
// correlate large-output episodes with disk usage.
|
||||
// #500: spill outsized tool outputs to disk before the
|
||||
// result fans out to the model context and the UI cell.
|
||||
// Both consumers see the same artifact reference block +
|
||||
// metadata pointing at the session-owned full file.
|
||||
// Emit a discrete `tool.spillover` audit event so
|
||||
// operators can correlate large-output episodes with
|
||||
// disk-usage growth in `~/.deepseek/tool_outputs/`.
|
||||
if let Ok(tool_result) = result.as_mut()
|
||||
&& let Some(path) =
|
||||
crate::tools::truncate::apply_spillover_with_artifact(
|
||||
tool_result,
|
||||
&plan.id,
|
||||
&plan.name,
|
||||
&session_id,
|
||||
&tool_id,
|
||||
&tool_name,
|
||||
&self.session.id,
|
||||
)
|
||||
{
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.spillover",
|
||||
"tool_id": plan.id.clone(),
|
||||
"tool_name": plan.name.clone(),
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"path": path.display().to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
let _ = tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: plan.id.clone(),
|
||||
name: plan.name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: plan.id,
|
||||
name: plan.name,
|
||||
input: plan.input,
|
||||
started_at,
|
||||
result,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(outcome) = tool_tasks.next().await {
|
||||
let index = outcome.index;
|
||||
outcomes[index] = Some(outcome);
|
||||
}
|
||||
} else {
|
||||
for plan in plans {
|
||||
let tool_id = plan.id.clone();
|
||||
let tool_name = plan.name.clone();
|
||||
let tool_input = plan.input.clone();
|
||||
let tool_caller = plan.caller.clone();
|
||||
|
||||
if let Some(result) = plan.guard_result.clone() {
|
||||
let result = Ok(result);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(err) = plan.blocked_error.clone() {
|
||||
let result = Err(err);
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at: Instant::now(),
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == MULTI_TOOL_PARALLEL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result = self
|
||||
.execute_parallel_tool(
|
||||
tool_input.clone(),
|
||||
tool_registry,
|
||||
tool_exec_lock.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
@@ -1390,267 +1670,7 @@ impl Engine {
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == CODE_EXECUTION_TOOL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result =
|
||||
execute_code_execution_tool(&tool_input, &self.session.workspace).await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == JS_EXECUTION_TOOL_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result =
|
||||
execute_js_execution_tool(&tool_input, &self.session.workspace).await;
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if is_tool_search_tool(&tool_name) {
|
||||
let started_at = Instant::now();
|
||||
let result = execute_tool_search(
|
||||
&tool_name,
|
||||
&tool_input,
|
||||
&tool_catalog,
|
||||
&mut active_tool_names,
|
||||
);
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if tool_name == REQUEST_USER_INPUT_NAME {
|
||||
let started_at = Instant::now();
|
||||
let result = match UserInputRequest::from_value(&tool_input) {
|
||||
Ok(request) => self.await_user_input(&tool_id, request).await.and_then(
|
||||
|response| {
|
||||
ToolResult::json(&response)
|
||||
.map_err(|e| ToolError::execution_failed(e.to_string()))
|
||||
},
|
||||
),
|
||||
Err(err) => Err(err),
|
||||
};
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle approval flow: returns (result_override, context_override)
|
||||
let (result_override, context_override): (
|
||||
Option<Result<ToolResult, ToolError>>,
|
||||
Option<crate::tools::ToolContext>,
|
||||
) = if plan.approval_required {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_required",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
}));
|
||||
let approval_key = crate::tools::approval_cache::build_approval_key(
|
||||
&tool_name,
|
||||
&tool_input,
|
||||
)
|
||||
.0;
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ApprovalRequired {
|
||||
id: tool_id.clone(),
|
||||
tool_name: tool_name.clone(),
|
||||
description: plan.approval_description.clone(),
|
||||
approval_key,
|
||||
})
|
||||
.await;
|
||||
|
||||
match self.await_tool_approval(&tool_id).await {
|
||||
Ok(ApprovalResult::Approved) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "approved",
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
(None, None)
|
||||
}
|
||||
Ok(ApprovalResult::Denied) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "denied",
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
(
|
||||
Some(Err(ToolError::permission_denied(format!(
|
||||
"Tool '{tool_name}' denied by user"
|
||||
)))),
|
||||
None,
|
||||
)
|
||||
}
|
||||
Ok(ApprovalResult::RetryWithPolicy(policy)) => {
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.approval_decision",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"decision": "retry_with_policy",
|
||||
"policy": format!("{policy:?}"),
|
||||
"caller": caller_type_for_tool_use(tool_caller.as_ref()),
|
||||
}));
|
||||
let elevated_context = tool_registry.map(|r| {
|
||||
r.context().clone().with_elevated_sandbox_policy(policy)
|
||||
});
|
||||
(None, elevated_context)
|
||||
}
|
||||
Err(err) => (Some(Err(err)), None),
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
// Per-tool snapshot for surgical undo (#384): capture workspace
|
||||
// state before file-modifying tools execute so `/undo` can
|
||||
// revert the most recent write_file/edit_file/apply_patch.
|
||||
if result_override.is_none()
|
||||
&& matches!(
|
||||
tool_name.as_str(),
|
||||
"write_file" | "edit_file" | "apply_patch"
|
||||
)
|
||||
{
|
||||
let ws = self.session.workspace.clone();
|
||||
let tid = tool_id.clone();
|
||||
let cap = self.config.snapshots_max_workspace_bytes;
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
crate::core::turn::pre_tool_snapshot(&ws, &tid, cap)
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
let started_at = Instant::now();
|
||||
let mut result = if let Some(result_override) = result_override {
|
||||
result_override
|
||||
} else {
|
||||
Self::execute_tool_with_lock(
|
||||
tool_exec_lock.clone(),
|
||||
plan.supports_parallel,
|
||||
plan.interactive,
|
||||
self.tx_event.clone(),
|
||||
tool_name.clone(),
|
||||
tool_input.clone(),
|
||||
tool_registry,
|
||||
mcp_pool.clone(),
|
||||
context_override,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
// #500: spill outsized tool outputs to disk before the
|
||||
// result fans out to the model context and the UI cell.
|
||||
// Both consumers see the same artifact reference block +
|
||||
// metadata pointing at the session-owned full file.
|
||||
// Emit a discrete `tool.spillover` audit event so
|
||||
// operators can correlate large-output episodes with
|
||||
// disk-usage growth in `~/.deepseek/tool_outputs/`.
|
||||
if let Ok(tool_result) = result.as_mut()
|
||||
&& let Some(path) = crate::tools::truncate::apply_spillover_with_artifact(
|
||||
tool_result,
|
||||
&tool_id,
|
||||
&tool_name,
|
||||
&self.session.id,
|
||||
)
|
||||
{
|
||||
emit_tool_audit(json!({
|
||||
"event": "tool.spillover",
|
||||
"tool_id": tool_id.clone(),
|
||||
"tool_name": tool_name.clone(),
|
||||
"path": path.display().to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::ToolCallComplete {
|
||||
id: tool_id.clone(),
|
||||
name: tool_name.clone(),
|
||||
result: result.clone(),
|
||||
})
|
||||
.await;
|
||||
|
||||
outcomes[plan.index] = Some(ToolExecOutcome {
|
||||
index: plan.index,
|
||||
id: tool_id,
|
||||
name: tool_name,
|
||||
input: tool_input,
|
||||
started_at,
|
||||
result,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user