fix(tui): stop rendering after cancel restore
This commit is contained in:
@@ -309,7 +309,14 @@ impl Engine {
|
||||
// first call) so we can resend it on a transparent retry below
|
||||
// when the wire dies before any content was streamed (#103).
|
||||
let stream_request = request;
|
||||
let stream_result = client.create_message_stream(stream_request.clone()).await;
|
||||
let stream_result = tokio::select! {
|
||||
biased;
|
||||
() = self.cancel_token.cancelled() => {
|
||||
let _ = self.tx_event.send(Event::status("Request cancelled")).await;
|
||||
return (TurnOutcomeStatus::Interrupted, None);
|
||||
}
|
||||
result = client.create_message_stream(stream_request.clone()) => result,
|
||||
};
|
||||
let stream = match stream_result {
|
||||
Ok(s) => {
|
||||
context_recovery_attempts = 0;
|
||||
@@ -391,6 +398,7 @@ impl Engine {
|
||||
// Process stream events
|
||||
loop {
|
||||
let poll_outcome = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel_token.cancelled() => None,
|
||||
result = tokio::time::timeout(chunk_timeout, stream.next()) => {
|
||||
match result {
|
||||
@@ -487,7 +495,12 @@ impl Engine {
|
||||
// Drop the failed stream before issuing the new
|
||||
// request to release the underlying connection.
|
||||
drop(stream);
|
||||
match client.create_message_stream(stream_request.clone()).await {
|
||||
let retry_stream_result = tokio::select! {
|
||||
biased;
|
||||
() = self.cancel_token.cancelled() => break,
|
||||
result = client.create_message_stream(stream_request.clone()) => result,
|
||||
};
|
||||
match retry_stream_result {
|
||||
Ok(fresh) => {
|
||||
stream = fresh;
|
||||
stream_start = Instant::now();
|
||||
@@ -746,6 +759,11 @@ impl Engine {
|
||||
}
|
||||
}
|
||||
|
||||
if self.cancel_token.is_cancelled() {
|
||||
let _ = self.tx_event.send(Event::status("Request cancelled")).await;
|
||||
return (TurnOutcomeStatus::Interrupted, None);
|
||||
}
|
||||
|
||||
// #103 Phase 3 — transparent retry. The inner loop above bails
|
||||
// when reqwest yields chunk decode errors three times in a row;
|
||||
// most of the time those are recoverable proxy / HTTP/2 issues
|
||||
|
||||
@@ -1018,6 +1018,11 @@ pub struct App {
|
||||
pub last_exec_wait_command: Option<String>,
|
||||
/// Current streaming assistant cell
|
||||
pub streaming_message_index: Option<usize>,
|
||||
/// True after a local cancel key has been handled and before the engine's
|
||||
/// authoritative TurnComplete arrives. Stream events already queued for
|
||||
/// the cancelled turn are ignored so text does not keep appearing after
|
||||
/// Ctrl+C/Esc returns focus to the composer.
|
||||
pub suppress_stream_events_until_turn_complete: bool,
|
||||
/// Index into `active_cell.entries` of the thinking entry currently being
|
||||
/// streamed. `None` when no thinking block is in flight. P2.3 routes
|
||||
/// thinking into the active cell so it groups visually with tool calls
|
||||
@@ -1598,6 +1603,7 @@ impl App {
|
||||
ignored_tool_calls: HashSet::new(),
|
||||
last_exec_wait_command: None,
|
||||
streaming_message_index: None,
|
||||
suppress_stream_events_until_turn_complete: false,
|
||||
streaming_thinking_active_entry: None,
|
||||
streaming_state: StreamingState::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
|
||||
+74
-34
@@ -936,6 +936,22 @@ async fn run_event_loop(
|
||||
let mut rx = engine_handle.rx_event.write().await;
|
||||
while let Ok(event) = rx.try_recv() {
|
||||
received_engine_event = true;
|
||||
if app.suppress_stream_events_until_turn_complete {
|
||||
if matches!(event, EngineEvent::TurnStarted { .. }) {
|
||||
// Ctrl+C can race with the engine's per-turn token
|
||||
// reset: the first cancel may hit the previous token
|
||||
// if SendMessage is queued but TurnStarted has not
|
||||
// arrived yet. Reassert cancellation once the real
|
||||
// turn starts, then keep hiding its queued deltas.
|
||||
engine_handle.cancel();
|
||||
continue;
|
||||
}
|
||||
if suppress_engine_event_after_local_cancel(&event) {
|
||||
continue;
|
||||
}
|
||||
} else if !app.is_loading && ignore_stale_stream_event_while_idle(&event) {
|
||||
continue;
|
||||
}
|
||||
match event {
|
||||
EngineEvent::MessageStarted { .. } => {
|
||||
// Assistant text starting after parallel tool work
|
||||
@@ -1242,6 +1258,7 @@ async fn run_event_loop(
|
||||
}
|
||||
}
|
||||
EngineEvent::TurnStarted { turn_id } => {
|
||||
app.suppress_stream_events_until_turn_complete = false;
|
||||
app.is_loading = true;
|
||||
app.offline_mode = false;
|
||||
app.turn_error_posted = false;
|
||||
@@ -1275,6 +1292,8 @@ async fn run_event_loop(
|
||||
status,
|
||||
error,
|
||||
} => {
|
||||
let was_locally_cancelled = app.suppress_stream_events_until_turn_complete;
|
||||
app.suppress_stream_events_until_turn_complete = false;
|
||||
if !matches!(status, crate::core::events::TurnOutcomeStatus::Completed)
|
||||
|| draws_since_last_full_repaint >= PERIODIC_FULL_REPAINT_EVERY_N
|
||||
{
|
||||
@@ -1302,6 +1321,9 @@ async fn run_event_loop(
|
||||
app.dispatch_started_at = None;
|
||||
app.offline_mode = false;
|
||||
app.streaming_state.reset();
|
||||
if was_locally_cancelled {
|
||||
current_streaming_text.clear();
|
||||
}
|
||||
// Capture elapsed before clearing turn_started_at so
|
||||
// notifications can use the real wall-clock duration.
|
||||
let turn_elapsed =
|
||||
@@ -2728,17 +2750,9 @@ async fn run_event_loop(
|
||||
}
|
||||
CtrlCDisposition::CancelTurn => {
|
||||
engine_handle.cancel();
|
||||
app.is_loading = false;
|
||||
app.dispatch_started_at = None;
|
||||
app.streaming_state.reset();
|
||||
mark_active_turn_cancelled_locally(app);
|
||||
current_streaming_text.clear();
|
||||
let prompt_restored = app.restore_last_submitted_prompt_if_empty();
|
||||
// Optimistically clear the turn-in-progress flag
|
||||
// so the footer wave animation halts immediately —
|
||||
// without this, the strip keeps animating until
|
||||
// the engine eventually emits TurnComplete (#5a).
|
||||
// The engine's eventual TurnComplete event will
|
||||
// overwrite with the real outcome ("interrupted").
|
||||
app.runtime_turn_status = None;
|
||||
app.status_message = Some(
|
||||
if prompt_restored {
|
||||
"Request cancelled; prompt restored to composer"
|
||||
@@ -2793,24 +2807,8 @@ async fn run_event_loop(
|
||||
EscapeAction::CancelRequest => {
|
||||
app.backtrack.reset();
|
||||
engine_handle.cancel();
|
||||
app.is_loading = false;
|
||||
app.dispatch_started_at = None;
|
||||
app.streaming_state.reset();
|
||||
// Optimistically halt the wave + working label —
|
||||
// engine's TurnComplete will resync with the real
|
||||
// outcome. Fixes #5a (wave kept animating after Esc).
|
||||
app.runtime_turn_status = None;
|
||||
// Finalize any in-flight tool entries optimistically so
|
||||
// the composer regains focus and the footer's "tool ...
|
||||
// · X active" chip clears immediately rather than
|
||||
// waiting for the engine's TurnComplete echo to drain.
|
||||
// Idempotent with the TurnComplete handler that runs
|
||||
// when the engine actually echoes the cancel (#243).
|
||||
// Background sub-agents continue running — they are
|
||||
// tracked via `subagent_cache` independently of the
|
||||
// foreground turn.
|
||||
app.finalize_active_cell_as_interrupted();
|
||||
app.finalize_streaming_assistant_as_interrupted();
|
||||
mark_active_turn_cancelled_locally(app);
|
||||
current_streaming_text.clear();
|
||||
app.status_message = Some("Request cancelled".to_string());
|
||||
}
|
||||
EscapeAction::DiscardQueuedDraft => {
|
||||
@@ -5990,12 +5988,7 @@ async fn handle_view_events(
|
||||
ViewEvent::ShellControlCancel => {
|
||||
app.backtrack.reset();
|
||||
engine_handle.cancel();
|
||||
app.is_loading = false;
|
||||
app.dispatch_started_at = None;
|
||||
app.streaming_state.reset();
|
||||
app.runtime_turn_status = None;
|
||||
app.finalize_active_cell_as_interrupted();
|
||||
app.finalize_streaming_assistant_as_interrupted();
|
||||
mark_active_turn_cancelled_locally(app);
|
||||
app.status_message = Some("Request cancelled".to_string());
|
||||
}
|
||||
}
|
||||
@@ -6004,6 +5997,53 @@ async fn handle_view_events(
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn mark_active_turn_cancelled_locally(app: &mut App) {
|
||||
app.is_loading = false;
|
||||
app.dispatch_started_at = None;
|
||||
app.streaming_state.reset();
|
||||
app.runtime_turn_status = None;
|
||||
app.suppress_stream_events_until_turn_complete = true;
|
||||
app.finalize_active_cell_as_interrupted();
|
||||
app.finalize_streaming_assistant_as_interrupted();
|
||||
}
|
||||
|
||||
fn suppress_engine_event_after_local_cancel(event: &EngineEvent) -> bool {
|
||||
matches!(
|
||||
event,
|
||||
EngineEvent::MessageStarted { .. }
|
||||
| EngineEvent::MessageDelta { .. }
|
||||
| EngineEvent::MessageComplete { .. }
|
||||
| EngineEvent::ThinkingStarted { .. }
|
||||
| EngineEvent::ThinkingDelta { .. }
|
||||
| EngineEvent::ThinkingComplete { .. }
|
||||
| EngineEvent::ToolCallStarted { .. }
|
||||
| EngineEvent::ToolCallProgress { .. }
|
||||
| EngineEvent::ToolCallComplete { .. }
|
||||
| EngineEvent::ApprovalRequired { .. }
|
||||
| EngineEvent::UserInputRequired { .. }
|
||||
| EngineEvent::ElevationRequired { .. }
|
||||
| EngineEvent::SessionUpdated { .. }
|
||||
)
|
||||
}
|
||||
|
||||
fn ignore_stale_stream_event_while_idle(event: &EngineEvent) -> bool {
|
||||
matches!(
|
||||
event,
|
||||
EngineEvent::MessageStarted { .. }
|
||||
| EngineEvent::MessageDelta { .. }
|
||||
| EngineEvent::MessageComplete { .. }
|
||||
| EngineEvent::ThinkingStarted { .. }
|
||||
| EngineEvent::ThinkingDelta { .. }
|
||||
| EngineEvent::ThinkingComplete { .. }
|
||||
| EngineEvent::ToolCallStarted { .. }
|
||||
| EngineEvent::ToolCallProgress { .. }
|
||||
| EngineEvent::ToolCallComplete { .. }
|
||||
| EngineEvent::ApprovalRequired { .. }
|
||||
| EngineEvent::UserInputRequired { .. }
|
||||
| EngineEvent::ElevationRequired { .. }
|
||||
)
|
||||
}
|
||||
|
||||
/// Push the new `selected_idx` into the live transcript overlay so the
|
||||
/// highlight follows the user's Left/Right input. No-op if the overlay is
|
||||
/// no longer on top (e.g. it was closed underneath us).
|
||||
|
||||
@@ -2659,6 +2659,57 @@ fn test_ctrl_c_cancels_streaming_sets_status() {
|
||||
assert_eq!(app.status_message, Some("Request cancelled".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_cancel_marks_late_stream_events_for_suppression() {
|
||||
let mut app = create_test_app();
|
||||
app.is_loading = true;
|
||||
app.streaming_state.start_text(0, None);
|
||||
|
||||
mark_active_turn_cancelled_locally(&mut app);
|
||||
|
||||
assert!(!app.is_loading);
|
||||
assert!(app.suppress_stream_events_until_turn_complete);
|
||||
assert!(suppress_engine_event_after_local_cancel(
|
||||
&EngineEvent::MessageDelta {
|
||||
index: 0,
|
||||
content: "late text".to_string(),
|
||||
}
|
||||
));
|
||||
assert!(suppress_engine_event_after_local_cancel(
|
||||
&EngineEvent::ThinkingDelta {
|
||||
index: 0,
|
||||
content: "late thinking".to_string(),
|
||||
}
|
||||
));
|
||||
assert!(suppress_engine_event_after_local_cancel(
|
||||
&EngineEvent::SessionUpdated {
|
||||
session_id: "session".to_string(),
|
||||
messages: Vec::new(),
|
||||
system_prompt: None,
|
||||
model: "deepseek-v4-flash".to_string(),
|
||||
workspace: PathBuf::from("."),
|
||||
}
|
||||
));
|
||||
assert!(ignore_stale_stream_event_while_idle(
|
||||
&EngineEvent::MessageDelta {
|
||||
index: 0,
|
||||
content: "late text".to_string(),
|
||||
}
|
||||
));
|
||||
assert!(!suppress_engine_event_after_local_cancel(
|
||||
&EngineEvent::TurnComplete {
|
||||
usage: Usage::default(),
|
||||
status: crate::core::events::TurnOutcomeStatus::Interrupted,
|
||||
error: None,
|
||||
}
|
||||
));
|
||||
assert!(!suppress_engine_event_after_local_cancel(
|
||||
&EngineEvent::Status {
|
||||
message: "Request cancelled".to_string(),
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ctrl_c_exits_when_not_loading() {
|
||||
let mut app = create_test_app();
|
||||
|
||||
Reference in New Issue
Block a user