fix(thinking): finalize streaming thinking block on stream errors and restarts (#1078)
Refactors the thinking-block lifecycle in `run_event_loop` into three named helpers — `start_streaming_thinking_block`, `finalize_current_streaming_thinking`, `stash_reasoning_buffer_into_last_reasoning` — and calls `finalize_current_streaming_thinking` from the engine-error handler so a thinking block that's still active when the stream errors gets drained into the transcript instead of being discarded. `start_streaming_thinking_block` applies the same drain when a new thinking block starts while the previous one is still active (the "restart" case), so a late block boundary cannot discard buffered text either. This addresses one of the failure modes in #861 ("thinking collapse — thinking blocks freeze, truncate silently, or drop reasoning_content"): the case where a transient stream error mid-thinking left the partial reasoning orphaned in `StreamingState`. Thanks to @reidliu41 — extracting the named helpers is the kind of refactor that pays off the next time we have to touch this lifecycle.
This commit is contained in:
+54
-17
@@ -753,12 +753,9 @@ async fn run_event_loop(
|
||||
// P2.3: thinking lives in the active cell so it groups
|
||||
// visually with the tool calls that follow until the
|
||||
// next assistant prose chunk flushes the group.
|
||||
app.reasoning_buffer.clear();
|
||||
app.reasoning_header = None;
|
||||
app.thinking_started_at = Some(Instant::now());
|
||||
app.streaming_state.reset();
|
||||
app.streaming_state.start_thinking(0, None);
|
||||
let _ = ensure_streaming_thinking_active_entry(app);
|
||||
if start_streaming_thinking_block(app) {
|
||||
transcript_batch_updated = true;
|
||||
}
|
||||
}
|
||||
EngineEvent::ThinkingDelta { content, .. } => {
|
||||
let sanitized = sanitize_stream_chunk(&content);
|
||||
@@ -779,19 +776,10 @@ async fn run_event_loop(
|
||||
}
|
||||
}
|
||||
EngineEvent::ThinkingComplete { .. } => {
|
||||
let duration = app
|
||||
.thinking_started_at
|
||||
.take()
|
||||
.map(|t| t.elapsed().as_secs_f32());
|
||||
let remaining = app.streaming_state.finalize_block_text(0);
|
||||
if finalize_streaming_thinking_active_entry(app, duration, &remaining) {
|
||||
if finalize_current_streaming_thinking(app) {
|
||||
transcript_batch_updated = true;
|
||||
}
|
||||
|
||||
if !app.reasoning_buffer.is_empty() {
|
||||
app.last_reasoning = Some(app.reasoning_buffer.clone());
|
||||
}
|
||||
app.reasoning_buffer.clear();
|
||||
stash_reasoning_buffer_into_last_reasoning(app);
|
||||
}
|
||||
EngineEvent::ToolCallStarted { id, name, input } => {
|
||||
app.pending_tool_uses
|
||||
@@ -3040,6 +3028,7 @@ pub(crate) fn apply_engine_error_to_app(
|
||||
let recoverable = envelope.recoverable;
|
||||
let message = envelope.message.clone();
|
||||
let severity = envelope.severity;
|
||||
finalize_current_streaming_thinking(app);
|
||||
app.streaming_state.reset();
|
||||
app.streaming_message_index = None;
|
||||
app.streaming_thinking_active_entry = None;
|
||||
@@ -3332,6 +3321,54 @@ fn append_streaming_thinking(app: &mut App, entry_idx: usize, text: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Start a new streaming thinking block. If another thinking block is still
|
||||
/// active, first drain its pending UI tail so a late block boundary cannot
|
||||
/// discard content buffered inside `StreamingState`.
|
||||
fn start_streaming_thinking_block(app: &mut App) -> bool {
|
||||
let finalized_previous = if app.streaming_thinking_active_entry.is_some() {
|
||||
let finalized = finalize_current_streaming_thinking(app);
|
||||
stash_reasoning_buffer_into_last_reasoning(app);
|
||||
finalized
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
app.reasoning_buffer.clear();
|
||||
app.reasoning_header = None;
|
||||
app.thinking_started_at = Some(Instant::now());
|
||||
app.streaming_state.reset();
|
||||
app.streaming_state.start_thinking(0, None);
|
||||
let _ = ensure_streaming_thinking_active_entry(app);
|
||||
finalized_previous
|
||||
}
|
||||
|
||||
fn finalize_current_streaming_thinking(app: &mut App) -> bool {
|
||||
let duration = app
|
||||
.thinking_started_at
|
||||
.take()
|
||||
.map(|t| t.elapsed().as_secs_f32());
|
||||
let remaining = app.streaming_state.finalize_block_text(0);
|
||||
finalize_streaming_thinking_active_entry(app, duration, &remaining)
|
||||
}
|
||||
|
||||
fn stash_reasoning_buffer_into_last_reasoning(app: &mut App) {
|
||||
if app.reasoning_buffer.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(existing) = app.last_reasoning.as_mut()
|
||||
&& !existing.is_empty()
|
||||
{
|
||||
if !existing.ends_with('\n') {
|
||||
existing.push('\n');
|
||||
}
|
||||
existing.push_str(&app.reasoning_buffer);
|
||||
} else {
|
||||
app.last_reasoning = Some(app.reasoning_buffer.clone());
|
||||
}
|
||||
app.reasoning_buffer.clear();
|
||||
}
|
||||
|
||||
/// Finalize the in-flight thinking entry in `active_cell`: append the
|
||||
/// collector's remaining buffered text, stop the spinner, and stamp the
|
||||
/// duration. Returns `true` when a thinking entry was finalized (so the
|
||||
|
||||
@@ -3078,6 +3078,36 @@ fn flush_active_cell_finalizes_unclosed_thinking_block() {
|
||||
);
|
||||
}
|
||||
|
||||
/// A stream error that arrives while a thinking block is still streaming
/// must stop the spinner and keep the buffered partial text in the cell.
#[test]
fn engine_error_finalizes_active_thinking_block() {
    use crate::error_taxonomy::StreamError;

    // Arrange: an active thinking entry with text still buffered in the
    // streaming state.
    let mut app = create_test_app();
    let entry_idx = ensure_streaming_thinking_active_entry(&mut app);
    app.thinking_started_at = Some(Instant::now());
    app.streaming_state.start_thinking(0, None);
    app.streaming_state.push_content(0, "partial reasoning");

    // Act: deliver a stall error through the engine-error handler.
    let stall = StreamError::Stall { timeout_secs: 60 }.into_envelope();
    apply_engine_error_to_app(&mut app, stall);

    // Assert: the entry is finalized and still carries the buffered tail.
    let cell = app.active_cell.as_ref().expect("active thinking remains");
    match &cell.entries()[entry_idx] {
        HistoryCell::Thinking {
            content, streaming, ..
        } => {
            assert!(!*streaming, "error path must stop the thinking spinner");
            assert!(
                content.contains("partial reasoning"),
                "error path must drain pending thinking tail"
            );
        }
        _ => panic!("expected active thinking cell"),
    }
    assert!(app.streaming_thinking_active_entry.is_none());
}
|
||||
|
||||
#[test]
|
||||
fn second_thinking_block_appends_new_entry_in_same_active_cell() {
|
||||
// Real V4 turns can emit Thinking → Tool → Thinking → Tool before any
|
||||
@@ -3118,6 +3148,47 @@ fn second_thinking_block_appends_new_entry_in_same_active_cell() {
|
||||
);
|
||||
}
|
||||
|
||||
/// Starting a second thinking block while the first is still active must
/// finalize the first one, keeping its buffered text, and open a new entry.
#[test]
fn new_thinking_block_drains_pending_tail_from_previous_block() {
    let mut app = create_test_app();

    // First block: nothing was active before, so nothing gets finalized.
    let finalized_on_first_start = start_streaming_thinking_block(&mut app);
    assert!(!finalized_on_first_start);
    let first_idx = app
        .streaming_thinking_active_entry
        .expect("first thinking entry active");
    app.reasoning_buffer.push_str("first tail");
    app.streaming_state.push_content(0, "first tail");

    // Second block starts while the first one still has pending text.
    let finalized_on_second_start = start_streaming_thinking_block(&mut app);
    assert!(finalized_on_second_start);
    let second_idx = app
        .streaming_thinking_active_entry
        .expect("second thinking entry active");
    assert_ne!(first_idx, second_idx);

    let cell = app.active_cell.as_ref().expect("active cell exists");

    // The first entry is closed and kept its tail.
    match &cell.entries()[first_idx] {
        HistoryCell::Thinking {
            content, streaming, ..
        } => {
            assert!(!*streaming, "previous thinking block should be finalized");
            assert!(
                content.contains("first tail"),
                "pending text must survive a new ThinkingStarted event"
            );
        }
        _ => panic!("expected first thinking cell"),
    }

    // The second entry is the one streaming now.
    assert!(matches!(
        cell.entries()[second_idx],
        HistoryCell::Thinking {
            streaming: true,
            ..
        }
    ));
    assert_eq!(app.last_reasoning.as_deref(), Some("first tail"));
}
|
||||
|
||||
// ---- per-child prompt wiring ----
|
||||
//
|
||||
// Generic tool cells default to `prompts: None`. Reserved for any future
|
||||
|
||||
Reference in New Issue
Block a user