From 34331454874ff8770ec6f21c67a9abd52b4f11a9 Mon Sep 17 00:00:00 2001 From: Reid <61492567+reidliu41@users.noreply.github.com> Date: Fri, 8 May 2026 01:44:39 +0800 Subject: [PATCH] fix(thinking): finalize streaming thinking block on stream errors and restarts (#1078) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactors the thinking-block lifecycle in `run_event_loop` into three named helpers — `start_streaming_thinking_block`, `finalize_current_streaming_thinking`, `stash_reasoning_buffer_into_last_reasoning` — and calls `finalize_current_streaming_thinking` from the engine-error handler so a thinking block that's still active when the stream errors gets drained into the transcript instead of being discarded. This addresses one of the failure modes in #861 ("thinking collapse — thinking blocks freeze, truncate silently, or drop reasoning_content"): the case where a transient stream error mid-thinking left the partial reasoning orphaned in `StreamingState`. Thanks to @reidliu41 — extracting the named helpers is the kind of refactor that pays off the next time we have to touch this lifecycle. --- crates/tui/src/tui/ui.rs | 71 ++++++++++++++++++++++++++-------- crates/tui/src/tui/ui/tests.rs | 71 ++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 17 deletions(-) diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index 43d5abbb..16146c5b 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -753,12 +753,9 @@ async fn run_event_loop( // P2.3: thinking lives in the active cell so it groups // visually with the tool calls that follow until the // next assistant prose chunk flushes the group. - app.reasoning_buffer.clear(); - app.reasoning_header = None; - app.thinking_started_at = Some(Instant::now()); - app.streaming_state.reset(); - app.streaming_state.start_thinking(0, None); - let _ = ensure_streaming_thinking_active_entry(app); + if start_streaming_thinking_block(app) { + transcript_batch_updated = true; + } } EngineEvent::ThinkingDelta { content, .. } => { let sanitized = sanitize_stream_chunk(&content); @@ -779,19 +776,10 @@ async fn run_event_loop( } } EngineEvent::ThinkingComplete { .. } => { - let duration = app - .thinking_started_at - .take() - .map(|t| t.elapsed().as_secs_f32()); - let remaining = app.streaming_state.finalize_block_text(0); - if finalize_streaming_thinking_active_entry(app, duration, &remaining) { + if finalize_current_streaming_thinking(app) { transcript_batch_updated = true; } - - if !app.reasoning_buffer.is_empty() { - app.last_reasoning = Some(app.reasoning_buffer.clone()); - } - app.reasoning_buffer.clear(); + stash_reasoning_buffer_into_last_reasoning(app); } EngineEvent::ToolCallStarted { id, name, input } => { app.pending_tool_uses @@ -3040,6 +3028,7 @@ pub(crate) fn apply_engine_error_to_app( let recoverable = envelope.recoverable; let message = envelope.message.clone(); let severity = envelope.severity; + finalize_current_streaming_thinking(app); app.streaming_state.reset(); app.streaming_message_index = None; app.streaming_thinking_active_entry = None; @@ -3332,6 +3321,54 @@ fn append_streaming_thinking(app: &mut App, entry_idx: usize, text: &str) { } } +/// Start a new streaming thinking block. If another thinking block is still +/// active, first drain its pending UI tail so a late block boundary cannot +/// discard content buffered inside `StreamingState`. +fn start_streaming_thinking_block(app: &mut App) -> bool { + let finalized_previous = if app.streaming_thinking_active_entry.is_some() { + let finalized = finalize_current_streaming_thinking(app); + stash_reasoning_buffer_into_last_reasoning(app); + finalized + } else { + false + }; + + app.reasoning_buffer.clear(); + app.reasoning_header = None; + app.thinking_started_at = Some(Instant::now()); + app.streaming_state.reset(); + app.streaming_state.start_thinking(0, None); + let _ = ensure_streaming_thinking_active_entry(app); + finalized_previous +} + +fn finalize_current_streaming_thinking(app: &mut App) -> bool { + let duration = app + .thinking_started_at + .take() + .map(|t| t.elapsed().as_secs_f32()); + let remaining = app.streaming_state.finalize_block_text(0); + finalize_streaming_thinking_active_entry(app, duration, &remaining) +} + +fn stash_reasoning_buffer_into_last_reasoning(app: &mut App) { + if app.reasoning_buffer.is_empty() { + return; + } + + if let Some(existing) = app.last_reasoning.as_mut() + && !existing.is_empty() + { + if !existing.ends_with('\n') { + existing.push('\n'); + } + existing.push_str(&app.reasoning_buffer); + } else { + app.last_reasoning = Some(app.reasoning_buffer.clone()); + } + app.reasoning_buffer.clear(); +} + /// Finalize the in-flight thinking entry in `active_cell`: append the /// collector's remaining buffered text, stop the spinner, and stamp the /// duration. Returns `true` when a thinking entry was finalized (so the diff --git a/crates/tui/src/tui/ui/tests.rs b/crates/tui/src/tui/ui/tests.rs index 608f8878..3ec2059b 100644 --- a/crates/tui/src/tui/ui/tests.rs +++ b/crates/tui/src/tui/ui/tests.rs @@ -3078,6 +3078,36 @@ fn flush_active_cell_finalizes_unclosed_thinking_block() { ); } +#[test] +fn engine_error_finalizes_active_thinking_block() { + use crate::error_taxonomy::StreamError; + + let mut app = create_test_app(); + let entry_idx = ensure_streaming_thinking_active_entry(&mut app); + app.thinking_started_at = Some(Instant::now()); + app.streaming_state.start_thinking(0, None); + app.streaming_state.push_content(0, "partial reasoning"); + + apply_engine_error_to_app( + &mut app, + StreamError::Stall { timeout_secs: 60 }.into_envelope(), + ); + + let active = app.active_cell.as_ref().expect("active thinking remains"); + let HistoryCell::Thinking { + content, streaming, .. + } = &active.entries()[entry_idx] + else { + panic!("expected active thinking cell"); + }; + assert!(!*streaming, "error path must stop the thinking spinner"); + assert!( + content.contains("partial reasoning"), + "error path must drain pending thinking tail" + ); + assert!(app.streaming_thinking_active_entry.is_none()); +} + #[test] fn second_thinking_block_appends_new_entry_in_same_active_cell() { // Real V4 turns can emit Thinking → Tool → Thinking → Tool before any @@ -3118,6 +3148,47 @@ fn second_thinking_block_appends_new_entry_in_same_active_cell() { ); } +#[test] +fn new_thinking_block_drains_pending_tail_from_previous_block() { + let mut app = create_test_app(); + + assert!(!start_streaming_thinking_block(&mut app)); + let first_idx = app + .streaming_thinking_active_entry + .expect("first thinking entry active"); + app.reasoning_buffer.push_str("first tail"); + app.streaming_state.push_content(0, "first tail"); + + assert!(start_streaming_thinking_block(&mut app)); + let second_idx = app + .streaming_thinking_active_entry + .expect("second thinking entry active"); + + let active = app.active_cell.as_ref().expect("active cell exists"); + assert_ne!(first_idx, second_idx); + + let HistoryCell::Thinking { + content, streaming, .. + } = &active.entries()[first_idx] + else { + panic!("expected first thinking cell"); + }; + assert!(!*streaming, "previous thinking block should be finalized"); + assert!( + content.contains("first tail"), + "pending text must survive a new ThinkingStarted event" + ); + + assert!(matches!( + active.entries()[second_idx], + HistoryCell::Thinking { + streaming: true, + .. + } + )); + assert_eq!(app.last_reasoning.as_deref(), Some("first tail")); +} + // ---- per-child prompt wiring ---- // // Generic tool cells default to `prompts: None`. Reserved for any future