diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index 0beee047..fe499f11 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -2378,2482 +2378,6 @@ impl Engine { /// Handle a turn using the DeepSeek API. #[allow(clippy::too_many_lines)] - async fn handle_deepseek_turn( - &mut self, - turn: &mut TurnContext, - tool_registry: Option<&crate::tools::ToolRegistry>, - tools: Option>, - mode: AppMode, - force_update_plan_first: bool, - ) -> (TurnOutcomeStatus, Option) { - let client = self - .deepseek_client - .clone() - .expect("DeepSeek client should be configured"); - - let mut consecutive_tool_error_steps = 0u32; - let mut turn_error: Option = None; - let mut context_recovery_attempts = 0u8; - let mut tool_catalog = tools.unwrap_or_default(); - if !tool_catalog.is_empty() { - ensure_advanced_tooling(&mut tool_catalog); - } - let mut active_tool_names = initial_active_tools(&tool_catalog); - - // Transparent stream-retry counter: when the chunked-transfer - // connection dies mid-stream and we got nothing useful out of it - // (no tool calls, no completed text), we silently re-issue the - // SAME request up to MAX_STREAM_RETRIES times before surfacing - // the failure to the user. This is the #103 Phase 3 retry that - // keeps long V4 thinking turns from being killed by transient - // proxy disconnects. - const MAX_STREAM_RETRIES: u32 = 3; - let mut stream_retry_attempts: u32 = 0; - - loop { - if self.cancel_token.is_cancelled() { - let _ = self.tx_event.send(Event::status("Request cancelled")).await; - return (TurnOutcomeStatus::Interrupted, None); - } - - while let Ok(steer) = self.rx_steer.try_recv() { - let steer = steer.trim().to_string(); - if steer.is_empty() { - continue; - } - self.session - .working_set - .observe_user_message(&steer, &self.session.workspace); - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: steer.clone(), - cache_control: None, - }], - }) - .await; - let _ = self - .tx_event - .send(Event::status(format!( - "Steer input accepted: {}", - summarize_text(&steer, 120) - ))) - .await; - } - - // Ensure system prompt is up to date with latest session states - self.refresh_system_prompt(mode); - - if turn.at_max_steps() { - let _ = self - .tx_event - .send(Event::status("Reached maximum steps")) - .await; - break; - } - - let compaction_pins = self - .session - .working_set - .pinned_message_indices(&self.session.messages, &self.session.workspace); - let compaction_paths = self.session.working_set.top_paths(24); - - if self.config.compaction.enabled - && should_compact( - &self.session.messages, - &self.config.compaction, - Some(&self.session.workspace), - Some(&compaction_pins), - Some(&compaction_paths), - ) - { - let compaction_id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]); - self.emit_compaction_started( - compaction_id.clone(), - true, - "Auto context compaction started".to_string(), - ) - .await; - let _ = self - .tx_event - .send(Event::status("Auto-compacting context...".to_string())) - .await; - let auto_messages_before = self.session.messages.len(); - match compact_messages_safe( - &client, - &self.session.messages, - &self.config.compaction, - Some(&self.session.workspace), - Some(&compaction_pins), - Some(&compaction_paths), - ) - .await - { - Ok(result) => { - // Only update if we got valid messages (never corrupt state) - if !result.messages.is_empty() || self.session.messages.is_empty() { - let auto_messages_after = result.messages.len(); - self.session.messages = result.messages; - self.merge_compaction_summary(result.summary_prompt); - self.emit_session_updated().await; - let removed = auto_messages_before.saturating_sub(auto_messages_after); - let status = if result.retries_used > 0 { - format!( - "Auto-compaction complete: {auto_messages_before} → {auto_messages_after} messages ({removed} removed, {} retries)", - result.retries_used - ) - } else { - format!( - "Auto-compaction complete: {auto_messages_before} → {auto_messages_after} messages ({removed} removed)" - ) - }; - self.emit_compaction_completed( - compaction_id.clone(), - true, - status.clone(), - Some(auto_messages_before), - Some(auto_messages_after), - ) - .await; - let _ = self.tx_event.send(Event::status(status)).await; - } else { - let message = "Auto-compaction skipped: empty result".to_string(); - self.emit_compaction_failed( - compaction_id.clone(), - true, - message.clone(), - ) - .await; - let _ = self.tx_event.send(Event::status(message)).await; - } - } - Err(err) => { - // Log error but continue with original messages (never corrupt) - let message = format!("Auto-compaction failed: {err}"); - self.emit_compaction_failed(compaction_id, true, message.clone()) - .await; - let _ = self.tx_event.send(Event::status(message)).await; - } - } - } - - if self - .run_capacity_pre_request_checkpoint(turn, Some(&client), mode) - .await - { - continue; - } - - if let Some(input_budget) = - context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) - { - let estimated_input = self.estimated_input_tokens(); - if estimated_input > input_budget { - if context_recovery_attempts >= MAX_CONTEXT_RECOVERY_ATTEMPTS { - let message = format!( - "Context remains above model limit after {} recovery attempts \ - (~{} token estimate, ~{} budget). Please run /compact or /clear.", - MAX_CONTEXT_RECOVERY_ATTEMPTS, estimated_input, input_budget - ); - turn_error = Some(message.clone()); - let _ = self - .tx_event - .send(Event::error(ErrorEnvelope::context_overflow(message))) - .await; - return (TurnOutcomeStatus::Failed, turn_error); - } - - if self - .recover_context_overflow( - &client, - "preflight token budget", - TURN_MAX_OUTPUT_TOKENS, - ) - .await - { - context_recovery_attempts = context_recovery_attempts.saturating_add(1); - continue; - } - } - } - - // #136: drain any LSP diagnostics collected since the last - // request and inject them as a synthetic user message so the - // model sees compile errors before its next reasoning step. - self.flush_pending_lsp_diagnostics().await; - - // Build the request - let force_update_plan_this_step = force_update_plan_first && turn.tool_calls.is_empty(); - let active_tools = if tool_catalog.is_empty() { - None - } else { - Some(active_tools_for_step( - &tool_catalog, - &active_tool_names, - force_update_plan_this_step, - )) - }; - let request = MessageRequest { - model: self.session.model.clone(), - messages: self.session.messages.clone(), - max_tokens: TURN_MAX_OUTPUT_TOKENS, - system: self.session.system_prompt.clone(), - tools: active_tools.clone(), - tool_choice: if active_tools.is_some() { - Some(json!({ "type": "auto" })) - } else { - None - }, - metadata: None, - thinking: None, - reasoning_effort: self.session.reasoning_effort.clone(), - stream: Some(true), - temperature: None, - top_p: None, - }; - - // Stream the response. Keep the request around (cloned into the - // first call) so we can resend it on a transparent retry below - // when the wire dies before any content was streamed (#103). - let stream_request = request; - let stream_result = client.create_message_stream(stream_request.clone()).await; - let stream = match stream_result { - Ok(s) => { - context_recovery_attempts = 0; - s - } - Err(e) => { - let message = e.to_string(); - if is_context_length_error_message(&message) - && context_recovery_attempts < MAX_CONTEXT_RECOVERY_ATTEMPTS - && self - .recover_context_overflow( - &client, - "provider context-length rejection", - TURN_MAX_OUTPUT_TOKENS, - ) - .await - { - context_recovery_attempts = context_recovery_attempts.saturating_add(1); - continue; - } - turn_error = Some(message.clone()); - let _ = self - .tx_event - .send(Event::error(ErrorEnvelope::classify(message, true))) - .await; - return (TurnOutcomeStatus::Failed, turn_error); - } - }; - // The stream value is itself `Pin>`, which - // is `Unpin`, so we can rebind it on a transparent retry without - // breaking the existing pin invariants. - let mut stream = stream; - - // Track content blocks - let mut content_blocks: Vec = Vec::new(); - let mut current_text_raw = String::new(); - let mut current_text_visible = String::new(); - let mut current_thinking = String::new(); - let mut tool_uses: Vec = Vec::new(); - let mut usage = Usage { - input_tokens: 0, - output_tokens: 0, - ..Usage::default() - }; - let mut current_block_kind: Option = None; - let mut current_tool_index: Option = None; - let mut in_tool_call_block = false; - let mut fake_wrapper_notice_emitted = false; - let mut pending_message_complete = false; - let mut last_text_index: Option = None; - let mut stream_errors = 0u32; - // #103 transparent retry bookkeeping. `any_content_received` flips - // on the first non-MessageStart event so we know whether DeepSeek - // billed us / the user has seen any output for this turn yet. - // This is distinct from the outer `stream_retry_attempts` (which - // restarts the whole turn-step when a stream died with no - // content-block delta delivered to the consumer). - let mut any_content_received = false; - let mut transparent_stream_retries = 0u32; - let mut pending_steers: Vec = Vec::new(); - // `stream_start` is reset on a transparent retry so the wall-clock - // budget restarts with the fresh stream. - let mut stream_start = Instant::now(); - let mut stream_content_bytes: usize = 0; - let chunk_timeout = Duration::from_secs(STREAM_CHUNK_TIMEOUT_SECS); - let max_duration = Duration::from_secs(STREAM_MAX_DURATION_SECS); - - // Process stream events - loop { - let poll_outcome = tokio::select! { - _ = self.cancel_token.cancelled() => None, - result = tokio::time::timeout(chunk_timeout, stream.next()) => { - match result { - Ok(Some(event_result)) => Some(event_result), - Ok(None) => None, // stream ended normally - Err(_) => { - let envelope = StreamError::Stall { - timeout_secs: STREAM_CHUNK_TIMEOUT_SECS, - } - .into_envelope(); - crate::logging::warn(&envelope.message); - let _ = self.tx_event.send(Event::error(envelope)).await; - None - } - } - } - }; - let Some(event_result) = poll_outcome else { - break; - }; - while let Ok(steer) = self.rx_steer.try_recv() { - let steer = steer.trim().to_string(); - if steer.is_empty() { - continue; - } - pending_steers.push(steer.clone()); - let _ = self - .tx_event - .send(Event::status(format!( - "Steer input queued: {}", - summarize_text(&steer, 120) - ))) - .await; - } - - if self.cancel_token.is_cancelled() { - break; - } - - // Guard: max wall-clock duration - if stream_start.elapsed() > max_duration { - let envelope = StreamError::DurationLimit { - limit_secs: STREAM_MAX_DURATION_SECS, - } - .into_envelope(); - crate::logging::warn(&envelope.message); - turn_error.get_or_insert(envelope.message.clone()); - let _ = self.tx_event.send(Event::error(envelope)).await; - break; - } - - // Guard: max accumulated content bytes - if stream_content_bytes > STREAM_MAX_CONTENT_BYTES { - let envelope = StreamError::Overflow { - limit_bytes: STREAM_MAX_CONTENT_BYTES, - } - .into_envelope(); - crate::logging::warn(&envelope.message); - turn_error.get_or_insert(envelope.message.clone()); - let _ = self.tx_event.send(Event::error(envelope)).await; - break; - } - - let event = match event_result { - Ok(e) => { - // Flip on the first non-MessageStart event — that's - // the moment we cross from "stream not yet productive" - // (eligible for transparent retry) into "DeepSeek has - // billed us / user has seen output" (must surface). - if !any_content_received && !matches!(e, StreamEvent::MessageStart { .. }) { - any_content_received = true; - } - e - } - Err(e) => { - stream_errors = stream_errors.saturating_add(1); - let message = e.to_string(); - // #103: when the stream errors before any content was - // streamed AND we still have retry budget, transparently - // resend the request. DeepSeek has not billed for any - // output and the user has seen nothing — re-trying is - // the right user-visible behavior. - if should_transparently_retry_stream( - any_content_received, - transparent_stream_retries, - self.cancel_token.is_cancelled(), - ) { - transparent_stream_retries = - transparent_stream_retries.saturating_add(1); - crate::logging::info(format!( - "Transparent stream retry {}/{} (no content received yet): {}", - transparent_stream_retries, MAX_TRANSPARENT_STREAM_RETRIES, message, - )); - // Drop the failed stream before issuing the new - // request to release the underlying connection. - drop(stream); - match client.create_message_stream(stream_request.clone()).await { - Ok(fresh) => { - stream = fresh; - stream_start = Instant::now(); - // Roll back the error counter — this one - // didn't surface to the user. - stream_errors = stream_errors.saturating_sub(1); - continue; - } - Err(retry_err) => { - let retry_msg = format!("Stream retry failed: {retry_err}"); - turn_error.get_or_insert(retry_msg.clone()); - let _ = self - .tx_event - .send(Event::error(ErrorEnvelope::classify( - retry_msg, true, - ))) - .await; - break; - } - } - } - turn_error.get_or_insert(message.clone()); - let _ = self - .tx_event - .send(Event::error(ErrorEnvelope::classify(message, true))) - .await; - if stream_errors >= MAX_STREAM_ERRORS_BEFORE_FAIL { - break; - } - continue; - } - }; - - match event { - StreamEvent::MessageStart { message } => { - usage = message.usage; - } - StreamEvent::ContentBlockStart { - index, - content_block, - } => match content_block { - ContentBlockStart::Text { text } => { - current_text_raw = text; - current_text_visible.clear(); - in_tool_call_block = false; - let filtered = - filter_tool_call_delta(¤t_text_raw, &mut in_tool_call_block); - if !fake_wrapper_notice_emitted - && filtered.len() < current_text_raw.len() - && contains_fake_tool_wrapper(¤t_text_raw) - { - let _ = - self.tx_event.send(Event::status(FAKE_WRAPPER_NOTICE)).await; - fake_wrapper_notice_emitted = true; - } - current_text_visible.push_str(&filtered); - current_block_kind = Some(ContentBlockKind::Text); - last_text_index = Some(index as usize); - let _ = self - .tx_event - .send(Event::MessageStarted { - index: index as usize, - }) - .await; - } - ContentBlockStart::Thinking { thinking } => { - current_thinking = thinking; - current_block_kind = Some(ContentBlockKind::Thinking); - let _ = self - .tx_event - .send(Event::ThinkingStarted { - index: index as usize, - }) - .await; - } - ContentBlockStart::ToolUse { - id, - name, - input, - caller, - } => { - crate::logging::info(format!( - "Tool '{}' block start. Initial input: {:?}", - name, input - )); - current_block_kind = Some(ContentBlockKind::ToolUse); - current_tool_index = Some(tool_uses.len()); - // ToolCallStarted is deferred to ContentBlockStop — - // see `final_tool_input`. Emitting here would ship - // the placeholder `{}` and the cell would render - // `` / `` literals to the user. - tool_uses.push(ToolUseState { - id, - name, - input, - caller, - input_buffer: String::new(), - }); - } - ContentBlockStart::ServerToolUse { id, name, input } => { - crate::logging::info(format!( - "Server tool '{}' block start. Initial input: {:?}", - name, input - )); - current_block_kind = Some(ContentBlockKind::ToolUse); - current_tool_index = Some(tool_uses.len()); - tool_uses.push(ToolUseState { - id, - name, - input, - caller: None, - input_buffer: String::new(), - }); - } - }, - StreamEvent::ContentBlockDelta { index, delta } => match delta { - Delta::TextDelta { text } => { - stream_content_bytes = stream_content_bytes.saturating_add(text.len()); - current_text_raw.push_str(&text); - let filtered = filter_tool_call_delta(&text, &mut in_tool_call_block); - if !fake_wrapper_notice_emitted - && filtered.len() < text.len() - && contains_fake_tool_wrapper(&text) - { - let _ = - self.tx_event.send(Event::status(FAKE_WRAPPER_NOTICE)).await; - fake_wrapper_notice_emitted = true; - } - if !filtered.is_empty() { - current_text_visible.push_str(&filtered); - let _ = self - .tx_event - .send(Event::MessageDelta { - index: index as usize, - content: filtered, - }) - .await; - } - } - Delta::ThinkingDelta { thinking } => { - stream_content_bytes = - stream_content_bytes.saturating_add(thinking.len()); - current_thinking.push_str(&thinking); - if !thinking.is_empty() { - let _ = self - .tx_event - .send(Event::ThinkingDelta { - index: index as usize, - content: thinking, - }) - .await; - } - } - Delta::InputJsonDelta { partial_json } => { - if let Some(index) = current_tool_index - && let Some(tool_state) = tool_uses.get_mut(index) - { - tool_state.input_buffer.push_str(&partial_json); - crate::logging::info(format!( - "Tool '{}' input delta: {} (buffer now: {})", - tool_state.name, partial_json, tool_state.input_buffer - )); - if let Some(value) = parse_tool_input(&tool_state.input_buffer) { - tool_state.input = value.clone(); - crate::logging::info(format!( - "Tool '{}' input parsed: {:?}", - tool_state.name, value - )); - } - } - } - }, - StreamEvent::ContentBlockStop { index } => { - let stopped_kind = current_block_kind.take(); - match stopped_kind { - Some(ContentBlockKind::Text) => { - pending_message_complete = true; - last_text_index = Some(index as usize); - } - Some(ContentBlockKind::Thinking) => { - let _ = self - .tx_event - .send(Event::ThinkingComplete { - index: index as usize, - }) - .await; - } - Some(ContentBlockKind::ToolUse) | None => {} - } - if matches!(stopped_kind, Some(ContentBlockKind::ToolUse)) - && let Some(index) = current_tool_index.take() - && let Some(tool_state) = tool_uses.get_mut(index) - { - crate::logging::info(format!( - "Tool '{}' block stop. Buffer: '{}', Current input: {:?}", - tool_state.name, tool_state.input_buffer, tool_state.input - )); - if !tool_state.input_buffer.trim().is_empty() { - if let Some(value) = parse_tool_input(&tool_state.input_buffer) { - tool_state.input = value; - crate::logging::info(format!( - "Tool '{}' final input: {:?}", - tool_state.name, tool_state.input - )); - } else { - crate::logging::warn(format!( - "Tool '{}' failed to parse final input buffer: '{}'", - tool_state.name, tool_state.input_buffer - )); - let _ = self - .tx_event - .send(Event::status(format!( - "⚠ Tool '{}' received malformed arguments from model", - tool_state.name - ))) - .await; - } - } else { - crate::logging::warn(format!( - "Tool '{}' input buffer is empty, using initial input: {:?}", - tool_state.name, tool_state.input - )); - } - - // Now that the input is finalized, announce the - // tool call to the UI. Deferring to here is what - // keeps the cell from rendering `` / - // `` placeholders during the brief window - // between block start and the last InputJsonDelta. - let _ = self - .tx_event - .send(Event::ToolCallStarted { - id: tool_state.id.clone(), - name: tool_state.name.clone(), - input: final_tool_input(tool_state), - }) - .await; - } - } - StreamEvent::MessageDelta { - usage: delta_usage, .. - } => { - if let Some(u) = delta_usage { - usage = u; - } - } - StreamEvent::MessageStop | StreamEvent::Ping => {} - } - } - - // #103 Phase 3 — transparent retry. The inner loop above bails - // when reqwest yields chunk decode errors three times in a row; - // most of the time those are recoverable proxy / HTTP/2 issues - // and the request can simply be re-issued. Re-issue silently up - // to MAX_STREAM_RETRIES, but only when the stream produced - // nothing actionable — if any tool call landed or text was - // streamed, ship the partial state to the rest of the turn - // pipeline so we don't double-bill the user by re-running it. - let stream_died_with_nothing = stream_errors > 0 - && tool_uses.is_empty() - && current_text_visible.trim().is_empty() - && current_thinking.trim().is_empty() - && !pending_message_complete; - if stream_died_with_nothing { - if stream_retry_attempts < MAX_STREAM_RETRIES { - stream_retry_attempts = stream_retry_attempts.saturating_add(1); - crate::logging::warn(format!( - "Stream died with no content (attempt {}/{}); retrying request", - stream_retry_attempts, MAX_STREAM_RETRIES - )); - let _ = self - .tx_event - .send(Event::status(format!( - "Connection interrupted; retrying ({}/{})", - stream_retry_attempts, MAX_STREAM_RETRIES - ))) - .await; - // Don't preserve the per-stream `turn_error` — we're - // about to retry, and a successful retry should not - // surface the transient error as the turn outcome. - turn_error = None; - continue; - } - crate::logging::warn(format!( - "Stream retry budget exhausted ({} attempts); failing turn", - stream_retry_attempts - )); - } else if stream_errors == 0 { - // Healthy round → reset retry budget so we don't carry over - // state from a previous bad round. - stream_retry_attempts = 0; - } - - // Update turn usage - turn.add_usage(&usage); - - // Build content blocks. If this assistant turn produced tool - // calls, ensure a Thinking block is present even when the model - // didn't stream any reasoning text — DeepSeek's thinking-mode - // API requires `reasoning_content` to accompany every tool-call - // assistant message in the conversation history. Saving a - // placeholder here keeps the on-disk session structurally - // correct so subsequent requests won't 400. - let needs_thinking_block = - !tool_uses.is_empty() || tool_parser::has_tool_call_markers(¤t_text_raw); - let thinking_to_persist = if !current_thinking.is_empty() { - Some(current_thinking.clone()) - } else if needs_thinking_block { - Some(String::from("(reasoning omitted)")) - } else { - None - }; - if let Some(thinking) = thinking_to_persist { - content_blocks.push(ContentBlock::Thinking { thinking }); - } - let mut final_text = current_text_visible.clone(); - if tool_uses.is_empty() && tool_parser::has_tool_call_markers(¤t_text_raw) { - let parsed = tool_parser::parse_tool_calls(¤t_text_raw); - final_text = parsed.clean_text; - for call in parsed.tool_calls { - let _ = self - .tx_event - .send(Event::ToolCallStarted { - id: call.id.clone(), - name: call.name.clone(), - input: call.args.clone(), - }) - .await; - tool_uses.push(ToolUseState { - id: call.id, - name: call.name, - input: call.args, - caller: None, - input_buffer: String::new(), - }); - } - } - - if !final_text.is_empty() { - content_blocks.push(ContentBlock::Text { - text: final_text, - cache_control: None, - }); - } - for tool in &tool_uses { - content_blocks.push(ContentBlock::ToolUse { - id: tool.id.clone(), - name: tool.name.clone(), - input: tool.input.clone(), - caller: tool.caller.clone(), - }); - } - - if pending_message_complete { - let index = last_text_index.unwrap_or(0); - let _ = self.tx_event.send(Event::MessageComplete { index }).await; - } - - // RLM is a structured tool call (`rlm_query`) handled by the - // normal tool dispatch path; inline ```repl blocks (paper §2) - // are executed below when tool_uses is empty. - // DeepSeek chat API rejects assistant messages that contain only - // Keep thinking for UI stream events, but persist only sendable - // assistant turns in the conversation state. - let has_sendable_assistant_content = content_blocks.iter().any(|block| { - matches!( - block, - ContentBlock::Text { .. } | ContentBlock::ToolUse { .. } - ) - }); - - // Add assistant message to session - if has_sendable_assistant_content { - self.add_session_message(Message { - role: "assistant".to_string(), - content: content_blocks, - }) - .await; - } - - // If no tool uses, check for inline REPL blocks (paper §2) or - // finish the turn. - if tool_uses.is_empty() { - if !pending_steers.is_empty() { - for steer in pending_steers.drain(..) { - self.session - .working_set - .observe_user_message(&steer, &self.session.workspace); - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: steer, - cache_control: None, - }], - }) - .await; - } - turn.next_step(); - continue; - } - - // Inline ```repl execution — paper-spec RLM integration. - if has_sendable_assistant_content - && crate::repl::sandbox::has_repl_block(¤t_text_visible) - { - let repl_blocks = - crate::repl::sandbox::extract_repl_blocks(¤t_text_visible); - let mut runtime = match crate::repl::runtime::PythonRuntime::new().await { - Ok(rt) => rt, - Err(e) => { - let _ = self - .tx_event - .send(Event::status(format!("REPL init failed: {e}"))) - .await; - break; - } - }; - - let mut final_result: Option = None; - for (i, block) in repl_blocks.iter().enumerate() { - let round_num = i + 1; - let _ = self - .tx_event - .send(Event::status(format!( - "REPL round {round_num}: executing..." - ))) - .await; - - match runtime.execute(&block.code).await { - Ok(round) => { - if let Some(val) = &round.final_value { - let _ = self - .tx_event - .send(Event::status(format!( - "REPL round {round_num}: FINAL result obtained" - ))) - .await; - final_result = Some(val.clone()); - break; - } - - // No FINAL — feed truncated stdout back as user metadata. - let feedback = if round.has_error { - format!( - "[REPL round {round_num} error]\nstdout:\n{}\nstderr:\n{}", - round.stdout, round.stderr - ) - } else { - format!("[REPL round {round_num} output]\n{}", round.stdout) - }; - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: feedback, - cache_control: None, - }], - }) - .await; - } - Err(e) => { - let _ = self - .tx_event - .send(Event::status(format!( - "REPL round {round_num} failed: {e}" - ))) - .await; - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: format!( - "[REPL round {round_num} execution failed]\n{e}" - ), - cache_control: None, - }], - }) - .await; - } - } - } - - if let Some(final_val) = final_result { - // Replace the assistant's text with the FINAL answer. - if let Some(last_msg) = self.session.messages.last_mut() - && last_msg.role == "assistant" - { - for block in &mut last_msg.content { - if let ContentBlock::Text { text, .. } = block { - *text = final_val; - break; - } - } - } - self.emit_session_updated().await; - break; - } - - // No FINAL — let the model iterate with the feedback. - turn.next_step(); - continue; - } - - break; - } - - // Execute tools - let tool_exec_lock = self.tool_exec_lock.clone(); - let mcp_pool = if tool_uses - .iter() - .any(|tool| McpPool::is_mcp_tool(&tool.name)) - { - match self.ensure_mcp_pool().await { - Ok(pool) => Some(pool), - Err(err) => { - let _ = self.tx_event.send(Event::status(err.to_string())).await; - None - } - } - } else { - None - }; - - let mut plans: Vec = Vec::with_capacity(tool_uses.len()); - for (index, tool) in tool_uses.iter().enumerate() { - let tool_id = tool.id.clone(); - let tool_name = tool.name.clone(); - let tool_input = tool.input.clone(); - let tool_caller = tool.caller.clone(); - crate::logging::info(format!( - "Planning tool '{}' with input: {:?}", - tool_name, tool_input - )); - - let interactive = (tool_name == "exec_shell" - && tool_input - .get("interactive") - .and_then(serde_json::Value::as_bool) - == Some(true)) - || tool_name == REQUEST_USER_INPUT_NAME; - - let mut approval_required = false; - let mut approval_description = "Tool execution requires approval".to_string(); - let mut supports_parallel = false; - let mut read_only = false; - let mut blocked_error: Option = None; - if maybe_activate_requested_deferred_tool( - &tool_name, - &tool_catalog, - &mut active_tool_names, - ) { - let _ = self - .tx_event - .send(Event::status(format!( - "Auto-loaded deferred tool '{tool_name}' after model request." - ))) - .await; - } - let tool_def = tool_catalog.iter().find(|def| def.name == tool_name); - - if !caller_allowed_for_tool(tool_caller.as_ref(), tool_def) { - blocked_error = Some(ToolError::permission_denied(format!( - "Tool '{tool_name}' does not allow caller '{}'", - caller_type_for_tool_use(tool_caller.as_ref()) - ))); - } - - if blocked_error.is_none() - && tool_def.is_none() - && !McpPool::is_mcp_tool(&tool_name) - && tool_name != CODE_EXECUTION_TOOL_NAME - && !is_tool_search_tool(&tool_name) - { - blocked_error = Some(ToolError::not_available(missing_tool_error_message( - &tool_name, - &tool_catalog, - ))); - } - - if McpPool::is_mcp_tool(&tool_name) { - read_only = mcp_tool_is_read_only(&tool_name); - supports_parallel = mcp_tool_is_parallel_safe(&tool_name); - approval_required = !read_only; - approval_description = mcp_tool_approval_description(&tool_name); - } else if let Some(registry) = tool_registry - && let Some(spec) = registry.get(&tool_name) - { - approval_required = spec.approval_requirement() != ApprovalRequirement::Auto; - approval_description = spec.description().to_string(); - supports_parallel = spec.supports_parallel(); - read_only = spec.is_read_only(); - } else if tool_name == CODE_EXECUTION_TOOL_NAME { - approval_required = true; - approval_description = - "Run model-provided Python code in local execution sandbox".to_string(); - supports_parallel = false; - read_only = false; - } else if is_tool_search_tool(&tool_name) { - approval_required = false; - approval_description = "Search tool catalog".to_string(); - supports_parallel = false; - read_only = true; - } - - plans.push(ToolExecutionPlan { - index, - id: tool_id, - name: tool_name, - input: tool_input, - caller: tool_caller, - interactive, - approval_required, - approval_description, - supports_parallel, - read_only, - blocked_error, - }); - } - - let parallel_allowed = should_parallelize_tool_batch(&plans); - if parallel_allowed && plans.len() > 1 { - let _ = self - .tx_event - .send(Event::status(format!( - "Executing {} read-only tools in parallel", - plans.len() - ))) - .await; - } else if plans.len() > 1 { - let _ = self - .tx_event - .send(Event::status( - "Executing tools sequentially (writes, approvals, or non-parallel tools detected)", - )) - .await; - } - - let mut outcomes: Vec> = Vec::with_capacity(plans.len()); - outcomes.resize_with(plans.len(), || None); - - if parallel_allowed { - let mut tool_tasks = FuturesUnordered::new(); - for plan in plans { - if let Some(err) = plan.blocked_error.clone() { - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: plan.id, - name: plan.name, - input: plan.input, - started_at: Instant::now(), - result: Err(err), - }); - continue; - } - let registry = tool_registry; - let lock = tool_exec_lock.clone(); - let mcp_pool = mcp_pool.clone(); - let tx_event = self.tx_event.clone(); - let started_at = Instant::now(); - - tool_tasks.push(async move { - let result = Engine::execute_tool_with_lock( - lock, - plan.supports_parallel, - plan.interactive, - tx_event.clone(), - plan.name.clone(), - plan.input.clone(), - registry, - mcp_pool, - None, - ) - .await; - - let _ = tx_event - .send(Event::ToolCallComplete { - id: plan.id.clone(), - name: plan.name.clone(), - result: result.clone(), - }) - .await; - - ToolExecOutcome { - index: plan.index, - id: plan.id, - name: plan.name, - input: plan.input, - started_at, - result, - } - }); - } - - while let Some(outcome) = tool_tasks.next().await { - let index = outcome.index; - outcomes[index] = Some(outcome); - } - } else { - for plan in plans { - let tool_id = plan.id.clone(); - let tool_name = plan.name.clone(); - let tool_input = plan.input.clone(); - let tool_caller = plan.caller.clone(); - - if let Some(err) = plan.blocked_error.clone() { - let result = Err(err); - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at: Instant::now(), - result, - }); - continue; - } - - if tool_name == MULTI_TOOL_PARALLEL_NAME { - let started_at = Instant::now(); - let result = self - .execute_parallel_tool( - tool_input.clone(), - tool_registry, - tool_exec_lock.clone(), - ) - .await; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if tool_name == CODE_EXECUTION_TOOL_NAME { - let started_at = Instant::now(); - let result = - execute_code_execution_tool(&tool_input, &self.session.workspace).await; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if is_tool_search_tool(&tool_name) { - let started_at = Instant::now(); - let result = execute_tool_search( - &tool_name, - &tool_input, - &tool_catalog, - &mut active_tool_names, - ); - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - if tool_name == REQUEST_USER_INPUT_NAME { - let started_at = Instant::now(); - let result = match UserInputRequest::from_value(&tool_input) { - Ok(request) => self.await_user_input(&tool_id, request).await.and_then( - |response| { - ToolResult::json(&response) - .map_err(|e| ToolError::execution_failed(e.to_string())) - }, - ), - Err(err) => Err(err), - }; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - continue; - } - - // Handle approval flow: returns (result_override, context_override) - let (result_override, context_override): ( - Option>, - Option, - ) = if plan.approval_required { - emit_tool_audit(json!({ - "event": "tool.approval_required", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - })); - let approval_key = crate::tools::approval_cache::build_approval_key( - &tool_name, - &tool_input, - ) - .0; - let _ = self - .tx_event - .send(Event::ApprovalRequired { - id: tool_id.clone(), - tool_name: tool_name.clone(), - description: plan.approval_description.clone(), - approval_key, - }) - .await; - - match self.await_tool_approval(&tool_id).await { - Ok(ApprovalResult::Approved) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "approved", - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - (None, None) - } - Ok(ApprovalResult::Denied) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "denied", - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - ( - Some(Err(ToolError::permission_denied(format!( - "Tool '{tool_name}' denied by user" - )))), - None, - ) - } - Ok(ApprovalResult::RetryWithPolicy(policy)) => { - emit_tool_audit(json!({ - "event": "tool.approval_decision", - "tool_id": tool_id.clone(), - "tool_name": tool_name.clone(), - "decision": "retry_with_policy", - "policy": format!("{policy:?}"), - "caller": caller_type_for_tool_use(tool_caller.as_ref()), - })); - let elevated_context = tool_registry.map(|r| { - r.context().clone().with_elevated_sandbox_policy(policy) - }); - (None, elevated_context) - } - Err(err) => (Some(Err(err)), None), - } - } else { - (None, None) - }; - - let started_at = Instant::now(); - let result = if let Some(result_override) = result_override { - result_override - } else { - Self::execute_tool_with_lock( - tool_exec_lock.clone(), - plan.supports_parallel, - plan.interactive, - self.tx_event.clone(), - tool_name.clone(), - tool_input.clone(), - tool_registry, - mcp_pool.clone(), - context_override, - ) - .await - }; - - let _ = self - .tx_event - .send(Event::ToolCallComplete { - id: tool_id.clone(), - name: tool_name.clone(), - result: result.clone(), - }) - .await; - - outcomes[plan.index] = Some(ToolExecOutcome { - index: plan.index, - id: tool_id, - name: tool_name, - input: tool_input, - started_at, - result, - }); - } - } - - let mut step_error_count = 0usize; - // Categorized tool errors collected this step. Feeds the capacity - // controller's error-escalation checkpoint so it can distinguish - // (e.g.) a Tool failure that should escalate from a permission - // denial that should not. - let mut step_error_categories: Vec = Vec::new(); - let mut stop_after_plan_tool = false; - - for outcome in outcomes.into_iter().flatten() { - let duration = outcome.started_at.elapsed(); - let tool_input = outcome.input.clone(); - let tool_name_for_ws = outcome.name.clone(); - let mut tool_call = - TurnToolCall::new(outcome.id.clone(), outcome.name.clone(), outcome.input); - let should_stop_this_turn = - should_stop_after_plan_tool(mode, &outcome.name, &outcome.result); - - match outcome.result { - Ok(output) => { - emit_tool_audit(json!({ - "event": "tool.result", - "tool_id": outcome.id.clone(), - "tool_name": outcome.name.clone(), - "success": output.success, - })); - let output_for_context = compact_tool_result_for_context( - &self.session.model, - &outcome.name, - &output, - ); - let output_content = output.content; - - tool_call.set_result(output_content.clone(), duration); - self.session.working_set.observe_tool_call( - &tool_name_for_ws, - &tool_input, - Some(&output_for_context), - &self.session.workspace, - ); - - // #136: post-edit LSP diagnostics hook. We only run - // this on success — failed edits leave the file - // untouched, so polling for diagnostics would just - // surface stale state. - if output.success { - self.run_post_edit_lsp_hook(&outcome.name, &tool_input) - .await; - } - - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::ToolResult { - tool_use_id: outcome.id, - content: output_for_context, - is_error: None, - content_blocks: None, - }], - }) - .await; - } - Err(e) => { - let envelope: ErrorEnvelope = e.clone().into(); - emit_tool_audit(json!({ - "event": "tool.result", - "tool_id": outcome.id.clone(), - "tool_name": outcome.name.clone(), - "success": false, - "error": e.to_string(), - "category": envelope.category.to_string(), - "severity": envelope.severity.to_string(), - })); - step_error_count += 1; - step_error_categories.push(envelope.category); - let error = format_tool_error(&e, &outcome.name); - tool_call.set_error(error.clone(), duration); - self.session.working_set.observe_tool_call( - &tool_name_for_ws, - &tool_input, - Some(&error), - &self.session.workspace, - ); - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::ToolResult { - tool_use_id: outcome.id, - content: format!("Error: {error}"), - is_error: Some(true), - content_blocks: None, - }], - }) - .await; - } - } - - turn.record_tool_call(tool_call); - stop_after_plan_tool |= should_stop_this_turn; - } - - if stop_after_plan_tool { - break; - } - - if self - .run_capacity_post_tool_checkpoint( - turn, - mode, - tool_registry, - tool_exec_lock.clone(), - mcp_pool.clone(), - step_error_count, - consecutive_tool_error_steps, - ) - .await - { - turn.next_step(); - continue; - } - - if !pending_steers.is_empty() { - for steer in pending_steers.drain(..) { - self.session - .working_set - .observe_user_message(&steer, &self.session.workspace); - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: steer, - cache_control: None, - }], - }) - .await; - } - } - - if step_error_count > 0 { - consecutive_tool_error_steps = consecutive_tool_error_steps.saturating_add(1); - } else { - consecutive_tool_error_steps = 0; - } - - if self - .run_capacity_error_escalation_checkpoint( - turn, - mode, - step_error_count, - consecutive_tool_error_steps, - &step_error_categories, - ) - .await - { - turn.next_step(); - continue; - } - - if consecutive_tool_error_steps >= 3 { - let _ = self - .tx_event - .send(Event::status( - "Stopping after repeated tool failures. Try a narrower scope or adjust approvals.", - )) - .await; - break; - } - - turn.next_step(); - } - - if self.cancel_token.is_cancelled() { - return (TurnOutcomeStatus::Interrupted, None); - } - if let Some(err) = turn_error { - return (TurnOutcomeStatus::Failed, Some(err)); - } - (TurnOutcomeStatus::Completed, None) - } - - async fn run_capacity_pre_request_checkpoint( - &mut self, - turn: &TurnContext, - client: Option<&DeepSeekClient>, - mode: AppMode, - ) -> bool { - let snapshot = self - .capacity_controller - .observe_pre_turn(self.capacity_observation(turn)); - let decision = self - .capacity_controller - .decide(self.turn_counter, snapshot.as_ref()); - self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) - .await; - - if decision.action != GuardrailAction::TargetedContextRefresh { - return false; - } - - self.apply_targeted_context_refresh(turn, client, mode, snapshot.as_ref()) - .await - } - - #[allow(clippy::too_many_arguments)] - async fn run_capacity_post_tool_checkpoint( - &mut self, - turn: &TurnContext, - mode: AppMode, - tool_registry: Option<&crate::tools::ToolRegistry>, - tool_exec_lock: Arc>, - mcp_pool: Option>>, - _step_error_count: usize, - _consecutive_tool_error_steps: u32, - ) -> bool { - let snapshot = self - .capacity_controller - .observe_post_tool(self.capacity_observation(turn)); - let decision = self - .capacity_controller - .decide(self.turn_counter, snapshot.as_ref()); - self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) - .await; - - match decision.action { - GuardrailAction::VerifyWithToolReplay => { - let _ = self - .apply_verify_with_tool_replay( - turn, - mode, - snapshot.as_ref(), - tool_registry, - tool_exec_lock, - mcp_pool, - ) - .await; - false - } - GuardrailAction::VerifyAndReplan => { - self.apply_verify_and_replan(turn, mode, snapshot.as_ref(), "high_risk_post_tool") - .await - } - GuardrailAction::NoIntervention | GuardrailAction::TargetedContextRefresh => false, - } - } - - async fn run_capacity_error_escalation_checkpoint( - &mut self, - turn: &TurnContext, - mode: AppMode, - step_error_count: usize, - consecutive_tool_error_steps: u32, - error_categories: &[ErrorCategory], - ) -> bool { - if step_error_count == 0 && consecutive_tool_error_steps < 2 { - return false; - } - - // Categorize this step's failures by typed `ErrorCategory` rather than - // substring-matching error strings. Context overflow always escalates; - // network / rate-limit / timeout are transient and skip escalation; - // anything else only escalates with consecutive consecutive failures. - let has_context_overflow = error_categories.contains(&ErrorCategory::InvalidInput); - let only_transient = !error_categories.is_empty() - && error_categories.iter().all(|c| { - matches!( - c, - ErrorCategory::Network | ErrorCategory::RateLimit | ErrorCategory::Timeout - ) - }); - if only_transient && !has_context_overflow { - return false; - } - if !has_context_overflow && consecutive_tool_error_steps < 2 { - return false; - } - - let snapshot = self - .capacity_controller - .last_snapshot() - .cloned() - .or_else(|| { - self.capacity_controller - .observe_pre_turn(self.capacity_observation(turn)) - }); - let Some(snapshot) = snapshot else { - return false; - }; - - let repeated_failures = step_error_count >= 2 || consecutive_tool_error_steps >= 2; - let mut forced = snapshot.clone(); - if repeated_failures && !(snapshot.risk_band == RiskBand::High && snapshot.severe) { - forced.risk_band = RiskBand::High; - forced.severe = true; - } - - let decision = self - .capacity_controller - .decide(self.turn_counter, Some(&forced)); - self.emit_capacity_decision(turn, Some(&forced), &decision) - .await; - - if decision.action != GuardrailAction::VerifyAndReplan { - return false; - } - - let category_labels: Vec = error_categories.iter().map(|c| c.to_string()).collect(); - self.apply_verify_and_replan( - turn, - mode, - Some(&forced), - &format!( - "error_escalation: step_errors={}, consecutive_steps={}, categories={}", - step_error_count, - consecutive_tool_error_steps, - category_labels.join(",") - ), - ) - .await - } - - fn capacity_observation(&self, turn: &TurnContext) -> CapacityObservationInput { - let message_window = self.config.capacity.profile_window.max(8) * 3; - let action_count_this_turn = usize::try_from(turn.step) - .unwrap_or(usize::MAX) - .saturating_add(turn.tool_calls.len()) - .saturating_add(1); - let tool_calls_recent_window = self.recent_tool_call_count(message_window); - let unique_reference_ids_recent_window = - self.recent_unique_reference_count(message_window, turn); - let context_window = usize::try_from( - context_window_for_model(&self.session.model).unwrap_or(DEFAULT_CONTEXT_WINDOW_TOKENS), - ) - .unwrap_or(usize::try_from(DEFAULT_CONTEXT_WINDOW_TOKENS).unwrap_or(128_000)) - .max(1); - let context_used_ratio = (self.estimated_input_tokens() as f64) / (context_window as f64); - - CapacityObservationInput { - turn_index: self.turn_counter, - model: self.session.model.clone(), - action_count_this_turn, - tool_calls_recent_window, - unique_reference_ids_recent_window, - context_used_ratio, - } - } - - fn recent_tool_call_count(&self, message_window: usize) -> usize { - self.session - .messages - .iter() - .rev() - .take(message_window) - .map(|msg| { - msg.content - .iter() - .filter(|block| { - matches!( - block, - ContentBlock::ToolUse { .. } | ContentBlock::ToolResult { .. } - ) - }) - .count() - }) - .sum() - } - - fn recent_unique_reference_count(&self, message_window: usize, turn: &TurnContext) -> usize { - let mut refs = std::collections::HashSet::new(); - for msg in self.session.messages.iter().rev().take(message_window) { - for block in &msg.content { - match block { - ContentBlock::ToolUse { id, .. } => { - refs.insert(id.clone()); - } - ContentBlock::ToolResult { tool_use_id, .. } => { - refs.insert(tool_use_id.clone()); - } - ContentBlock::Text { text, .. } => { - for token in text.split_whitespace() { - if token.contains('/') || token.contains('.') { - refs.insert( - token - .trim_matches(|c: char| ",.;:()[]{}".contains(c)) - .to_string(), - ); - } - } - } - ContentBlock::Thinking { .. } - | ContentBlock::ServerToolUse { .. } - | ContentBlock::ToolSearchToolResult { .. } - | ContentBlock::CodeExecutionToolResult { .. } => {} - } - } - } - for tool_call in turn.tool_calls.iter().rev().take(8) { - refs.insert(tool_call.id.clone()); - } - for path in self.session.working_set.top_paths(8) { - refs.insert(path); - } - refs.retain(|item| !item.is_empty()); - refs.len() - } - - async fn emit_coherence_signal(&mut self, signal: CoherenceSignal, reason: impl Into) { - let next = next_coherence_state(self.coherence_state, signal); - self.coherence_state = next; - let _ = self - .tx_event - .send(Event::CoherenceState { - state: next, - label: next.label().to_string(), - description: next.description().to_string(), - reason: reason.into(), - }) - .await; - } - - async fn emit_compaction_started(&mut self, id: String, auto: bool, message: String) { - let _ = self - .tx_event - .send(Event::CompactionStarted { - id, - auto, - message: message.clone(), - }) - .await; - self.emit_coherence_signal(CoherenceSignal::CompactionStarted, message) - .await; - } - - async fn emit_compaction_completed( - &mut self, - id: String, - auto: bool, - message: String, - messages_before: Option, - messages_after: Option, - ) { - let _ = self - .tx_event - .send(Event::CompactionCompleted { - id, - auto, - message: message.clone(), - messages_before, - messages_after, - }) - .await; - self.emit_coherence_signal(CoherenceSignal::CompactionCompleted, message) - .await; - } - - async fn emit_compaction_failed(&mut self, id: String, auto: bool, message: String) { - let _ = self - .tx_event - .send(Event::CompactionFailed { - id, - auto, - message: message.clone(), - }) - .await; - self.emit_coherence_signal(CoherenceSignal::CompactionFailed, message) - .await; - } - - async fn emit_capacity_decision( - &mut self, - turn: &TurnContext, - snapshot: Option<&CapacitySnapshot>, - decision: &CapacityDecision, - ) { - let Some(snapshot) = snapshot else { - return; - }; - let _ = self - .tx_event - .send(Event::CapacityDecision { - session_id: self.session.id.clone(), - turn_id: turn.id.clone(), - h_hat: snapshot.h_hat, - c_hat: snapshot.c_hat, - slack: snapshot.slack, - min_slack: snapshot.profile.min_slack, - violation_ratio: snapshot.profile.violation_ratio, - p_fail: snapshot.p_fail, - risk_band: snapshot.risk_band.as_str().to_string(), - action: decision.action.as_str().to_string(), - cooldown_blocked: decision.cooldown_blocked, - reason: decision.reason.clone(), - }) - .await; - self.emit_coherence_signal( - CoherenceSignal::CapacityDecision { - risk_band: snapshot.risk_band, - action: decision.action, - cooldown_blocked: decision.cooldown_blocked, - }, - format!( - "capacity_decision: risk={} action={} reason={}", - snapshot.risk_band.as_str(), - decision.action.as_str(), - decision.reason - ), - ) - .await; - } - - async fn emit_capacity_intervention( - &mut self, - turn: &TurnContext, - action: GuardrailAction, - before_prompt_tokens: usize, - after_prompt_tokens: usize, - replay_outcome: Option, - replan_performed: bool, - ) { - let _ = self - .tx_event - .send(Event::CapacityIntervention { - session_id: self.session.id.clone(), - turn_id: turn.id.clone(), - action: action.as_str().to_string(), - before_prompt_tokens, - after_prompt_tokens, - compaction_size_reduction: before_prompt_tokens.saturating_sub(after_prompt_tokens), - replay_outcome, - replan_performed, - }) - .await; - self.emit_coherence_signal( - CoherenceSignal::CapacityIntervention { action }, - format!("capacity_intervention: action={}", action.as_str()), - ) - .await; - } - - async fn apply_targeted_context_refresh( - &mut self, - turn: &TurnContext, - client: Option<&DeepSeekClient>, - mode: AppMode, - snapshot: Option<&CapacitySnapshot>, - ) -> bool { - let before_tokens = self.estimated_input_tokens(); - let compaction_pins = self - .session - .working_set - .pinned_message_indices(&self.session.messages, &self.session.workspace); - let compaction_paths = self.session.working_set.top_paths(24); - - let mut refreshed = false; - let should_run_summary_compaction = self.config.compaction.enabled - && should_compact( - &self.session.messages, - &self.config.compaction, - Some(&self.session.workspace), - Some(&compaction_pins), - Some(&compaction_paths), - ); - if should_run_summary_compaction && let Some(client) = client { - match compact_messages_safe( - client, - &self.session.messages, - &self.config.compaction, - Some(&self.session.workspace), - Some(&compaction_pins), - Some(&compaction_paths), - ) - .await - { - Ok(result) => { - if !result.messages.is_empty() || self.session.messages.is_empty() { - self.session.messages = result.messages; - self.merge_compaction_summary(result.summary_prompt); - refreshed = true; - } - } - Err(err) => { - let _ = self - .tx_event - .send(Event::status(format!( - "Capacity refresh compaction failed: {err}. Falling back to local trim." - ))) - .await; - } - } - } - - if !refreshed { - let target_budget = context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) - .unwrap_or(self.config.compaction.token_threshold.max(1)); - if self.estimated_input_tokens() > target_budget { - let trimmed = self.trim_oldest_messages_to_budget(target_budget); - refreshed = trimmed > 0; - } - } - - if !refreshed { - return false; - } - - let canonical = self.build_canonical_state(turn, None); - let source_message_ids = self.capacity_source_message_ids(turn); - let record = self.build_capacity_record( - turn, - GuardrailAction::TargetedContextRefresh, - snapshot, - canonical.clone(), - source_message_ids, - None, - ); - let pointer = self - .persist_capacity_record(turn, GuardrailAction::TargetedContextRefresh, &record) - .await; - self.merge_compaction_summary(Some(self.canonical_prompt( - &canonical, - &pointer, - GuardrailAction::TargetedContextRefresh, - None, - ))); - self.refresh_system_prompt(mode); - self.emit_session_updated().await; - - let after_tokens = self.estimated_input_tokens(); - self.emit_capacity_intervention( - turn, - GuardrailAction::TargetedContextRefresh, - before_tokens, - after_tokens, - None, - false, - ) - .await; - self.capacity_controller - .mark_intervention_applied(self.turn_counter, GuardrailAction::TargetedContextRefresh); - true - } - - #[allow(clippy::too_many_arguments)] - async fn apply_verify_with_tool_replay( - &mut self, - turn: &TurnContext, - mode: AppMode, - snapshot: Option<&CapacitySnapshot>, - tool_registry: Option<&crate::tools::ToolRegistry>, - tool_exec_lock: Arc>, - mut mcp_pool: Option>>, - ) -> bool { - let before_tokens = self.estimated_input_tokens(); - let Some(candidate) = self.select_replay_candidate(turn, tool_registry) else { - return false; - }; - - if McpPool::is_mcp_tool(&candidate.name) && mcp_pool.is_none() { - mcp_pool = self.ensure_mcp_pool().await.ok(); - } - - let supports_parallel = if McpPool::is_mcp_tool(&candidate.name) { - mcp_tool_is_parallel_safe(&candidate.name) - } else { - tool_registry - .and_then(|registry| registry.get(&candidate.name)) - .is_some_and(|spec| spec.supports_parallel()) - }; - let interactive = (candidate.name == "exec_shell" - && candidate - .input - .get("interactive") - .and_then(serde_json::Value::as_bool) - == Some(true)) - || candidate.name == REQUEST_USER_INPUT_NAME; - - let replay_result = Self::execute_tool_with_lock( - tool_exec_lock, - supports_parallel, - interactive, - self.tx_event.clone(), - candidate.name.clone(), - candidate.input.clone(), - tool_registry, - mcp_pool.clone(), - None, - ) - .await; - - let (pass, replay_outcome, diff_summary) = match replay_result { - Ok(output) => { - let original = candidate.result.as_deref().unwrap_or_default(); - let replay = output.content.as_str(); - let equal = original.trim() == replay.trim(); - let diff = if equal { - "output_match".to_string() - } else { - format!( - "output_mismatch: original='{}' replay='{}'", - summarize_text(original, 140), - summarize_text(replay, 140) - ) - }; - ( - equal, - if equal { - "pass".to_string() - } else { - "conflict".to_string() - }, - diff, - ) - } - Err(err) => { - self.capacity_controller - .mark_replay_failed(self.turn_counter); - ( - false, - "error".to_string(), - format!("replay_error: {}", summarize_text(&err.to_string(), 180)), - ) - } - }; - - let verification_note = format!( - "[verification replay] tool={} pass={} details={}", - candidate.name, pass, diff_summary - ); - self.add_session_message(Message { - role: "user".to_string(), - content: vec![ContentBlock::ToolResult { - tool_use_id: candidate.id.clone(), - content: verification_note.clone(), - is_error: None, - content_blocks: None, - }], - }) - .await; - - if !pass { - self.capacity_controller - .mark_replay_failed(self.turn_counter); - } - - let canonical = self.build_canonical_state( - turn, - Some(if pass { - "replay verification passed" - } else { - "replay verification failed or conflicted" - }), - ); - let replay_info = Some(ReplayInfo { - tool_id: candidate.id.clone(), - tool_name: candidate.name.clone(), - pass, - diff_summary: diff_summary.clone(), - }); - let source_message_ids = self.capacity_source_message_ids(turn); - let record = self.build_capacity_record( - turn, - GuardrailAction::VerifyWithToolReplay, - snapshot, - canonical.clone(), - source_message_ids, - replay_info, - ); - let pointer = self - .persist_capacity_record(turn, GuardrailAction::VerifyWithToolReplay, &record) - .await; - self.merge_compaction_summary(Some(self.canonical_prompt( - &canonical, - &pointer, - GuardrailAction::VerifyWithToolReplay, - Some(&verification_note), - ))); - self.refresh_system_prompt(mode); - self.emit_session_updated().await; - - let after_tokens = self.estimated_input_tokens(); - self.emit_capacity_intervention( - turn, - GuardrailAction::VerifyWithToolReplay, - before_tokens, - after_tokens, - Some(replay_outcome), - false, - ) - .await; - self.capacity_controller - .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyWithToolReplay); - true - } - - async fn apply_verify_and_replan( - &mut self, - turn: &TurnContext, - mode: AppMode, - snapshot: Option<&CapacitySnapshot>, - reason: &str, - ) -> bool { - let before_tokens = self.estimated_input_tokens(); - let canonical = self.build_canonical_state(turn, Some(reason)); - let source_message_ids = self.capacity_source_message_ids(turn); - let record = self.build_capacity_record( - turn, - GuardrailAction::VerifyAndReplan, - snapshot, - canonical.clone(), - source_message_ids, - None, - ); - let pointer = self - .persist_capacity_record(turn, GuardrailAction::VerifyAndReplan, &record) - .await; - - let latest_user = self - .session - .messages - .iter() - .rev() - .find(|msg| { - msg.role == "user" - && msg - .content - .iter() - .any(|block| matches!(block, ContentBlock::Text { .. })) - }) - .cloned(); - let latest_verified = self - .session - .messages - .iter() - .rev() - .find(|msg| { - msg.role == "user" - && msg.content.iter().any(|block| match block { - ContentBlock::ToolResult { content, .. } => { - content.contains("[verification replay]") - } - _ => false, - }) - }) - .cloned(); - - self.session.messages.clear(); - if let Some(msg) = latest_user { - self.session.messages.push(msg); - } - if let Some(msg) = latest_verified { - self.session.messages.push(msg); - } - - self.merge_compaction_summary(Some(self.canonical_prompt( - &canonical, - &pointer, - GuardrailAction::VerifyAndReplan, - Some("Replan now from canonical state. Keep steps minimal and verifiable."), - ))); - self.refresh_system_prompt(mode); - self.emit_session_updated().await; - - let _ = self - .tx_event - .send(Event::status( - "Capacity guardrail: context reset to canonical state; replanning step." - .to_string(), - )) - .await; - - let after_tokens = self.estimated_input_tokens(); - self.emit_capacity_intervention( - turn, - GuardrailAction::VerifyAndReplan, - before_tokens, - after_tokens, - None, - true, - ) - .await; - self.capacity_controller - .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyAndReplan); - true - } - - fn select_replay_candidate( - &self, - turn: &TurnContext, - tool_registry: Option<&crate::tools::ToolRegistry>, - ) -> Option { - turn.tool_calls - .iter() - .rev() - .find(|call| { - call.error.is_none() - && call.result.is_some() - && self.tool_is_replayable_read_only(&call.name, tool_registry) - }) - .cloned() - } - - fn tool_is_replayable_read_only( - &self, - tool_name: &str, - tool_registry: Option<&crate::tools::ToolRegistry>, - ) -> bool { - if tool_name == MULTI_TOOL_PARALLEL_NAME || tool_name == REQUEST_USER_INPUT_NAME { - return false; - } - if McpPool::is_mcp_tool(tool_name) { - return mcp_tool_is_read_only(tool_name); - } - tool_registry - .and_then(|registry| registry.get(tool_name)) - .is_some_and(|spec| spec.is_read_only()) - } - - fn build_canonical_state(&self, turn: &TurnContext, note: Option<&str>) -> CanonicalState { - let goal = self - .session - .messages - .iter() - .rev() - .find_map(|msg| { - if msg.role != "user" { - return None; - } - msg.content.iter().find_map(|block| match block { - ContentBlock::Text { text, .. } => Some(summarize_text(text, 220)), - _ => None, - }) - }) - .unwrap_or_else(|| "Continue current task from compact state".to_string()); - - let mut constraints = vec![ - format!("model={}", self.session.model), - format!("workspace={}", self.session.workspace.display()), - ]; - if let Some(note) = note { - constraints.push(summarize_text(note, 180)); - } - - let mut confirmed_facts = Vec::new(); - for msg in self.session.messages.iter().rev() { - for block in &msg.content { - if let ContentBlock::ToolResult { content, .. } = block { - if content.starts_with("Error:") { - continue; - } - confirmed_facts.push(summarize_text(content, 180)); - if confirmed_facts.len() >= 4 { - break; - } - } - } - if confirmed_facts.len() >= 4 { - break; - } - } - - let open_loops: Vec = turn - .tool_calls - .iter() - .rev() - .filter_map(|call| { - call.error - .as_ref() - .map(|error| format!("{}: {}", call.name, summarize_text(error, 180))) - }) - .take(4) - .collect(); - - let pending_actions: Vec = if open_loops.is_empty() { - vec!["Continue with next smallest verifiable step".to_string()] - } else { - vec![ - "Re-evaluate failed tool steps with narrower scope".to_string(), - "Re-derive plan from canonical facts before further edits".to_string(), - ] - }; - - let mut critical_refs = self.session.working_set.top_paths(8); - for tool_call in turn.tool_calls.iter().rev().take(4) { - critical_refs.push(format!("tool:{}", tool_call.id)); - } - critical_refs.dedup(); - - CanonicalState { - goal, - constraints, - confirmed_facts, - open_loops, - pending_actions, - critical_refs, - } - } - - fn canonical_prompt( - &self, - canonical: &CanonicalState, - pointer: &str, - action: GuardrailAction, - extra: Option<&str>, - ) -> SystemPrompt { - let mut lines = vec![ - COMPACTION_SUMMARY_MARKER.to_string(), - format!("Capacity Canonical State [{}]", action.as_str()), - format!("Goal: {}", canonical.goal), - "Constraints:".to_string(), - ]; - for item in &canonical.constraints { - lines.push(format!("- {}", summarize_text(item, 200))); - } - lines.push("Confirmed Facts:".to_string()); - for item in &canonical.confirmed_facts { - lines.push(format!("- {}", summarize_text(item, 200))); - } - lines.push("Open Loops:".to_string()); - if canonical.open_loops.is_empty() { - lines.push("- none".to_string()); - } else { - for item in &canonical.open_loops { - lines.push(format!("- {}", summarize_text(item, 200))); - } - } - lines.push("Pending Actions:".to_string()); - for item in &canonical.pending_actions { - lines.push(format!("- {}", summarize_text(item, 200))); - } - lines.push("Critical Refs:".to_string()); - for item in &canonical.critical_refs { - lines.push(format!("- {}", summarize_text(item, 200))); - } - if let Some(extra) = extra { - lines.push(format!("Instruction: {}", summarize_text(extra, 240))); - } - lines.push(format!("Memory Pointer: {pointer}")); - - SystemPrompt::Blocks(vec![crate::models::SystemBlock { - block_type: "text".to_string(), - text: lines.join("\n"), - cache_control: None, - }]) - } - - fn capacity_source_message_ids(&self, turn: &TurnContext) -> Vec { - let mut ids: Vec = turn - .tool_calls - .iter() - .rev() - .take(8) - .map(|call| call.id.clone()) - .collect(); - ids.reverse(); - ids - } - - fn build_capacity_record( - &self, - turn: &TurnContext, - action: GuardrailAction, - snapshot: Option<&CapacitySnapshot>, - canonical: CanonicalState, - source_message_ids: Vec, - replay_info: Option, - ) -> CapacityMemoryRecord { - let (h_hat, c_hat, slack, risk_band) = snapshot - .map(|s| (s.h_hat, s.c_hat, s.slack, s.risk_band.as_str().to_string())) - .unwrap_or_else(|| (0.0, 0.0, 0.0, "unknown".to_string())); - - CapacityMemoryRecord { - id: new_record_id(), - ts: now_rfc3339(), - turn_index: self.turn_counter, - action_trigger: action.as_str().to_string(), - h_hat, - c_hat, - slack, - risk_band, - canonical_state: canonical, - source_message_ids: if source_message_ids.is_empty() { - vec![turn.id.clone()] - } else { - source_message_ids - }, - replay_info, - } - } - - async fn persist_capacity_record( - &mut self, - turn: &TurnContext, - action: GuardrailAction, - record: &CapacityMemoryRecord, - ) -> String { - let pointer = format!("memory://{}/{}", self.session.id, record.id); - if let Err(err) = append_capacity_record(&self.session.id, record) { - let _ = self - .tx_event - .send(Event::CapacityMemoryPersistFailed { - session_id: self.session.id.clone(), - turn_id: turn.id.clone(), - action: action.as_str().to_string(), - error: summarize_text(&err.to_string(), 280), - }) - .await; - return format!("{pointer}?persist=failed"); - } - pointer - } - - fn rehydrate_latest_canonical_state(&mut self) { - let Ok(records) = load_last_k_capacity_records(&self.session.id, 1) else { - return; - }; - let Some(last) = records.last() else { - return; - }; - let pointer = format!("memory://{}/{}", self.session.id, last.id); - let prompt = self.canonical_prompt( - &last.canonical_state, - &pointer, - GuardrailAction::NoIntervention, - Some("Rehydrated canonical state from memory."), - ); - self.merge_compaction_summary(Some(prompt)); - } - /// Run the checkpoint-restart cycle boundary if the session has crossed /// its token threshold (issue #124). No-op in the common case. /// @@ -5113,7 +2637,9 @@ pub(crate) fn mock_engine_handle() -> MockEngineHandle { } mod approval; +mod capacity_flow; mod dispatch; +mod turn_loop; use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision}; use self::dispatch::{ diff --git a/crates/tui/src/core/engine/capacity_flow.rs b/crates/tui/src/core/engine/capacity_flow.rs new file mode 100644 index 00000000..f5fbfe4a --- /dev/null +++ b/crates/tui/src/core/engine/capacity_flow.rs @@ -0,0 +1,973 @@ +//! Capacity-controller checkpoints and interventions for the engine loop. +//! +//! Extracted from `core/engine.rs` for issue #74. The main turn loop still +//! decides when checkpoints run; this module owns the guardrail policy side +//! effects, replay verification, canonical-state persistence, and event +//! emission helpers. + +use super::*; + +impl Engine { + pub(super) async fn run_capacity_pre_request_checkpoint( + &mut self, + turn: &TurnContext, + client: Option<&DeepSeekClient>, + mode: AppMode, + ) -> bool { + let snapshot = self + .capacity_controller + .observe_pre_turn(self.capacity_observation(turn)); + let decision = self + .capacity_controller + .decide(self.turn_counter, snapshot.as_ref()); + self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) + .await; + + if decision.action != GuardrailAction::TargetedContextRefresh { + return false; + } + + self.apply_targeted_context_refresh(turn, client, mode, snapshot.as_ref()) + .await + } + + #[allow(clippy::too_many_arguments)] + pub(super) async fn run_capacity_post_tool_checkpoint( + &mut self, + turn: &TurnContext, + mode: AppMode, + tool_registry: Option<&crate::tools::ToolRegistry>, + tool_exec_lock: Arc>, + mcp_pool: Option>>, + _step_error_count: usize, + _consecutive_tool_error_steps: u32, + ) -> bool { + let snapshot = self + .capacity_controller + .observe_post_tool(self.capacity_observation(turn)); + let decision = self + .capacity_controller + .decide(self.turn_counter, snapshot.as_ref()); + self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) + .await; + + match decision.action { + GuardrailAction::VerifyWithToolReplay => { + let _ = self + .apply_verify_with_tool_replay( + turn, + mode, + snapshot.as_ref(), + tool_registry, + tool_exec_lock, + mcp_pool, + ) + .await; + false + } + GuardrailAction::VerifyAndReplan => { + self.apply_verify_and_replan(turn, mode, snapshot.as_ref(), "high_risk_post_tool") + .await + } + GuardrailAction::NoIntervention | GuardrailAction::TargetedContextRefresh => false, + } + } + + pub(super) async fn run_capacity_error_escalation_checkpoint( + &mut self, + turn: &TurnContext, + mode: AppMode, + step_error_count: usize, + consecutive_tool_error_steps: u32, + error_categories: &[ErrorCategory], + ) -> bool { + if step_error_count == 0 && consecutive_tool_error_steps < 2 { + return false; + } + + // Categorize this step's failures by typed `ErrorCategory` rather than + // substring-matching error strings. Context overflow always escalates; + // network / rate-limit / timeout are transient and skip escalation; + // anything else only escalates with consecutive consecutive failures. + let has_context_overflow = error_categories.contains(&ErrorCategory::InvalidInput); + let only_transient = !error_categories.is_empty() + && error_categories.iter().all(|c| { + matches!( + c, + ErrorCategory::Network | ErrorCategory::RateLimit | ErrorCategory::Timeout + ) + }); + if only_transient && !has_context_overflow { + return false; + } + if !has_context_overflow && consecutive_tool_error_steps < 2 { + return false; + } + + let snapshot = self + .capacity_controller + .last_snapshot() + .cloned() + .or_else(|| { + self.capacity_controller + .observe_pre_turn(self.capacity_observation(turn)) + }); + let Some(snapshot) = snapshot else { + return false; + }; + + let repeated_failures = step_error_count >= 2 || consecutive_tool_error_steps >= 2; + let mut forced = snapshot.clone(); + if repeated_failures && !(snapshot.risk_band == RiskBand::High && snapshot.severe) { + forced.risk_band = RiskBand::High; + forced.severe = true; + } + + let decision = self + .capacity_controller + .decide(self.turn_counter, Some(&forced)); + self.emit_capacity_decision(turn, Some(&forced), &decision) + .await; + + if decision.action != GuardrailAction::VerifyAndReplan { + return false; + } + + let category_labels: Vec = error_categories.iter().map(|c| c.to_string()).collect(); + self.apply_verify_and_replan( + turn, + mode, + Some(&forced), + &format!( + "error_escalation: step_errors={}, consecutive_steps={}, categories={}", + step_error_count, + consecutive_tool_error_steps, + category_labels.join(",") + ), + ) + .await + } + + pub(super) fn capacity_observation(&self, turn: &TurnContext) -> CapacityObservationInput { + let message_window = self.config.capacity.profile_window.max(8) * 3; + let action_count_this_turn = usize::try_from(turn.step) + .unwrap_or(usize::MAX) + .saturating_add(turn.tool_calls.len()) + .saturating_add(1); + let tool_calls_recent_window = self.recent_tool_call_count(message_window); + let unique_reference_ids_recent_window = + self.recent_unique_reference_count(message_window, turn); + let context_window = usize::try_from( + context_window_for_model(&self.session.model).unwrap_or(DEFAULT_CONTEXT_WINDOW_TOKENS), + ) + .unwrap_or(usize::try_from(DEFAULT_CONTEXT_WINDOW_TOKENS).unwrap_or(128_000)) + .max(1); + let context_used_ratio = (self.estimated_input_tokens() as f64) / (context_window as f64); + + CapacityObservationInput { + turn_index: self.turn_counter, + model: self.session.model.clone(), + action_count_this_turn, + tool_calls_recent_window, + unique_reference_ids_recent_window, + context_used_ratio, + } + } + + pub(super) fn recent_tool_call_count(&self, message_window: usize) -> usize { + self.session + .messages + .iter() + .rev() + .take(message_window) + .map(|msg| { + msg.content + .iter() + .filter(|block| { + matches!( + block, + ContentBlock::ToolUse { .. } | ContentBlock::ToolResult { .. } + ) + }) + .count() + }) + .sum() + } + + pub(super) fn recent_unique_reference_count( + &self, + message_window: usize, + turn: &TurnContext, + ) -> usize { + let mut refs = std::collections::HashSet::new(); + for msg in self.session.messages.iter().rev().take(message_window) { + for block in &msg.content { + match block { + ContentBlock::ToolUse { id, .. } => { + refs.insert(id.clone()); + } + ContentBlock::ToolResult { tool_use_id, .. } => { + refs.insert(tool_use_id.clone()); + } + ContentBlock::Text { text, .. } => { + for token in text.split_whitespace() { + if token.contains('/') || token.contains('.') { + refs.insert( + token + .trim_matches(|c: char| ",.;:()[]{}".contains(c)) + .to_string(), + ); + } + } + } + ContentBlock::Thinking { .. } + | ContentBlock::ServerToolUse { .. } + | ContentBlock::ToolSearchToolResult { .. } + | ContentBlock::CodeExecutionToolResult { .. } => {} + } + } + } + for tool_call in turn.tool_calls.iter().rev().take(8) { + refs.insert(tool_call.id.clone()); + } + for path in self.session.working_set.top_paths(8) { + refs.insert(path); + } + refs.retain(|item| !item.is_empty()); + refs.len() + } + + pub(super) async fn emit_coherence_signal( + &mut self, + signal: CoherenceSignal, + reason: impl Into, + ) { + let next = next_coherence_state(self.coherence_state, signal); + self.coherence_state = next; + let _ = self + .tx_event + .send(Event::CoherenceState { + state: next, + label: next.label().to_string(), + description: next.description().to_string(), + reason: reason.into(), + }) + .await; + } + + pub(super) async fn emit_compaction_started( + &mut self, + id: String, + auto: bool, + message: String, + ) { + let _ = self + .tx_event + .send(Event::CompactionStarted { + id, + auto, + message: message.clone(), + }) + .await; + self.emit_coherence_signal(CoherenceSignal::CompactionStarted, message) + .await; + } + + pub(super) async fn emit_compaction_completed( + &mut self, + id: String, + auto: bool, + message: String, + messages_before: Option, + messages_after: Option, + ) { + let _ = self + .tx_event + .send(Event::CompactionCompleted { + id, + auto, + message: message.clone(), + messages_before, + messages_after, + }) + .await; + self.emit_coherence_signal(CoherenceSignal::CompactionCompleted, message) + .await; + } + + pub(super) async fn emit_compaction_failed(&mut self, id: String, auto: bool, message: String) { + let _ = self + .tx_event + .send(Event::CompactionFailed { + id, + auto, + message: message.clone(), + }) + .await; + self.emit_coherence_signal(CoherenceSignal::CompactionFailed, message) + .await; + } + + pub(super) async fn emit_capacity_decision( + &mut self, + turn: &TurnContext, + snapshot: Option<&CapacitySnapshot>, + decision: &CapacityDecision, + ) { + let Some(snapshot) = snapshot else { + return; + }; + let _ = self + .tx_event + .send(Event::CapacityDecision { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + h_hat: snapshot.h_hat, + c_hat: snapshot.c_hat, + slack: snapshot.slack, + min_slack: snapshot.profile.min_slack, + violation_ratio: snapshot.profile.violation_ratio, + p_fail: snapshot.p_fail, + risk_band: snapshot.risk_band.as_str().to_string(), + action: decision.action.as_str().to_string(), + cooldown_blocked: decision.cooldown_blocked, + reason: decision.reason.clone(), + }) + .await; + self.emit_coherence_signal( + CoherenceSignal::CapacityDecision { + risk_band: snapshot.risk_band, + action: decision.action, + cooldown_blocked: decision.cooldown_blocked, + }, + format!( + "capacity_decision: risk={} action={} reason={}", + snapshot.risk_band.as_str(), + decision.action.as_str(), + decision.reason + ), + ) + .await; + } + + pub(super) async fn emit_capacity_intervention( + &mut self, + turn: &TurnContext, + action: GuardrailAction, + before_prompt_tokens: usize, + after_prompt_tokens: usize, + replay_outcome: Option, + replan_performed: bool, + ) { + let _ = self + .tx_event + .send(Event::CapacityIntervention { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + action: action.as_str().to_string(), + before_prompt_tokens, + after_prompt_tokens, + compaction_size_reduction: before_prompt_tokens.saturating_sub(after_prompt_tokens), + replay_outcome, + replan_performed, + }) + .await; + self.emit_coherence_signal( + CoherenceSignal::CapacityIntervention { action }, + format!("capacity_intervention: action={}", action.as_str()), + ) + .await; + } + + pub(super) async fn apply_targeted_context_refresh( + &mut self, + turn: &TurnContext, + client: Option<&DeepSeekClient>, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let compaction_pins = self + .session + .working_set + .pinned_message_indices(&self.session.messages, &self.session.workspace); + let compaction_paths = self.session.working_set.top_paths(24); + + let mut refreshed = false; + let should_run_summary_compaction = self.config.compaction.enabled + && should_compact( + &self.session.messages, + &self.config.compaction, + Some(&self.session.workspace), + Some(&compaction_pins), + Some(&compaction_paths), + ); + if should_run_summary_compaction && let Some(client) = client { + match compact_messages_safe( + client, + &self.session.messages, + &self.config.compaction, + Some(&self.session.workspace), + Some(&compaction_pins), + Some(&compaction_paths), + ) + .await + { + Ok(result) => { + if !result.messages.is_empty() || self.session.messages.is_empty() { + self.session.messages = result.messages; + self.merge_compaction_summary(result.summary_prompt); + refreshed = true; + } + } + Err(err) => { + let _ = self + .tx_event + .send(Event::status(format!( + "Capacity refresh compaction failed: {err}. Falling back to local trim." + ))) + .await; + } + } + } + + if !refreshed { + let target_budget = context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) + .unwrap_or(self.config.compaction.token_threshold.max(1)); + if self.estimated_input_tokens() > target_budget { + let trimmed = self.trim_oldest_messages_to_budget(target_budget); + refreshed = trimmed > 0; + } + } + + if !refreshed { + return false; + } + + let canonical = self.build_canonical_state(turn, None); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::TargetedContextRefresh, + snapshot, + canonical.clone(), + source_message_ids, + None, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::TargetedContextRefresh, &record) + .await; + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::TargetedContextRefresh, + None, + ))); + self.refresh_system_prompt(mode); + self.emit_session_updated().await; + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::TargetedContextRefresh, + before_tokens, + after_tokens, + None, + false, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::TargetedContextRefresh); + true + } + + #[allow(clippy::too_many_arguments)] + pub(super) async fn apply_verify_with_tool_replay( + &mut self, + turn: &TurnContext, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + tool_registry: Option<&crate::tools::ToolRegistry>, + tool_exec_lock: Arc>, + mut mcp_pool: Option>>, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let Some(candidate) = self.select_replay_candidate(turn, tool_registry) else { + return false; + }; + + if McpPool::is_mcp_tool(&candidate.name) && mcp_pool.is_none() { + mcp_pool = self.ensure_mcp_pool().await.ok(); + } + + let supports_parallel = if McpPool::is_mcp_tool(&candidate.name) { + mcp_tool_is_parallel_safe(&candidate.name) + } else { + tool_registry + .and_then(|registry| registry.get(&candidate.name)) + .is_some_and(|spec| spec.supports_parallel()) + }; + let interactive = (candidate.name == "exec_shell" + && candidate + .input + .get("interactive") + .and_then(serde_json::Value::as_bool) + == Some(true)) + || candidate.name == REQUEST_USER_INPUT_NAME; + + let replay_result = Self::execute_tool_with_lock( + tool_exec_lock, + supports_parallel, + interactive, + self.tx_event.clone(), + candidate.name.clone(), + candidate.input.clone(), + tool_registry, + mcp_pool.clone(), + None, + ) + .await; + + let (pass, replay_outcome, diff_summary) = match replay_result { + Ok(output) => { + let original = candidate.result.as_deref().unwrap_or_default(); + let replay = output.content.as_str(); + let equal = original.trim() == replay.trim(); + let diff = if equal { + "output_match".to_string() + } else { + format!( + "output_mismatch: original='{}' replay='{}'", + summarize_text(original, 140), + summarize_text(replay, 140) + ) + }; + ( + equal, + if equal { + "pass".to_string() + } else { + "conflict".to_string() + }, + diff, + ) + } + Err(err) => { + self.capacity_controller + .mark_replay_failed(self.turn_counter); + ( + false, + "error".to_string(), + format!("replay_error: {}", summarize_text(&err.to_string(), 180)), + ) + } + }; + + let verification_note = format!( + "[verification replay] tool={} pass={} details={}", + candidate.name, pass, diff_summary + ); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::ToolResult { + tool_use_id: candidate.id.clone(), + content: verification_note.clone(), + is_error: None, + content_blocks: None, + }], + }) + .await; + + if !pass { + self.capacity_controller + .mark_replay_failed(self.turn_counter); + } + + let canonical = self.build_canonical_state( + turn, + Some(if pass { + "replay verification passed" + } else { + "replay verification failed or conflicted" + }), + ); + let replay_info = Some(ReplayInfo { + tool_id: candidate.id.clone(), + tool_name: candidate.name.clone(), + pass, + diff_summary: diff_summary.clone(), + }); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::VerifyWithToolReplay, + snapshot, + canonical.clone(), + source_message_ids, + replay_info, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::VerifyWithToolReplay, &record) + .await; + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::VerifyWithToolReplay, + Some(&verification_note), + ))); + self.refresh_system_prompt(mode); + self.emit_session_updated().await; + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::VerifyWithToolReplay, + before_tokens, + after_tokens, + Some(replay_outcome), + false, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyWithToolReplay); + true + } + + pub(super) async fn apply_verify_and_replan( + &mut self, + turn: &TurnContext, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + reason: &str, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let canonical = self.build_canonical_state(turn, Some(reason)); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::VerifyAndReplan, + snapshot, + canonical.clone(), + source_message_ids, + None, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::VerifyAndReplan, &record) + .await; + + let latest_user = self + .session + .messages + .iter() + .rev() + .find(|msg| { + msg.role == "user" + && msg + .content + .iter() + .any(|block| matches!(block, ContentBlock::Text { .. })) + }) + .cloned(); + let latest_verified = self + .session + .messages + .iter() + .rev() + .find(|msg| { + msg.role == "user" + && msg.content.iter().any(|block| match block { + ContentBlock::ToolResult { content, .. } => { + content.contains("[verification replay]") + } + _ => false, + }) + }) + .cloned(); + + self.session.messages.clear(); + if let Some(msg) = latest_user { + self.session.messages.push(msg); + } + if let Some(msg) = latest_verified { + self.session.messages.push(msg); + } + + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::VerifyAndReplan, + Some("Replan now from canonical state. Keep steps minimal and verifiable."), + ))); + self.refresh_system_prompt(mode); + self.emit_session_updated().await; + + let _ = self + .tx_event + .send(Event::status( + "Capacity guardrail: context reset to canonical state; replanning step." + .to_string(), + )) + .await; + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::VerifyAndReplan, + before_tokens, + after_tokens, + None, + true, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyAndReplan); + true + } + + pub(super) fn select_replay_candidate( + &self, + turn: &TurnContext, + tool_registry: Option<&crate::tools::ToolRegistry>, + ) -> Option { + turn.tool_calls + .iter() + .rev() + .find(|call| { + call.error.is_none() + && call.result.is_some() + && self.tool_is_replayable_read_only(&call.name, tool_registry) + }) + .cloned() + } + + pub(super) fn tool_is_replayable_read_only( + &self, + tool_name: &str, + tool_registry: Option<&crate::tools::ToolRegistry>, + ) -> bool { + if tool_name == MULTI_TOOL_PARALLEL_NAME || tool_name == REQUEST_USER_INPUT_NAME { + return false; + } + if McpPool::is_mcp_tool(tool_name) { + return mcp_tool_is_read_only(tool_name); + } + tool_registry + .and_then(|registry| registry.get(tool_name)) + .is_some_and(|spec| spec.is_read_only()) + } + + pub(super) fn build_canonical_state( + &self, + turn: &TurnContext, + note: Option<&str>, + ) -> CanonicalState { + let goal = self + .session + .messages + .iter() + .rev() + .find_map(|msg| { + if msg.role != "user" { + return None; + } + msg.content.iter().find_map(|block| match block { + ContentBlock::Text { text, .. } => Some(summarize_text(text, 220)), + _ => None, + }) + }) + .unwrap_or_else(|| "Continue current task from compact state".to_string()); + + let mut constraints = vec![ + format!("model={}", self.session.model), + format!("workspace={}", self.session.workspace.display()), + ]; + if let Some(note) = note { + constraints.push(summarize_text(note, 180)); + } + + let mut confirmed_facts = Vec::new(); + for msg in self.session.messages.iter().rev() { + for block in &msg.content { + if let ContentBlock::ToolResult { content, .. } = block { + if content.starts_with("Error:") { + continue; + } + confirmed_facts.push(summarize_text(content, 180)); + if confirmed_facts.len() >= 4 { + break; + } + } + } + if confirmed_facts.len() >= 4 { + break; + } + } + + let open_loops: Vec = turn + .tool_calls + .iter() + .rev() + .filter_map(|call| { + call.error + .as_ref() + .map(|error| format!("{}: {}", call.name, summarize_text(error, 180))) + }) + .take(4) + .collect(); + + let pending_actions: Vec = if open_loops.is_empty() { + vec!["Continue with next smallest verifiable step".to_string()] + } else { + vec![ + "Re-evaluate failed tool steps with narrower scope".to_string(), + "Re-derive plan from canonical facts before further edits".to_string(), + ] + }; + + let mut critical_refs = self.session.working_set.top_paths(8); + for tool_call in turn.tool_calls.iter().rev().take(4) { + critical_refs.push(format!("tool:{}", tool_call.id)); + } + critical_refs.dedup(); + + CanonicalState { + goal, + constraints, + confirmed_facts, + open_loops, + pending_actions, + critical_refs, + } + } + + pub(super) fn canonical_prompt( + &self, + canonical: &CanonicalState, + pointer: &str, + action: GuardrailAction, + extra: Option<&str>, + ) -> SystemPrompt { + let mut lines = vec![ + COMPACTION_SUMMARY_MARKER.to_string(), + format!("Capacity Canonical State [{}]", action.as_str()), + format!("Goal: {}", canonical.goal), + "Constraints:".to_string(), + ]; + for item in &canonical.constraints { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Confirmed Facts:".to_string()); + for item in &canonical.confirmed_facts { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Open Loops:".to_string()); + if canonical.open_loops.is_empty() { + lines.push("- none".to_string()); + } else { + for item in &canonical.open_loops { + lines.push(format!("- {}", summarize_text(item, 200))); + } + } + lines.push("Pending Actions:".to_string()); + for item in &canonical.pending_actions { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Critical Refs:".to_string()); + for item in &canonical.critical_refs { + lines.push(format!("- {}", summarize_text(item, 200))); + } + if let Some(extra) = extra { + lines.push(format!("Instruction: {}", summarize_text(extra, 240))); + } + lines.push(format!("Memory Pointer: {pointer}")); + + SystemPrompt::Blocks(vec![crate::models::SystemBlock { + block_type: "text".to_string(), + text: lines.join("\n"), + cache_control: None, + }]) + } + + pub(super) fn capacity_source_message_ids(&self, turn: &TurnContext) -> Vec { + let mut ids: Vec = turn + .tool_calls + .iter() + .rev() + .take(8) + .map(|call| call.id.clone()) + .collect(); + ids.reverse(); + ids + } + + pub(super) fn build_capacity_record( + &self, + turn: &TurnContext, + action: GuardrailAction, + snapshot: Option<&CapacitySnapshot>, + canonical: CanonicalState, + source_message_ids: Vec, + replay_info: Option, + ) -> CapacityMemoryRecord { + let (h_hat, c_hat, slack, risk_band) = snapshot + .map(|s| (s.h_hat, s.c_hat, s.slack, s.risk_band.as_str().to_string())) + .unwrap_or_else(|| (0.0, 0.0, 0.0, "unknown".to_string())); + + CapacityMemoryRecord { + id: new_record_id(), + ts: now_rfc3339(), + turn_index: self.turn_counter, + action_trigger: action.as_str().to_string(), + h_hat, + c_hat, + slack, + risk_band, + canonical_state: canonical, + source_message_ids: if source_message_ids.is_empty() { + vec![turn.id.clone()] + } else { + source_message_ids + }, + replay_info, + } + } + + pub(super) async fn persist_capacity_record( + &mut self, + turn: &TurnContext, + action: GuardrailAction, + record: &CapacityMemoryRecord, + ) -> String { + let pointer = format!("memory://{}/{}", self.session.id, record.id); + if let Err(err) = append_capacity_record(&self.session.id, record) { + let _ = self + .tx_event + .send(Event::CapacityMemoryPersistFailed { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + action: action.as_str().to_string(), + error: summarize_text(&err.to_string(), 280), + }) + .await; + return format!("{pointer}?persist=failed"); + } + pointer + } + + pub(super) fn rehydrate_latest_canonical_state(&mut self) { + let Ok(records) = load_last_k_capacity_records(&self.session.id, 1) else { + return; + }; + let Some(last) = records.last() else { + return; + }; + let pointer = format!("memory://{}/{}", self.session.id, last.id); + let prompt = self.canonical_prompt( + &last.canonical_state, + &pointer, + GuardrailAction::NoIntervention, + Some("Rehydrated canonical state from memory."), + ); + self.merge_compaction_summary(Some(prompt)); + } +} diff --git a/crates/tui/src/core/engine/dispatch.rs b/crates/tui/src/core/engine/dispatch.rs index bdc14702..94b8c864 100644 --- a/crates/tui/src/core/engine/dispatch.rs +++ b/crates/tui/src/core/engine/dispatch.rs @@ -13,7 +13,7 @@ //! * The tool execution plan/outcome types the batch driver passes around. //! //! All items are `pub(super)`-only: the public engine surface (Op/Event, -//! `EngineHandle`, `spawn_engine`) stays in `engine/mod.rs`. +//! `EngineHandle`, `spawn_engine`) stays in `core/engine.rs`. use serde_json::json; diff --git a/crates/tui/src/core/engine/turn_loop.rs b/crates/tui/src/core/engine/turn_loop.rs new file mode 100644 index 00000000..48dbe380 --- /dev/null +++ b/crates/tui/src/core/engine/turn_loop.rs @@ -0,0 +1,1540 @@ +//! Main streaming turn loop for the engine. +//! +//! Extracted from `core/engine.rs` for issue #74. This module keeps the +//! existing per-turn orchestration intact: request construction, streaming +//! event handling, tool planning/execution, LSP post-edit hooks, capacity +//! checkpoints, and loop termination. + +use super::*; + +impl Engine { + pub(super) async fn handle_deepseek_turn( + &mut self, + turn: &mut TurnContext, + tool_registry: Option<&crate::tools::ToolRegistry>, + tools: Option>, + mode: AppMode, + force_update_plan_first: bool, + ) -> (TurnOutcomeStatus, Option) { + let client = self + .deepseek_client + .clone() + .expect("DeepSeek client should be configured"); + + let mut consecutive_tool_error_steps = 0u32; + let mut turn_error: Option = None; + let mut context_recovery_attempts = 0u8; + let mut tool_catalog = tools.unwrap_or_default(); + if !tool_catalog.is_empty() { + ensure_advanced_tooling(&mut tool_catalog); + } + let mut active_tool_names = initial_active_tools(&tool_catalog); + + // Transparent stream-retry counter: when the chunked-transfer + // connection dies mid-stream and we got nothing useful out of it + // (no tool calls, no completed text), we silently re-issue the + // SAME request up to MAX_STREAM_RETRIES times before surfacing + // the failure to the user. This is the #103 Phase 3 retry that + // keeps long V4 thinking turns from being killed by transient + // proxy disconnects. + const MAX_STREAM_RETRIES: u32 = 3; + let mut stream_retry_attempts: u32 = 0; + + loop { + if self.cancel_token.is_cancelled() { + let _ = self.tx_event.send(Event::status("Request cancelled")).await; + return (TurnOutcomeStatus::Interrupted, None); + } + + while let Ok(steer) = self.rx_steer.try_recv() { + let steer = steer.trim().to_string(); + if steer.is_empty() { + continue; + } + self.session + .working_set + .observe_user_message(&steer, &self.session.workspace); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: steer.clone(), + cache_control: None, + }], + }) + .await; + let _ = self + .tx_event + .send(Event::status(format!( + "Steer input accepted: {}", + summarize_text(&steer, 120) + ))) + .await; + } + + // Ensure system prompt is up to date with latest session states + self.refresh_system_prompt(mode); + + if turn.at_max_steps() { + let _ = self + .tx_event + .send(Event::status("Reached maximum steps")) + .await; + break; + } + + let compaction_pins = self + .session + .working_set + .pinned_message_indices(&self.session.messages, &self.session.workspace); + let compaction_paths = self.session.working_set.top_paths(24); + + if self.config.compaction.enabled + && should_compact( + &self.session.messages, + &self.config.compaction, + Some(&self.session.workspace), + Some(&compaction_pins), + Some(&compaction_paths), + ) + { + let compaction_id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]); + self.emit_compaction_started( + compaction_id.clone(), + true, + "Auto context compaction started".to_string(), + ) + .await; + let _ = self + .tx_event + .send(Event::status("Auto-compacting context...".to_string())) + .await; + let auto_messages_before = self.session.messages.len(); + match compact_messages_safe( + &client, + &self.session.messages, + &self.config.compaction, + Some(&self.session.workspace), + Some(&compaction_pins), + Some(&compaction_paths), + ) + .await + { + Ok(result) => { + // Only update if we got valid messages (never corrupt state) + if !result.messages.is_empty() || self.session.messages.is_empty() { + let auto_messages_after = result.messages.len(); + self.session.messages = result.messages; + self.merge_compaction_summary(result.summary_prompt); + self.emit_session_updated().await; + let removed = auto_messages_before.saturating_sub(auto_messages_after); + let status = if result.retries_used > 0 { + format!( + "Auto-compaction complete: {auto_messages_before} → {auto_messages_after} messages ({removed} removed, {} retries)", + result.retries_used + ) + } else { + format!( + "Auto-compaction complete: {auto_messages_before} → {auto_messages_after} messages ({removed} removed)" + ) + }; + self.emit_compaction_completed( + compaction_id.clone(), + true, + status.clone(), + Some(auto_messages_before), + Some(auto_messages_after), + ) + .await; + let _ = self.tx_event.send(Event::status(status)).await; + } else { + let message = "Auto-compaction skipped: empty result".to_string(); + self.emit_compaction_failed( + compaction_id.clone(), + true, + message.clone(), + ) + .await; + let _ = self.tx_event.send(Event::status(message)).await; + } + } + Err(err) => { + // Log error but continue with original messages (never corrupt) + let message = format!("Auto-compaction failed: {err}"); + self.emit_compaction_failed(compaction_id, true, message.clone()) + .await; + let _ = self.tx_event.send(Event::status(message)).await; + } + } + } + + if self + .run_capacity_pre_request_checkpoint(turn, Some(&client), mode) + .await + { + continue; + } + + if let Some(input_budget) = + context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) + { + let estimated_input = self.estimated_input_tokens(); + if estimated_input > input_budget { + if context_recovery_attempts >= MAX_CONTEXT_RECOVERY_ATTEMPTS { + let message = format!( + "Context remains above model limit after {} recovery attempts \ + (~{} token estimate, ~{} budget). Please run /compact or /clear.", + MAX_CONTEXT_RECOVERY_ATTEMPTS, estimated_input, input_budget + ); + turn_error = Some(message.clone()); + let _ = self + .tx_event + .send(Event::error(ErrorEnvelope::context_overflow(message))) + .await; + return (TurnOutcomeStatus::Failed, turn_error); + } + + if self + .recover_context_overflow( + &client, + "preflight token budget", + TURN_MAX_OUTPUT_TOKENS, + ) + .await + { + context_recovery_attempts = context_recovery_attempts.saturating_add(1); + continue; + } + } + } + + // #136: drain any LSP diagnostics collected since the last + // request and inject them as a synthetic user message so the + // model sees compile errors before its next reasoning step. + self.flush_pending_lsp_diagnostics().await; + + // Build the request + let force_update_plan_this_step = force_update_plan_first && turn.tool_calls.is_empty(); + let active_tools = if tool_catalog.is_empty() { + None + } else { + Some(active_tools_for_step( + &tool_catalog, + &active_tool_names, + force_update_plan_this_step, + )) + }; + let request = MessageRequest { + model: self.session.model.clone(), + messages: self.session.messages.clone(), + max_tokens: TURN_MAX_OUTPUT_TOKENS, + system: self.session.system_prompt.clone(), + tools: active_tools.clone(), + tool_choice: if active_tools.is_some() { + Some(json!({ "type": "auto" })) + } else { + None + }, + metadata: None, + thinking: None, + reasoning_effort: self.session.reasoning_effort.clone(), + stream: Some(true), + temperature: None, + top_p: None, + }; + + // Stream the response. Keep the request around (cloned into the + // first call) so we can resend it on a transparent retry below + // when the wire dies before any content was streamed (#103). + let stream_request = request; + let stream_result = client.create_message_stream(stream_request.clone()).await; + let stream = match stream_result { + Ok(s) => { + context_recovery_attempts = 0; + s + } + Err(e) => { + let message = e.to_string(); + if is_context_length_error_message(&message) + && context_recovery_attempts < MAX_CONTEXT_RECOVERY_ATTEMPTS + && self + .recover_context_overflow( + &client, + "provider context-length rejection", + TURN_MAX_OUTPUT_TOKENS, + ) + .await + { + context_recovery_attempts = context_recovery_attempts.saturating_add(1); + continue; + } + turn_error = Some(message.clone()); + let _ = self + .tx_event + .send(Event::error(ErrorEnvelope::classify(message, true))) + .await; + return (TurnOutcomeStatus::Failed, turn_error); + } + }; + // The stream value is itself `Pin>`, which + // is `Unpin`, so we can rebind it on a transparent retry without + // breaking the existing pin invariants. + let mut stream = stream; + + // Track content blocks + let mut content_blocks: Vec = Vec::new(); + let mut current_text_raw = String::new(); + let mut current_text_visible = String::new(); + let mut current_thinking = String::new(); + let mut tool_uses: Vec = Vec::new(); + let mut usage = Usage { + input_tokens: 0, + output_tokens: 0, + ..Usage::default() + }; + let mut current_block_kind: Option = None; + let mut current_tool_index: Option = None; + let mut in_tool_call_block = false; + let mut fake_wrapper_notice_emitted = false; + let mut pending_message_complete = false; + let mut last_text_index: Option = None; + let mut stream_errors = 0u32; + // #103 transparent retry bookkeeping. `any_content_received` flips + // on the first non-MessageStart event so we know whether DeepSeek + // billed us / the user has seen any output for this turn yet. + // This is distinct from the outer `stream_retry_attempts` (which + // restarts the whole turn-step when a stream died with no + // content-block delta delivered to the consumer). + let mut any_content_received = false; + let mut transparent_stream_retries = 0u32; + let mut pending_steers: Vec = Vec::new(); + // `stream_start` is reset on a transparent retry so the wall-clock + // budget restarts with the fresh stream. + let mut stream_start = Instant::now(); + let mut stream_content_bytes: usize = 0; + let chunk_timeout = Duration::from_secs(STREAM_CHUNK_TIMEOUT_SECS); + let max_duration = Duration::from_secs(STREAM_MAX_DURATION_SECS); + + // Process stream events + loop { + let poll_outcome = tokio::select! { + _ = self.cancel_token.cancelled() => None, + result = tokio::time::timeout(chunk_timeout, stream.next()) => { + match result { + Ok(Some(event_result)) => Some(event_result), + Ok(None) => None, // stream ended normally + Err(_) => { + let envelope = StreamError::Stall { + timeout_secs: STREAM_CHUNK_TIMEOUT_SECS, + } + .into_envelope(); + crate::logging::warn(&envelope.message); + let _ = self.tx_event.send(Event::error(envelope)).await; + None + } + } + } + }; + let Some(event_result) = poll_outcome else { + break; + }; + while let Ok(steer) = self.rx_steer.try_recv() { + let steer = steer.trim().to_string(); + if steer.is_empty() { + continue; + } + pending_steers.push(steer.clone()); + let _ = self + .tx_event + .send(Event::status(format!( + "Steer input queued: {}", + summarize_text(&steer, 120) + ))) + .await; + } + + if self.cancel_token.is_cancelled() { + break; + } + + // Guard: max wall-clock duration + if stream_start.elapsed() > max_duration { + let envelope = StreamError::DurationLimit { + limit_secs: STREAM_MAX_DURATION_SECS, + } + .into_envelope(); + crate::logging::warn(&envelope.message); + turn_error.get_or_insert(envelope.message.clone()); + let _ = self.tx_event.send(Event::error(envelope)).await; + break; + } + + // Guard: max accumulated content bytes + if stream_content_bytes > STREAM_MAX_CONTENT_BYTES { + let envelope = StreamError::Overflow { + limit_bytes: STREAM_MAX_CONTENT_BYTES, + } + .into_envelope(); + crate::logging::warn(&envelope.message); + turn_error.get_or_insert(envelope.message.clone()); + let _ = self.tx_event.send(Event::error(envelope)).await; + break; + } + + let event = match event_result { + Ok(e) => { + // Flip on the first non-MessageStart event — that's + // the moment we cross from "stream not yet productive" + // (eligible for transparent retry) into "DeepSeek has + // billed us / user has seen output" (must surface). + if !any_content_received && !matches!(e, StreamEvent::MessageStart { .. }) { + any_content_received = true; + } + e + } + Err(e) => { + stream_errors = stream_errors.saturating_add(1); + let message = e.to_string(); + // #103: when the stream errors before any content was + // streamed AND we still have retry budget, transparently + // resend the request. DeepSeek has not billed for any + // output and the user has seen nothing — re-trying is + // the right user-visible behavior. + if should_transparently_retry_stream( + any_content_received, + transparent_stream_retries, + self.cancel_token.is_cancelled(), + ) { + transparent_stream_retries = + transparent_stream_retries.saturating_add(1); + crate::logging::info(format!( + "Transparent stream retry {}/{} (no content received yet): {}", + transparent_stream_retries, MAX_TRANSPARENT_STREAM_RETRIES, message, + )); + // Drop the failed stream before issuing the new + // request to release the underlying connection. + drop(stream); + match client.create_message_stream(stream_request.clone()).await { + Ok(fresh) => { + stream = fresh; + stream_start = Instant::now(); + // Roll back the error counter — this one + // didn't surface to the user. + stream_errors = stream_errors.saturating_sub(1); + continue; + } + Err(retry_err) => { + let retry_msg = format!("Stream retry failed: {retry_err}"); + turn_error.get_or_insert(retry_msg.clone()); + let _ = self + .tx_event + .send(Event::error(ErrorEnvelope::classify( + retry_msg, true, + ))) + .await; + break; + } + } + } + turn_error.get_or_insert(message.clone()); + let _ = self + .tx_event + .send(Event::error(ErrorEnvelope::classify(message, true))) + .await; + if stream_errors >= MAX_STREAM_ERRORS_BEFORE_FAIL { + break; + } + continue; + } + }; + + match event { + StreamEvent::MessageStart { message } => { + usage = message.usage; + } + StreamEvent::ContentBlockStart { + index, + content_block, + } => match content_block { + ContentBlockStart::Text { text } => { + current_text_raw = text; + current_text_visible.clear(); + in_tool_call_block = false; + let filtered = + filter_tool_call_delta(¤t_text_raw, &mut in_tool_call_block); + if !fake_wrapper_notice_emitted + && filtered.len() < current_text_raw.len() + && contains_fake_tool_wrapper(¤t_text_raw) + { + let _ = + self.tx_event.send(Event::status(FAKE_WRAPPER_NOTICE)).await; + fake_wrapper_notice_emitted = true; + } + current_text_visible.push_str(&filtered); + current_block_kind = Some(ContentBlockKind::Text); + last_text_index = Some(index as usize); + let _ = self + .tx_event + .send(Event::MessageStarted { + index: index as usize, + }) + .await; + } + ContentBlockStart::Thinking { thinking } => { + current_thinking = thinking; + current_block_kind = Some(ContentBlockKind::Thinking); + let _ = self + .tx_event + .send(Event::ThinkingStarted { + index: index as usize, + }) + .await; + } + ContentBlockStart::ToolUse { + id, + name, + input, + caller, + } => { + crate::logging::info(format!( + "Tool '{}' block start. Initial input: {:?}", + name, input + )); + current_block_kind = Some(ContentBlockKind::ToolUse); + current_tool_index = Some(tool_uses.len()); + // ToolCallStarted is deferred to ContentBlockStop — + // see `final_tool_input`. Emitting here would ship + // the placeholder `{}` and the cell would render + // `` / `` literals to the user. + tool_uses.push(ToolUseState { + id, + name, + input, + caller, + input_buffer: String::new(), + }); + } + ContentBlockStart::ServerToolUse { id, name, input } => { + crate::logging::info(format!( + "Server tool '{}' block start. Initial input: {:?}", + name, input + )); + current_block_kind = Some(ContentBlockKind::ToolUse); + current_tool_index = Some(tool_uses.len()); + tool_uses.push(ToolUseState { + id, + name, + input, + caller: None, + input_buffer: String::new(), + }); + } + }, + StreamEvent::ContentBlockDelta { index, delta } => match delta { + Delta::TextDelta { text } => { + stream_content_bytes = stream_content_bytes.saturating_add(text.len()); + current_text_raw.push_str(&text); + let filtered = filter_tool_call_delta(&text, &mut in_tool_call_block); + if !fake_wrapper_notice_emitted + && filtered.len() < text.len() + && contains_fake_tool_wrapper(&text) + { + let _ = + self.tx_event.send(Event::status(FAKE_WRAPPER_NOTICE)).await; + fake_wrapper_notice_emitted = true; + } + if !filtered.is_empty() { + current_text_visible.push_str(&filtered); + let _ = self + .tx_event + .send(Event::MessageDelta { + index: index as usize, + content: filtered, + }) + .await; + } + } + Delta::ThinkingDelta { thinking } => { + stream_content_bytes = + stream_content_bytes.saturating_add(thinking.len()); + current_thinking.push_str(&thinking); + if !thinking.is_empty() { + let _ = self + .tx_event + .send(Event::ThinkingDelta { + index: index as usize, + content: thinking, + }) + .await; + } + } + Delta::InputJsonDelta { partial_json } => { + if let Some(index) = current_tool_index + && let Some(tool_state) = tool_uses.get_mut(index) + { + tool_state.input_buffer.push_str(&partial_json); + crate::logging::info(format!( + "Tool '{}' input delta: {} (buffer now: {})", + tool_state.name, partial_json, tool_state.input_buffer + )); + if let Some(value) = parse_tool_input(&tool_state.input_buffer) { + tool_state.input = value.clone(); + crate::logging::info(format!( + "Tool '{}' input parsed: {:?}", + tool_state.name, value + )); + } + } + } + }, + StreamEvent::ContentBlockStop { index } => { + let stopped_kind = current_block_kind.take(); + match stopped_kind { + Some(ContentBlockKind::Text) => { + pending_message_complete = true; + last_text_index = Some(index as usize); + } + Some(ContentBlockKind::Thinking) => { + let _ = self + .tx_event + .send(Event::ThinkingComplete { + index: index as usize, + }) + .await; + } + Some(ContentBlockKind::ToolUse) | None => {} + } + if matches!(stopped_kind, Some(ContentBlockKind::ToolUse)) + && let Some(index) = current_tool_index.take() + && let Some(tool_state) = tool_uses.get_mut(index) + { + crate::logging::info(format!( + "Tool '{}' block stop. Buffer: '{}', Current input: {:?}", + tool_state.name, tool_state.input_buffer, tool_state.input + )); + if !tool_state.input_buffer.trim().is_empty() { + if let Some(value) = parse_tool_input(&tool_state.input_buffer) { + tool_state.input = value; + crate::logging::info(format!( + "Tool '{}' final input: {:?}", + tool_state.name, tool_state.input + )); + } else { + crate::logging::warn(format!( + "Tool '{}' failed to parse final input buffer: '{}'", + tool_state.name, tool_state.input_buffer + )); + let _ = self + .tx_event + .send(Event::status(format!( + "⚠ Tool '{}' received malformed arguments from model", + tool_state.name + ))) + .await; + } + } else { + crate::logging::warn(format!( + "Tool '{}' input buffer is empty, using initial input: {:?}", + tool_state.name, tool_state.input + )); + } + + // Now that the input is finalized, announce the + // tool call to the UI. Deferring to here is what + // keeps the cell from rendering `` / + // `` placeholders during the brief window + // between block start and the last InputJsonDelta. + let _ = self + .tx_event + .send(Event::ToolCallStarted { + id: tool_state.id.clone(), + name: tool_state.name.clone(), + input: final_tool_input(tool_state), + }) + .await; + } + } + StreamEvent::MessageDelta { + usage: delta_usage, .. + } => { + if let Some(u) = delta_usage { + usage = u; + } + } + StreamEvent::MessageStop | StreamEvent::Ping => {} + } + } + + // #103 Phase 3 — transparent retry. The inner loop above bails + // when reqwest yields chunk decode errors three times in a row; + // most of the time those are recoverable proxy / HTTP/2 issues + // and the request can simply be re-issued. Re-issue silently up + // to MAX_STREAM_RETRIES, but only when the stream produced + // nothing actionable — if any tool call landed or text was + // streamed, ship the partial state to the rest of the turn + // pipeline so we don't double-bill the user by re-running it. + let stream_died_with_nothing = stream_errors > 0 + && tool_uses.is_empty() + && current_text_visible.trim().is_empty() + && current_thinking.trim().is_empty() + && !pending_message_complete; + if stream_died_with_nothing { + if stream_retry_attempts < MAX_STREAM_RETRIES { + stream_retry_attempts = stream_retry_attempts.saturating_add(1); + crate::logging::warn(format!( + "Stream died with no content (attempt {}/{}); retrying request", + stream_retry_attempts, MAX_STREAM_RETRIES + )); + let _ = self + .tx_event + .send(Event::status(format!( + "Connection interrupted; retrying ({}/{})", + stream_retry_attempts, MAX_STREAM_RETRIES + ))) + .await; + // Don't preserve the per-stream `turn_error` — we're + // about to retry, and a successful retry should not + // surface the transient error as the turn outcome. + turn_error = None; + continue; + } + crate::logging::warn(format!( + "Stream retry budget exhausted ({} attempts); failing turn", + stream_retry_attempts + )); + } else if stream_errors == 0 { + // Healthy round → reset retry budget so we don't carry over + // state from a previous bad round. + stream_retry_attempts = 0; + } + + // Update turn usage + turn.add_usage(&usage); + + // Build content blocks. If this assistant turn produced tool + // calls, ensure a Thinking block is present even when the model + // didn't stream any reasoning text — DeepSeek's thinking-mode + // API requires `reasoning_content` to accompany every tool-call + // assistant message in the conversation history. Saving a + // placeholder here keeps the on-disk session structurally + // correct so subsequent requests won't 400. + let needs_thinking_block = + !tool_uses.is_empty() || tool_parser::has_tool_call_markers(¤t_text_raw); + let thinking_to_persist = if !current_thinking.is_empty() { + Some(current_thinking.clone()) + } else if needs_thinking_block { + Some(String::from("(reasoning omitted)")) + } else { + None + }; + if let Some(thinking) = thinking_to_persist { + content_blocks.push(ContentBlock::Thinking { thinking }); + } + let mut final_text = current_text_visible.clone(); + if tool_uses.is_empty() && tool_parser::has_tool_call_markers(¤t_text_raw) { + let parsed = tool_parser::parse_tool_calls(¤t_text_raw); + final_text = parsed.clean_text; + for call in parsed.tool_calls { + let _ = self + .tx_event + .send(Event::ToolCallStarted { + id: call.id.clone(), + name: call.name.clone(), + input: call.args.clone(), + }) + .await; + tool_uses.push(ToolUseState { + id: call.id, + name: call.name, + input: call.args, + caller: None, + input_buffer: String::new(), + }); + } + } + + if !final_text.is_empty() { + content_blocks.push(ContentBlock::Text { + text: final_text, + cache_control: None, + }); + } + for tool in &tool_uses { + content_blocks.push(ContentBlock::ToolUse { + id: tool.id.clone(), + name: tool.name.clone(), + input: tool.input.clone(), + caller: tool.caller.clone(), + }); + } + + if pending_message_complete { + let index = last_text_index.unwrap_or(0); + let _ = self.tx_event.send(Event::MessageComplete { index }).await; + } + + // RLM is a structured tool call (`rlm_query`) handled by the + // normal tool dispatch path; inline ```repl blocks (paper §2) + // are executed below when tool_uses is empty. + // DeepSeek chat API rejects assistant messages that contain only + // Keep thinking for UI stream events, but persist only sendable + // assistant turns in the conversation state. + let has_sendable_assistant_content = content_blocks.iter().any(|block| { + matches!( + block, + ContentBlock::Text { .. } | ContentBlock::ToolUse { .. } + ) + }); + + // Add assistant message to session + if has_sendable_assistant_content { + self.add_session_message(Message { + role: "assistant".to_string(), + content: content_blocks, + }) + .await; + } + + // If no tool uses, check for inline REPL blocks (paper §2) or + // finish the turn. + if tool_uses.is_empty() { + if !pending_steers.is_empty() { + for steer in pending_steers.drain(..) { + self.session + .working_set + .observe_user_message(&steer, &self.session.workspace); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: steer, + cache_control: None, + }], + }) + .await; + } + turn.next_step(); + continue; + } + + // Inline ```repl execution — paper-spec RLM integration. + if has_sendable_assistant_content + && crate::repl::sandbox::has_repl_block(¤t_text_visible) + { + let repl_blocks = + crate::repl::sandbox::extract_repl_blocks(¤t_text_visible); + let mut runtime = match crate::repl::runtime::PythonRuntime::new().await { + Ok(rt) => rt, + Err(e) => { + let _ = self + .tx_event + .send(Event::status(format!("REPL init failed: {e}"))) + .await; + break; + } + }; + + let mut final_result: Option = None; + for (i, block) in repl_blocks.iter().enumerate() { + let round_num = i + 1; + let _ = self + .tx_event + .send(Event::status(format!( + "REPL round {round_num}: executing..." + ))) + .await; + + match runtime.execute(&block.code).await { + Ok(round) => { + if let Some(val) = &round.final_value { + let _ = self + .tx_event + .send(Event::status(format!( + "REPL round {round_num}: FINAL result obtained" + ))) + .await; + final_result = Some(val.clone()); + break; + } + + // No FINAL — feed truncated stdout back as user metadata. + let feedback = if round.has_error { + format!( + "[REPL round {round_num} error]\nstdout:\n{}\nstderr:\n{}", + round.stdout, round.stderr + ) + } else { + format!("[REPL round {round_num} output]\n{}", round.stdout) + }; + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: feedback, + cache_control: None, + }], + }) + .await; + } + Err(e) => { + let _ = self + .tx_event + .send(Event::status(format!( + "REPL round {round_num} failed: {e}" + ))) + .await; + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: format!( + "[REPL round {round_num} execution failed]\n{e}" + ), + cache_control: None, + }], + }) + .await; + } + } + } + + if let Some(final_val) = final_result { + // Replace the assistant's text with the FINAL answer. + if let Some(last_msg) = self.session.messages.last_mut() + && last_msg.role == "assistant" + { + for block in &mut last_msg.content { + if let ContentBlock::Text { text, .. } = block { + *text = final_val; + break; + } + } + } + self.emit_session_updated().await; + break; + } + + // No FINAL — let the model iterate with the feedback. + turn.next_step(); + continue; + } + + break; + } + + // Execute tools + let tool_exec_lock = self.tool_exec_lock.clone(); + let mcp_pool = if tool_uses + .iter() + .any(|tool| McpPool::is_mcp_tool(&tool.name)) + { + match self.ensure_mcp_pool().await { + Ok(pool) => Some(pool), + Err(err) => { + let _ = self.tx_event.send(Event::status(err.to_string())).await; + None + } + } + } else { + None + }; + + let mut plans: Vec = Vec::with_capacity(tool_uses.len()); + for (index, tool) in tool_uses.iter().enumerate() { + let tool_id = tool.id.clone(); + let tool_name = tool.name.clone(); + let tool_input = tool.input.clone(); + let tool_caller = tool.caller.clone(); + crate::logging::info(format!( + "Planning tool '{}' with input: {:?}", + tool_name, tool_input + )); + + let interactive = (tool_name == "exec_shell" + && tool_input + .get("interactive") + .and_then(serde_json::Value::as_bool) + == Some(true)) + || tool_name == REQUEST_USER_INPUT_NAME; + + let mut approval_required = false; + let mut approval_description = "Tool execution requires approval".to_string(); + let mut supports_parallel = false; + let mut read_only = false; + let mut blocked_error: Option = None; + if maybe_activate_requested_deferred_tool( + &tool_name, + &tool_catalog, + &mut active_tool_names, + ) { + let _ = self + .tx_event + .send(Event::status(format!( + "Auto-loaded deferred tool '{tool_name}' after model request." + ))) + .await; + } + let tool_def = tool_catalog.iter().find(|def| def.name == tool_name); + + if !caller_allowed_for_tool(tool_caller.as_ref(), tool_def) { + blocked_error = Some(ToolError::permission_denied(format!( + "Tool '{tool_name}' does not allow caller '{}'", + caller_type_for_tool_use(tool_caller.as_ref()) + ))); + } + + if blocked_error.is_none() + && tool_def.is_none() + && !McpPool::is_mcp_tool(&tool_name) + && tool_name != CODE_EXECUTION_TOOL_NAME + && !is_tool_search_tool(&tool_name) + { + blocked_error = Some(ToolError::not_available(missing_tool_error_message( + &tool_name, + &tool_catalog, + ))); + } + + if McpPool::is_mcp_tool(&tool_name) { + read_only = mcp_tool_is_read_only(&tool_name); + supports_parallel = mcp_tool_is_parallel_safe(&tool_name); + approval_required = !read_only; + approval_description = mcp_tool_approval_description(&tool_name); + } else if let Some(registry) = tool_registry + && let Some(spec) = registry.get(&tool_name) + { + approval_required = spec.approval_requirement() != ApprovalRequirement::Auto; + approval_description = spec.description().to_string(); + supports_parallel = spec.supports_parallel(); + read_only = spec.is_read_only(); + } else if tool_name == CODE_EXECUTION_TOOL_NAME { + approval_required = true; + approval_description = + "Run model-provided Python code in local execution sandbox".to_string(); + supports_parallel = false; + read_only = false; + } else if is_tool_search_tool(&tool_name) { + approval_required = false; + approval_description = "Search tool catalog".to_string(); + supports_parallel = false; + read_only = true; + } + + plans.push(ToolExecutionPlan { + index, + id: tool_id, + name: tool_name, + input: tool_input, + caller: tool_caller, + interactive, + approval_required, + approval_description, + supports_parallel, + read_only, + blocked_error, + }); + } + + let parallel_allowed = should_parallelize_tool_batch(&plans); + if parallel_allowed && plans.len() > 1 { + let _ = self + .tx_event + .send(Event::status(format!( + "Executing {} read-only tools in parallel", + plans.len() + ))) + .await; + } else if plans.len() > 1 { + let _ = self + .tx_event + .send(Event::status( + "Executing tools sequentially (writes, approvals, or non-parallel tools detected)", + )) + .await; + } + + let mut outcomes: Vec> = Vec::with_capacity(plans.len()); + outcomes.resize_with(plans.len(), || None); + + if parallel_allowed { + let mut tool_tasks = FuturesUnordered::new(); + for plan in plans { + if let Some(err) = plan.blocked_error.clone() { + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: plan.id, + name: plan.name, + input: plan.input, + started_at: Instant::now(), + result: Err(err), + }); + continue; + } + let registry = tool_registry; + let lock = tool_exec_lock.clone(); + let mcp_pool = mcp_pool.clone(); + let tx_event = self.tx_event.clone(); + let started_at = Instant::now(); + + tool_tasks.push(async move { + let result = Engine::execute_tool_with_lock( + lock, + plan.supports_parallel, + plan.interactive, + tx_event.clone(), + plan.name.clone(), + plan.input.clone(), + registry, + mcp_pool, + None, + ) + .await; + + let _ = tx_event + .send(Event::ToolCallComplete { + id: plan.id.clone(), + name: plan.name.clone(), + result: result.clone(), + }) + .await; + + ToolExecOutcome { + index: plan.index, + id: plan.id, + name: plan.name, + input: plan.input, + started_at, + result, + } + }); + } + + while let Some(outcome) = tool_tasks.next().await { + let index = outcome.index; + outcomes[index] = Some(outcome); + } + } else { + for plan in plans { + let tool_id = plan.id.clone(); + let tool_name = plan.name.clone(); + let tool_input = plan.input.clone(); + let tool_caller = plan.caller.clone(); + + if let Some(err) = plan.blocked_error.clone() { + let result = Err(err); + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at: Instant::now(), + result, + }); + continue; + } + + if tool_name == MULTI_TOOL_PARALLEL_NAME { + let started_at = Instant::now(); + let result = self + .execute_parallel_tool( + tool_input.clone(), + tool_registry, + tool_exec_lock.clone(), + ) + .await; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if tool_name == CODE_EXECUTION_TOOL_NAME { + let started_at = Instant::now(); + let result = + execute_code_execution_tool(&tool_input, &self.session.workspace).await; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if is_tool_search_tool(&tool_name) { + let started_at = Instant::now(); + let result = execute_tool_search( + &tool_name, + &tool_input, + &tool_catalog, + &mut active_tool_names, + ); + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + if tool_name == REQUEST_USER_INPUT_NAME { + let started_at = Instant::now(); + let result = match UserInputRequest::from_value(&tool_input) { + Ok(request) => self.await_user_input(&tool_id, request).await.and_then( + |response| { + ToolResult::json(&response) + .map_err(|e| ToolError::execution_failed(e.to_string())) + }, + ), + Err(err) => Err(err), + }; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + continue; + } + + // Handle approval flow: returns (result_override, context_override) + let (result_override, context_override): ( + Option>, + Option, + ) = if plan.approval_required { + emit_tool_audit(json!({ + "event": "tool.approval_required", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + })); + let approval_key = crate::tools::approval_cache::build_approval_key( + &tool_name, + &tool_input, + ) + .0; + let _ = self + .tx_event + .send(Event::ApprovalRequired { + id: tool_id.clone(), + tool_name: tool_name.clone(), + description: plan.approval_description.clone(), + approval_key, + }) + .await; + + match self.await_tool_approval(&tool_id).await { + Ok(ApprovalResult::Approved) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "approved", + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + (None, None) + } + Ok(ApprovalResult::Denied) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "denied", + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + ( + Some(Err(ToolError::permission_denied(format!( + "Tool '{tool_name}' denied by user" + )))), + None, + ) + } + Ok(ApprovalResult::RetryWithPolicy(policy)) => { + emit_tool_audit(json!({ + "event": "tool.approval_decision", + "tool_id": tool_id.clone(), + "tool_name": tool_name.clone(), + "decision": "retry_with_policy", + "policy": format!("{policy:?}"), + "caller": caller_type_for_tool_use(tool_caller.as_ref()), + })); + let elevated_context = tool_registry.map(|r| { + r.context().clone().with_elevated_sandbox_policy(policy) + }); + (None, elevated_context) + } + Err(err) => (Some(Err(err)), None), + } + } else { + (None, None) + }; + + let started_at = Instant::now(); + let result = if let Some(result_override) = result_override { + result_override + } else { + Self::execute_tool_with_lock( + tool_exec_lock.clone(), + plan.supports_parallel, + plan.interactive, + self.tx_event.clone(), + tool_name.clone(), + tool_input.clone(), + tool_registry, + mcp_pool.clone(), + context_override, + ) + .await + }; + + let _ = self + .tx_event + .send(Event::ToolCallComplete { + id: tool_id.clone(), + name: tool_name.clone(), + result: result.clone(), + }) + .await; + + outcomes[plan.index] = Some(ToolExecOutcome { + index: plan.index, + id: tool_id, + name: tool_name, + input: tool_input, + started_at, + result, + }); + } + } + + let mut step_error_count = 0usize; + // Categorized tool errors collected this step. Feeds the capacity + // controller's error-escalation checkpoint so it can distinguish + // (e.g.) a Tool failure that should escalate from a permission + // denial that should not. + let mut step_error_categories: Vec = Vec::new(); + let mut stop_after_plan_tool = false; + + for outcome in outcomes.into_iter().flatten() { + let duration = outcome.started_at.elapsed(); + let tool_input = outcome.input.clone(); + let tool_name_for_ws = outcome.name.clone(); + let mut tool_call = + TurnToolCall::new(outcome.id.clone(), outcome.name.clone(), outcome.input); + let should_stop_this_turn = + should_stop_after_plan_tool(mode, &outcome.name, &outcome.result); + + match outcome.result { + Ok(output) => { + emit_tool_audit(json!({ + "event": "tool.result", + "tool_id": outcome.id.clone(), + "tool_name": outcome.name.clone(), + "success": output.success, + })); + let output_for_context = compact_tool_result_for_context( + &self.session.model, + &outcome.name, + &output, + ); + let output_content = output.content; + + tool_call.set_result(output_content.clone(), duration); + self.session.working_set.observe_tool_call( + &tool_name_for_ws, + &tool_input, + Some(&output_for_context), + &self.session.workspace, + ); + + // #136: post-edit LSP diagnostics hook. We only run + // this on success — failed edits leave the file + // untouched, so polling for diagnostics would just + // surface stale state. + if output.success { + self.run_post_edit_lsp_hook(&outcome.name, &tool_input) + .await; + } + + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::ToolResult { + tool_use_id: outcome.id, + content: output_for_context, + is_error: None, + content_blocks: None, + }], + }) + .await; + } + Err(e) => { + let envelope: ErrorEnvelope = e.clone().into(); + emit_tool_audit(json!({ + "event": "tool.result", + "tool_id": outcome.id.clone(), + "tool_name": outcome.name.clone(), + "success": false, + "error": e.to_string(), + "category": envelope.category.to_string(), + "severity": envelope.severity.to_string(), + })); + step_error_count += 1; + step_error_categories.push(envelope.category); + let error = format_tool_error(&e, &outcome.name); + tool_call.set_error(error.clone(), duration); + self.session.working_set.observe_tool_call( + &tool_name_for_ws, + &tool_input, + Some(&error), + &self.session.workspace, + ); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::ToolResult { + tool_use_id: outcome.id, + content: format!("Error: {error}"), + is_error: Some(true), + content_blocks: None, + }], + }) + .await; + } + } + + turn.record_tool_call(tool_call); + stop_after_plan_tool |= should_stop_this_turn; + } + + if stop_after_plan_tool { + break; + } + + if self + .run_capacity_post_tool_checkpoint( + turn, + mode, + tool_registry, + tool_exec_lock.clone(), + mcp_pool.clone(), + step_error_count, + consecutive_tool_error_steps, + ) + .await + { + turn.next_step(); + continue; + } + + if !pending_steers.is_empty() { + for steer in pending_steers.drain(..) { + self.session + .working_set + .observe_user_message(&steer, &self.session.workspace); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: steer, + cache_control: None, + }], + }) + .await; + } + } + + if step_error_count > 0 { + consecutive_tool_error_steps = consecutive_tool_error_steps.saturating_add(1); + } else { + consecutive_tool_error_steps = 0; + } + + if self + .run_capacity_error_escalation_checkpoint( + turn, + mode, + step_error_count, + consecutive_tool_error_steps, + &step_error_categories, + ) + .await + { + turn.next_step(); + continue; + } + + if consecutive_tool_error_steps >= 3 { + let _ = self + .tx_event + .send(Event::status( + "Stopping after repeated tool failures. Try a narrower scope or adjust approvals.", + )) + .await; + break; + } + + turn.next_step(); + } + + if self.cancel_token.is_cancelled() { + return (TurnOutcomeStatus::Interrupted, None); + } + if let Some(err) = turn_error { + return (TurnOutcomeStatus::Failed, Some(err)); + } + (TurnOutcomeStatus::Completed, None) + } +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index cde02e9a..79fbd6f2 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -68,7 +68,9 @@ Current boundary note: ### Core Components - **`core/`** - Main engine components - - `engine.rs` - Agent loop, message processing, tool execution orchestration + - `engine.rs` - Engine state, operation handling, message processing + - `engine/turn_loop.rs` - Streaming turn loop and tool execution orchestration + - `engine/capacity_flow.rs` - Capacity guardrail checkpoints and interventions - `session.rs` - Session state management - `turn.rs` - Turn-based conversation handling - `events.rs` - Event system for UI updates