diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index 6e2e5911..d69a2b2a 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -32,9 +32,11 @@ use crate::error_taxonomy::{ErrorCategory, ErrorEnvelope, StreamError}; use crate::features::{Feature, Features}; use crate::llm_client::LlmClient; use crate::mcp::McpPool; +#[cfg(test)] +use crate::models::ToolCaller; use crate::models::{ ContentBlock, ContentBlockStart, DEFAULT_CONTEXT_WINDOW_TOKENS, Delta, Message, MessageRequest, - StreamEvent, SystemPrompt, Tool, ToolCaller, Usage, + StreamEvent, SystemPrompt, Tool, Usage, }; use crate::prompts; use crate::seam_manager::{SeamConfig, SeamManager}; @@ -291,185 +293,7 @@ pub struct Engine { pending_lsp_blocks: Vec, } -// === Internal stream helpers === - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum ContentBlockKind { - Text, - Thinking, - ToolUse, -} - -#[derive(Debug, Clone)] -struct ToolUseState { - id: String, - name: String, - input: serde_json::Value, - caller: Option, - input_buffer: String, -} - -/// Maximum time to wait for a single stream chunk before assuming a stall. -/// **This is the idle timeout** — it resets on every SSE chunk, so long -/// thinking turns that ARE producing reasoning_content stay alive. Only a -/// genuine `chunk_timeout` window of silence kills the stream. -const STREAM_CHUNK_TIMEOUT_SECS: u64 = 90; -/// Maximum total bytes of text/thinking content before aborting the stream. -const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB -/// Sanity backstop for total stream wall-clock duration. **Not** a routine -/// kill switch — `STREAM_CHUNK_TIMEOUT_SECS` (idle) is the primary stall -/// detector. The wall-clock cap is here only to bound pathological cases -/// (e.g. a server that keeps sending heartbeats forever without progress). -/// -/// History: this used to be 300s (5 min) which was too aggressive — V4 -/// thinking turns on hard prompts legitimately exceed 5 minutes wall-clock -/// while still emitting reasoning_content chunks the whole way. Bumped to -/// 30 min in v0.6.6 to address `TODO_FIXES.md` #1. Codex defaults to a -/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but -/// give the wall-clock a generous window so it never fires in practice. -const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1) -/// Hard cap on consecutive recoverable stream errors before we surface a turn -/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults -/// (#103) — keepalive should make spurious decode errors rarer, so we can -/// tolerate a longer streak before giving up on the turn. -const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5; -/// Cap on transparent stream-level retries — these only happen when the wire -/// dies before any content was streamed, so DeepSeek hasn't billed us and -/// the user hasn't seen anything. Two attempts is enough to ride out a -/// flaky edge node without amplifying real outages (#103). -const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2; - -/// Decide whether a stream error is eligible for a transparent retry. -/// -/// True only when ALL three conditions hold: -/// 1. No content has been received on the current attempt — otherwise DeepSeek -/// has already billed us for output tokens and the user has seen partial -/// deltas; resending would double-bill and desync the UI. -/// 2. We still have transparent-retry budget remaining. -/// 3. The turn has not been cancelled. -/// -/// Extracted as a pure function so the four #103 retry cases can be exercised -/// in unit tests without booting the full engine state machine. -fn should_transparently_retry_stream( - any_content_received: bool, - transparent_attempts: u32, - cancelled: bool, -) -> bool { - !any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled -} -pub(crate) const TOOL_CALL_START_MARKERS: [&str; 5] = [ - "[TOOL_CALL]", - "", -]; - -pub(crate) const TOOL_CALL_END_MARKERS: [&str; 5] = [ - "[/TOOL_CALL]", - "", - "", - "", - "", -]; - -/// Compact one-shot notice emitted when a model attempts to forge a tool-call -/// wrapper in plain text instead of using the API tool channel. The visible -/// content is still scrubbed; this exists so the user can see why their text -/// shrank. -pub(crate) const FAKE_WRAPPER_NOTICE: &str = - "Stripped non-API tool-call wrapper from model output (use the API tool channel)"; - -/// True if `text` contains any of the known fake-wrapper start markers. Used by -/// the streaming loop to decide whether to emit `FAKE_WRAPPER_NOTICE`. -pub(crate) fn contains_fake_tool_wrapper(text: &str) -> bool { - TOOL_CALL_START_MARKERS.iter().any(|m| text.contains(m)) -} - -fn find_first_marker(text: &str, markers: &[&str]) -> Option<(usize, usize)> { - markers - .iter() - .filter_map(|marker| text.find(marker).map(|idx| (idx, marker.len()))) - .min_by_key(|(idx, _)| *idx) -} - -pub(crate) fn filter_tool_call_delta(delta: &str, in_tool_call: &mut bool) -> String { - if delta.is_empty() { - return String::new(); - } - - let mut output = String::new(); - let mut rest = delta; - - loop { - if *in_tool_call { - let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_END_MARKERS) else { - break; - }; - rest = &rest[idx + len..]; - *in_tool_call = false; - } else { - let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_START_MARKERS) else { - output.push_str(rest); - break; - }; - output.push_str(&rest[..idx]); - rest = &rest[idx + len..]; - *in_tool_call = true; - } - } - - output -} - -fn caller_type_for_tool_use(caller: Option<&ToolCaller>) -> &str { - caller.map_or("direct", |c| c.caller_type.as_str()) -} - -fn caller_allowed_for_tool(caller: Option<&ToolCaller>, tool_def: Option<&Tool>) -> bool { - let requested = caller_type_for_tool_use(caller); - if let Some(def) = tool_def - && let Some(allowed) = &def.allowed_callers - { - if allowed.is_empty() { - return requested == "direct"; - } - return allowed.iter().any(|item| item == requested); - } - requested == "direct" -} - -fn format_tool_error(err: &ToolError, tool_name: &str) -> String { - match err { - ToolError::InvalidInput { message } => { - format!("Invalid input for tool '{tool_name}': {message}") - } - ToolError::MissingField { field } => { - format!("Tool '{tool_name}' is missing required field '{field}'") - } - ToolError::PathEscape { path } => format!( - "Path escapes workspace: {}. Use a workspace-relative path or enable trust mode.", - path.display() - ), - ToolError::ExecutionFailed { message } => message.clone(), - ToolError::Timeout { seconds } => format!( - "Tool '{tool_name}' timed out after {seconds}s. Try a narrower scope or a longer timeout." - ), - ToolError::NotAvailable { message } => { - let lower = message.to_ascii_lowercase(); - if lower.contains("current tool catalog") || lower.contains("did you mean:") { - message.clone() - } else { - format!( - "Tool '{tool_name}' is not available: {message}. Check mode, feature flags, or tool name." - ) - } - } - ToolError::PermissionDenied { message } => format!( - "Tool '{tool_name}' was denied: {message}. Adjust approval mode or request permission." - ), - } -} +// === Internal tool helpers === impl Engine { fn reset_cancel_token(&mut self) { @@ -1850,6 +1674,7 @@ use context::{ }; mod dispatch; mod lsp_hooks; +mod streaming; mod tool_catalog; mod tool_execution; mod tool_setup; @@ -1858,12 +1683,21 @@ mod turn_loop; use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision}; use self::dispatch::{ ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, ToolExecutionPlan, - final_tool_input, mcp_tool_approval_description, mcp_tool_is_parallel_safe, - mcp_tool_is_read_only, parse_parallel_tool_calls, parse_tool_input, - should_force_update_plan_first, should_parallelize_tool_batch, should_stop_after_plan_tool, + caller_allowed_for_tool, caller_type_for_tool_use, final_tool_input, format_tool_error, + mcp_tool_approval_description, mcp_tool_is_parallel_safe, mcp_tool_is_read_only, + parse_parallel_tool_calls, parse_tool_input, should_force_update_plan_first, + should_parallelize_tool_batch, should_stop_after_plan_tool, }; #[cfg(test)] use self::lsp_hooks::{edited_paths_for_tool, parse_patch_paths}; +#[cfg(test)] +use self::streaming::TOOL_CALL_START_MARKERS; +use self::streaming::{ + ContentBlockKind, FAKE_WRAPPER_NOTICE, MAX_STREAM_ERRORS_BEFORE_FAIL, + MAX_TRANSPARENT_STREAM_RETRIES, STREAM_CHUNK_TIMEOUT_SECS, STREAM_MAX_CONTENT_BYTES, + STREAM_MAX_DURATION_SECS, ToolUseState, contains_fake_tool_wrapper, filter_tool_call_delta, + should_transparently_retry_stream, +}; use self::tool_catalog::{ CODE_EXECUTION_TOOL_NAME, MULTI_TOOL_PARALLEL_NAME, REQUEST_USER_INPUT_NAME, active_tools_for_step, build_model_tool_catalog, ensure_advanced_tooling, diff --git a/crates/tui/src/core/engine/dispatch.rs b/crates/tui/src/core/engine/dispatch.rs index 94b8c864..eb39f41f 100644 --- a/crates/tui/src/core/engine/dispatch.rs +++ b/crates/tui/src/core/engine/dispatch.rs @@ -17,7 +17,7 @@ use serde_json::json; -use crate::models::ToolCaller; +use crate::models::{Tool, ToolCaller}; use crate::tools::spec::{ToolError, ToolResult}; use crate::tui::app::AppMode; @@ -71,6 +71,60 @@ pub(super) enum ToolExecGuard<'a> { Write(#[allow(dead_code)] tokio::sync::RwLockWriteGuard<'a, ()>), } +// === Caller policy and errors ======================================== + +pub(super) fn caller_type_for_tool_use(caller: Option<&ToolCaller>) -> &str { + caller.map_or("direct", |c| c.caller_type.as_str()) +} + +pub(super) fn caller_allowed_for_tool( + caller: Option<&ToolCaller>, + tool_def: Option<&Tool>, +) -> bool { + let requested = caller_type_for_tool_use(caller); + if let Some(def) = tool_def + && let Some(allowed) = &def.allowed_callers + { + if allowed.is_empty() { + return requested == "direct"; + } + return allowed.iter().any(|item| item == requested); + } + requested == "direct" +} + +pub(super) fn format_tool_error(err: &ToolError, tool_name: &str) -> String { + match err { + ToolError::InvalidInput { message } => { + format!("Invalid input for tool '{tool_name}': {message}") + } + ToolError::MissingField { field } => { + format!("Tool '{tool_name}' is missing required field '{field}'") + } + ToolError::PathEscape { path } => format!( + "Path escapes workspace: {}. Use a workspace-relative path or enable trust mode.", + path.display() + ), + ToolError::ExecutionFailed { message } => message.clone(), + ToolError::Timeout { seconds } => format!( + "Tool '{tool_name}' timed out after {seconds}s. Try a narrower scope or a longer timeout." + ), + ToolError::NotAvailable { message } => { + let lower = message.to_ascii_lowercase(); + if lower.contains("current tool catalog") || lower.contains("did you mean:") { + message.clone() + } else { + format!( + "Tool '{tool_name}' is not available: {message}. Check mode, feature flags, or tool name." + ) + } + } + ToolError::PermissionDenied { message } => format!( + "Tool '{tool_name}' was denied: {message}. Adjust approval mode or request permission." + ), + } +} + // === Streaming-buffer parsing ========================================= /// Promote a streaming `ToolUseState` to a finalized JSON input. diff --git a/crates/tui/src/core/engine/streaming.rs b/crates/tui/src/core/engine/streaming.rs new file mode 100644 index 00000000..1855dc18 --- /dev/null +++ b/crates/tui/src/core/engine/streaming.rs @@ -0,0 +1,137 @@ +//! Streaming response state and guardrails. +//! +//! This module owns the local state used while decoding one model stream: +//! content block kind tracking, streamed tool-use buffers, transparent retry +//! policy, and scrubbers for text that looks like a forged tool-call wrapper. + +use crate::models::ToolCaller; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum ContentBlockKind { + Text, + Thinking, + ToolUse, +} + +#[derive(Debug, Clone)] +pub(super) struct ToolUseState { + pub(super) id: String, + pub(super) name: String, + pub(super) input: serde_json::Value, + pub(super) caller: Option, + pub(super) input_buffer: String, +} + +/// Maximum time to wait for a single stream chunk before assuming a stall. +/// **This is the idle timeout** — it resets on every SSE chunk, so long +/// thinking turns that ARE producing reasoning_content stay alive. Only a +/// genuine `chunk_timeout` window of silence kills the stream. +pub(super) const STREAM_CHUNK_TIMEOUT_SECS: u64 = 90; +/// Maximum total bytes of text/thinking content before aborting the stream. +pub(super) const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB +/// Sanity backstop for total stream wall-clock duration. **Not** a routine +/// kill switch — `STREAM_CHUNK_TIMEOUT_SECS` (idle) is the primary stall +/// detector. The wall-clock cap is here only to bound pathological cases +/// (e.g. a server that keeps sending heartbeats forever without progress). +/// +/// History: this used to be 300s (5 min) which was too aggressive — V4 +/// thinking turns on hard prompts legitimately exceed 5 minutes wall-clock +/// while still emitting reasoning_content chunks the whole way. Bumped to +/// 30 min in v0.6.6 to address `TODO_FIXES.md` #1. Codex defaults to a +/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but +/// give the wall-clock a generous window so it never fires in practice. +pub(super) const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1) +/// Hard cap on consecutive recoverable stream errors before we surface a turn +/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults +/// (#103) — keepalive should make spurious decode errors rarer, so we can +/// tolerate a longer streak before giving up on the turn. +pub(super) const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5; +/// Cap on transparent stream-level retries — these only happen when the wire +/// dies before any content was streamed, so DeepSeek hasn't billed us and +/// the user hasn't seen anything. Two attempts is enough to ride out a +/// flaky edge node without amplifying real outages (#103). +pub(super) const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2; + +/// Decide whether a stream error is eligible for a transparent retry. +/// +/// True only when ALL three conditions hold: +/// 1. No content has been received on the current attempt — otherwise DeepSeek +/// has already billed us for output tokens and the user has seen partial +/// deltas; resending would double-bill and desync the UI. +/// 2. We still have transparent-retry budget remaining. +/// 3. The turn has not been cancelled. +/// +/// Extracted as a pure function so the four #103 retry cases can be exercised +/// in unit tests without booting the full engine state machine. +pub(super) fn should_transparently_retry_stream( + any_content_received: bool, + transparent_attempts: u32, + cancelled: bool, +) -> bool { + !any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled +} + +pub(crate) const TOOL_CALL_START_MARKERS: [&str; 5] = [ + "[TOOL_CALL]", + "", +]; + +pub(crate) const TOOL_CALL_END_MARKERS: [&str; 5] = [ + "[/TOOL_CALL]", + "", + "", + "", + "", +]; + +/// Compact one-shot notice emitted when a model attempts to forge a tool-call +/// wrapper in plain text instead of using the API tool channel. The visible +/// content is still scrubbed; this exists so the user can see why their text +/// shrank. +pub(crate) const FAKE_WRAPPER_NOTICE: &str = + "Stripped non-API tool-call wrapper from model output (use the API tool channel)"; + +/// True if `text` contains any of the known fake-wrapper start markers. Used by +/// the streaming loop to decide whether to emit `FAKE_WRAPPER_NOTICE`. +pub(crate) fn contains_fake_tool_wrapper(text: &str) -> bool { + TOOL_CALL_START_MARKERS.iter().any(|m| text.contains(m)) +} + +fn find_first_marker(text: &str, markers: &[&str]) -> Option<(usize, usize)> { + markers + .iter() + .filter_map(|marker| text.find(marker).map(|idx| (idx, marker.len()))) + .min_by_key(|(idx, _)| *idx) +} + +pub(crate) fn filter_tool_call_delta(delta: &str, in_tool_call: &mut bool) -> String { + if delta.is_empty() { + return String::new(); + } + + let mut output = String::new(); + let mut rest = delta; + + loop { + if *in_tool_call { + let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_END_MARKERS) else { + break; + }; + rest = &rest[idx + len..]; + *in_tool_call = false; + } else { + let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_START_MARKERS) else { + output.push_str(rest); + break; + }; + output.push_str(&rest[..idx]); + rest = &rest[idx + len..]; + *in_tool_call = true; + } + } + + output +}