refactor(engine): isolate streaming state helpers

This commit is contained in:
Hunter Bown
2026-05-01 09:09:09 -05:00
parent 8379230ef1
commit a64bc9bbe5
3 changed files with 209 additions and 184 deletions
+17 -183
View File
@@ -32,9 +32,11 @@ use crate::error_taxonomy::{ErrorCategory, ErrorEnvelope, StreamError};
use crate::features::{Feature, Features};
use crate::llm_client::LlmClient;
use crate::mcp::McpPool;
#[cfg(test)]
use crate::models::ToolCaller;
use crate::models::{
ContentBlock, ContentBlockStart, DEFAULT_CONTEXT_WINDOW_TOKENS, Delta, Message, MessageRequest,
StreamEvent, SystemPrompt, Tool, ToolCaller, Usage,
StreamEvent, SystemPrompt, Tool, Usage,
};
use crate::prompts;
use crate::seam_manager::{SeamConfig, SeamManager};
@@ -291,185 +293,7 @@ pub struct Engine {
pending_lsp_blocks: Vec<crate::lsp::DiagnosticBlock>,
}
// === Internal stream helpers ===
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ContentBlockKind {
Text,
Thinking,
ToolUse,
}
#[derive(Debug, Clone)]
struct ToolUseState {
id: String,
name: String,
input: serde_json::Value,
caller: Option<ToolCaller>,
input_buffer: String,
}
/// Maximum time to wait for a single stream chunk before assuming a stall.
/// **This is the idle timeout** — it resets on every SSE chunk, so long
/// thinking turns that ARE producing reasoning_content stay alive. Only a
/// genuine `chunk_timeout` window of silence kills the stream.
const STREAM_CHUNK_TIMEOUT_SECS: u64 = 90;
/// Maximum total bytes of text/thinking content before aborting the stream.
const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB
/// Sanity backstop for total stream wall-clock duration. **Not** a routine
/// kill switch — `STREAM_CHUNK_TIMEOUT_SECS` (idle) is the primary stall
/// detector. The wall-clock cap is here only to bound pathological cases
/// (e.g. a server that keeps sending heartbeats forever without progress).
///
/// History: this used to be 300s (5 min) which was too aggressive — V4
/// thinking turns on hard prompts legitimately exceed 5 minutes wall-clock
/// while still emitting reasoning_content chunks the whole way. Bumped to
/// 30 min in v0.6.6 to address `TODO_FIXES.md` #1. Codex defaults to a
/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but
/// give the wall-clock a generous window so it never fires in practice.
const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1)
/// Hard cap on consecutive recoverable stream errors before we surface a turn
/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults
/// (#103) — keepalive should make spurious decode errors rarer, so we can
/// tolerate a longer streak before giving up on the turn.
const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5;
/// Cap on transparent stream-level retries — these only happen when the wire
/// dies before any content was streamed, so DeepSeek hasn't billed us and
/// the user hasn't seen anything. Two attempts is enough to ride out a
/// flaky edge node without amplifying real outages (#103).
const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2;
/// Decide whether a stream error is eligible for a transparent retry.
///
/// True only when ALL three conditions hold:
/// 1. No content has been received on the current attempt — otherwise DeepSeek
/// has already billed us for output tokens and the user has seen partial
/// deltas; resending would double-bill and desync the UI.
/// 2. We still have transparent-retry budget remaining.
/// 3. The turn has not been cancelled.
///
/// Extracted as a pure function so the four #103 retry cases can be exercised
/// in unit tests without booting the full engine state machine.
fn should_transparently_retry_stream(
any_content_received: bool,
transparent_attempts: u32,
cancelled: bool,
) -> bool {
!any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled
}
pub(crate) const TOOL_CALL_START_MARKERS: [&str; 5] = [
"[TOOL_CALL]",
"<deepseek:tool_call",
"<tool_call",
"<invoke ",
"<function_calls>",
];
pub(crate) const TOOL_CALL_END_MARKERS: [&str; 5] = [
"[/TOOL_CALL]",
"</deepseek:tool_call>",
"</tool_call>",
"</invoke>",
"</function_calls>",
];
/// Compact one-shot notice emitted when a model attempts to forge a tool-call
/// wrapper in plain text instead of using the API tool channel. The visible
/// content is still scrubbed; this exists so the user can see why their text
/// shrank.
pub(crate) const FAKE_WRAPPER_NOTICE: &str =
"Stripped non-API tool-call wrapper from model output (use the API tool channel)";
/// True if `text` contains any of the known fake-wrapper start markers. Used by
/// the streaming loop to decide whether to emit `FAKE_WRAPPER_NOTICE`.
pub(crate) fn contains_fake_tool_wrapper(text: &str) -> bool {
TOOL_CALL_START_MARKERS.iter().any(|m| text.contains(m))
}
fn find_first_marker(text: &str, markers: &[&str]) -> Option<(usize, usize)> {
markers
.iter()
.filter_map(|marker| text.find(marker).map(|idx| (idx, marker.len())))
.min_by_key(|(idx, _)| *idx)
}
pub(crate) fn filter_tool_call_delta(delta: &str, in_tool_call: &mut bool) -> String {
if delta.is_empty() {
return String::new();
}
let mut output = String::new();
let mut rest = delta;
loop {
if *in_tool_call {
let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_END_MARKERS) else {
break;
};
rest = &rest[idx + len..];
*in_tool_call = false;
} else {
let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_START_MARKERS) else {
output.push_str(rest);
break;
};
output.push_str(&rest[..idx]);
rest = &rest[idx + len..];
*in_tool_call = true;
}
}
output
}
fn caller_type_for_tool_use(caller: Option<&ToolCaller>) -> &str {
caller.map_or("direct", |c| c.caller_type.as_str())
}
fn caller_allowed_for_tool(caller: Option<&ToolCaller>, tool_def: Option<&Tool>) -> bool {
let requested = caller_type_for_tool_use(caller);
if let Some(def) = tool_def
&& let Some(allowed) = &def.allowed_callers
{
if allowed.is_empty() {
return requested == "direct";
}
return allowed.iter().any(|item| item == requested);
}
requested == "direct"
}
fn format_tool_error(err: &ToolError, tool_name: &str) -> String {
match err {
ToolError::InvalidInput { message } => {
format!("Invalid input for tool '{tool_name}': {message}")
}
ToolError::MissingField { field } => {
format!("Tool '{tool_name}' is missing required field '{field}'")
}
ToolError::PathEscape { path } => format!(
"Path escapes workspace: {}. Use a workspace-relative path or enable trust mode.",
path.display()
),
ToolError::ExecutionFailed { message } => message.clone(),
ToolError::Timeout { seconds } => format!(
"Tool '{tool_name}' timed out after {seconds}s. Try a narrower scope or a longer timeout."
),
ToolError::NotAvailable { message } => {
let lower = message.to_ascii_lowercase();
if lower.contains("current tool catalog") || lower.contains("did you mean:") {
message.clone()
} else {
format!(
"Tool '{tool_name}' is not available: {message}. Check mode, feature flags, or tool name."
)
}
}
ToolError::PermissionDenied { message } => format!(
"Tool '{tool_name}' was denied: {message}. Adjust approval mode or request permission."
),
}
}
// === Internal tool helpers ===
impl Engine {
fn reset_cancel_token(&mut self) {
@@ -1850,6 +1674,7 @@ use context::{
};
mod dispatch;
mod lsp_hooks;
mod streaming;
mod tool_catalog;
mod tool_execution;
mod tool_setup;
@@ -1858,12 +1683,21 @@ mod turn_loop;
use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision};
use self::dispatch::{
ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, ToolExecutionPlan,
final_tool_input, mcp_tool_approval_description, mcp_tool_is_parallel_safe,
mcp_tool_is_read_only, parse_parallel_tool_calls, parse_tool_input,
should_force_update_plan_first, should_parallelize_tool_batch, should_stop_after_plan_tool,
caller_allowed_for_tool, caller_type_for_tool_use, final_tool_input, format_tool_error,
mcp_tool_approval_description, mcp_tool_is_parallel_safe, mcp_tool_is_read_only,
parse_parallel_tool_calls, parse_tool_input, should_force_update_plan_first,
should_parallelize_tool_batch, should_stop_after_plan_tool,
};
#[cfg(test)]
use self::lsp_hooks::{edited_paths_for_tool, parse_patch_paths};
#[cfg(test)]
use self::streaming::TOOL_CALL_START_MARKERS;
use self::streaming::{
ContentBlockKind, FAKE_WRAPPER_NOTICE, MAX_STREAM_ERRORS_BEFORE_FAIL,
MAX_TRANSPARENT_STREAM_RETRIES, STREAM_CHUNK_TIMEOUT_SECS, STREAM_MAX_CONTENT_BYTES,
STREAM_MAX_DURATION_SECS, ToolUseState, contains_fake_tool_wrapper, filter_tool_call_delta,
should_transparently_retry_stream,
};
use self::tool_catalog::{
CODE_EXECUTION_TOOL_NAME, MULTI_TOOL_PARALLEL_NAME, REQUEST_USER_INPUT_NAME,
active_tools_for_step, build_model_tool_catalog, ensure_advanced_tooling,
+55 -1
View File
@@ -17,7 +17,7 @@
use serde_json::json;
use crate::models::ToolCaller;
use crate::models::{Tool, ToolCaller};
use crate::tools::spec::{ToolError, ToolResult};
use crate::tui::app::AppMode;
@@ -71,6 +71,60 @@ pub(super) enum ToolExecGuard<'a> {
Write(#[allow(dead_code)] tokio::sync::RwLockWriteGuard<'a, ()>),
}
// === Caller policy and errors ========================================
pub(super) fn caller_type_for_tool_use(caller: Option<&ToolCaller>) -> &str {
caller.map_or("direct", |c| c.caller_type.as_str())
}
pub(super) fn caller_allowed_for_tool(
caller: Option<&ToolCaller>,
tool_def: Option<&Tool>,
) -> bool {
let requested = caller_type_for_tool_use(caller);
if let Some(def) = tool_def
&& let Some(allowed) = &def.allowed_callers
{
if allowed.is_empty() {
return requested == "direct";
}
return allowed.iter().any(|item| item == requested);
}
requested == "direct"
}
pub(super) fn format_tool_error(err: &ToolError, tool_name: &str) -> String {
match err {
ToolError::InvalidInput { message } => {
format!("Invalid input for tool '{tool_name}': {message}")
}
ToolError::MissingField { field } => {
format!("Tool '{tool_name}' is missing required field '{field}'")
}
ToolError::PathEscape { path } => format!(
"Path escapes workspace: {}. Use a workspace-relative path or enable trust mode.",
path.display()
),
ToolError::ExecutionFailed { message } => message.clone(),
ToolError::Timeout { seconds } => format!(
"Tool '{tool_name}' timed out after {seconds}s. Try a narrower scope or a longer timeout."
),
ToolError::NotAvailable { message } => {
let lower = message.to_ascii_lowercase();
if lower.contains("current tool catalog") || lower.contains("did you mean:") {
message.clone()
} else {
format!(
"Tool '{tool_name}' is not available: {message}. Check mode, feature flags, or tool name."
)
}
}
ToolError::PermissionDenied { message } => format!(
"Tool '{tool_name}' was denied: {message}. Adjust approval mode or request permission."
),
}
}
// === Streaming-buffer parsing =========================================
/// Promote a streaming `ToolUseState` to a finalized JSON input.
+137
View File
@@ -0,0 +1,137 @@
//! Streaming response state and guardrails.
//!
//! This module owns the local state used while decoding one model stream:
//! content block kind tracking, streamed tool-use buffers, transparent retry
//! policy, and scrubbers for text that looks like a forged tool-call wrapper.
use crate::models::ToolCaller;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum ContentBlockKind {
Text,
Thinking,
ToolUse,
}
#[derive(Debug, Clone)]
pub(super) struct ToolUseState {
pub(super) id: String,
pub(super) name: String,
pub(super) input: serde_json::Value,
pub(super) caller: Option<ToolCaller>,
pub(super) input_buffer: String,
}
/// Maximum time to wait for a single stream chunk before assuming a stall.
/// **This is the idle timeout** — it resets on every SSE chunk, so long
/// thinking turns that ARE producing reasoning_content stay alive. Only a
/// genuine `chunk_timeout` window of silence kills the stream.
pub(super) const STREAM_CHUNK_TIMEOUT_SECS: u64 = 90;
/// Maximum total bytes of text/thinking content before aborting the stream.
pub(super) const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB
/// Sanity backstop for total stream wall-clock duration. **Not** a routine
/// kill switch — `STREAM_CHUNK_TIMEOUT_SECS` (idle) is the primary stall
/// detector. The wall-clock cap is here only to bound pathological cases
/// (e.g. a server that keeps sending heartbeats forever without progress).
///
/// History: this used to be 300s (5 min) which was too aggressive — V4
/// thinking turns on hard prompts legitimately exceed 5 minutes wall-clock
/// while still emitting reasoning_content chunks the whole way. Bumped to
/// 30 min in v0.6.6 to address `TODO_FIXES.md` #1. Codex defaults to a
/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but
/// give the wall-clock a generous window so it never fires in practice.
pub(super) const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1)
/// Hard cap on consecutive recoverable stream errors before we surface a turn
/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults
/// (#103) — keepalive should make spurious decode errors rarer, so we can
/// tolerate a longer streak before giving up on the turn.
pub(super) const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5;
/// Cap on transparent stream-level retries — these only happen when the wire
/// dies before any content was streamed, so DeepSeek hasn't billed us and
/// the user hasn't seen anything. Two attempts is enough to ride out a
/// flaky edge node without amplifying real outages (#103).
pub(super) const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2;
/// Decide whether a stream error is eligible for a transparent retry.
///
/// True only when ALL three conditions hold:
/// 1. No content has been received on the current attempt — otherwise DeepSeek
/// has already billed us for output tokens and the user has seen partial
/// deltas; resending would double-bill and desync the UI.
/// 2. We still have transparent-retry budget remaining.
/// 3. The turn has not been cancelled.
///
/// Extracted as a pure function so the four #103 retry cases can be exercised
/// in unit tests without booting the full engine state machine.
pub(super) fn should_transparently_retry_stream(
any_content_received: bool,
transparent_attempts: u32,
cancelled: bool,
) -> bool {
!any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled
}
pub(crate) const TOOL_CALL_START_MARKERS: [&str; 5] = [
"[TOOL_CALL]",
"<deepseek:tool_call",
"<tool_call",
"<invoke ",
"<function_calls>",
];
pub(crate) const TOOL_CALL_END_MARKERS: [&str; 5] = [
"[/TOOL_CALL]",
"</deepseek:tool_call>",
"</tool_call>",
"</invoke>",
"</function_calls>",
];
/// Compact one-shot notice emitted when a model attempts to forge a tool-call
/// wrapper in plain text instead of using the API tool channel. The visible
/// content is still scrubbed; this exists so the user can see why their text
/// shrank.
pub(crate) const FAKE_WRAPPER_NOTICE: &str =
"Stripped non-API tool-call wrapper from model output (use the API tool channel)";
/// True if `text` contains any of the known fake-wrapper start markers. Used by
/// the streaming loop to decide whether to emit `FAKE_WRAPPER_NOTICE`.
pub(crate) fn contains_fake_tool_wrapper(text: &str) -> bool {
TOOL_CALL_START_MARKERS.iter().any(|m| text.contains(m))
}
fn find_first_marker(text: &str, markers: &[&str]) -> Option<(usize, usize)> {
markers
.iter()
.filter_map(|marker| text.find(marker).map(|idx| (idx, marker.len())))
.min_by_key(|(idx, _)| *idx)
}
pub(crate) fn filter_tool_call_delta(delta: &str, in_tool_call: &mut bool) -> String {
if delta.is_empty() {
return String::new();
}
let mut output = String::new();
let mut rest = delta;
loop {
if *in_tool_call {
let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_END_MARKERS) else {
break;
};
rest = &rest[idx + len..];
*in_tool_call = false;
} else {
let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_START_MARKERS) else {
output.push_str(rest);
break;
};
output.push_str(&rest[..idx]);
rest = &rest[idx + len..];
*in_tool_call = true;
}
}
output
}