feat(tui): #122 Esc-to-steer + queue visibility

While a turn is running, Esc with non-empty composer input now steers:
the typed text is captured, the in-flight HTTP request is cancelled,
and on TurnComplete::Interrupted every accumulated steer is merged
into a single fresh user message that re-enters the engine. Empty-input
Esc still cancels exactly as before.

State machine. SubmitDisposition::{Immediate, Queue, Steer} replaces
the implicit if/else in submit_or_steer_message; truth table preserves
the offline_mode + busy fallback path. App.pending_steers /
rejected_steers / submit_pending_steers_after_interrupt back the new
flow and the queue-visibility widget.

Partial save. Deliberate divergence from openai/codex which discards
on abort: V4 thinking is expensive, so the streaming Assistant cell is
tagged '[interrupted] …' (or '[interrupted]' when nothing streamed yet)
and its spinner is flipped off. The TurnComplete handler also calls the
helper so Ctrl+C / network failures get the same treatment, idempotent
with the optimistic call in the Esc handler.

Queue visibility. PendingInputPreview already supported all three
buckets; build_pending_input_preview now populates pending_steers and
rejected_steers alongside queued_messages. rejected_steers stays empty
under today's engine paths (no rejection signal yet) but renders if/when
populated.

Recovery. If TurnComplete arrives with Failed instead of Interrupted
while pending_steers is non-empty, the steers are demoted to the
visible queue so they're not silently lost.

Tests. 13 new app-level units cover the disposition truth table,
push/drain semantics, double-Esc idempotency, and the partial-save
helper. 11 new ui-level units cover Esc-action routing, slash-menu
priority, whitespace-only input handling, merge_pending_steers (empty
/ single / multi / skill-instruction), and the three-bucket preview.

Closes #122.
This commit is contained in:
Hunter Bown
2026-04-27 20:59:00 -05:00
parent e075ecd0fe
commit 764aed65ed
3 changed files with 479 additions and 20 deletions
+257
View File
@@ -548,6 +548,23 @@ pub struct App {
pub queued_messages: VecDeque<QueuedMessage>,
/// Draft queued message being edited
pub queued_draft: Option<QueuedMessage>,
/// Composer inputs the user steered with Esc during a running turn. Held
/// here until the in-flight turn aborts; then merged into a single fresh
/// turn (#122). Not the same channel as the engine's mid-turn steer
/// (`EngineHandle::steer`) — those flow through `queued_messages`/`Steer`
/// disposition and never abort the current turn.
pub pending_steers: VecDeque<QueuedMessage>,
/// Engine-rejected steers (e.g. a tool was already running and couldn't be
/// cancelled cleanly). Surfaced in the pending-input preview so the user
/// knows the steer was deferred to end-of-turn. Today no engine path
/// produces these; the field is scaffolding for a future signalling
/// channel and the bucket renders identically when populated.
pub rejected_steers: VecDeque<String>,
/// Set when the user pressed Esc with non-empty input. The next
/// `TurnComplete::Interrupted` event drains `pending_steers`, merges them
/// into one user message, and dispatches a fresh turn. Cleared on drain
/// (or whenever the queue empties out).
pub submit_pending_steers_after_interrupt: bool,
/// Start time for current turn
pub turn_started_at: Option<Instant>,
/// Current runtime turn id (if known).
@@ -602,6 +619,21 @@ pub struct QueuedMessage {
pub skill_instruction: Option<String>,
}
/// How a freshly-typed user input should be sent.
///
/// Picked by [`App::decide_submit_disposition`] when the user hits Enter on a
/// non-empty composer. The Esc-to-steer path (typed input + Esc during a
/// running turn) is separate — see [`App::push_pending_steer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitDisposition {
/// Engine idle (or offline mode without a busy turn): send immediately.
Immediate,
/// Engine busy and offline: park on `queued_messages` for end-of-turn drain.
Queue,
/// Engine busy and online: forward as a mid-turn steer.
Steer,
}
/// Detailed tool payload attached to a history cell.
#[derive(Debug, Clone)]
pub struct ToolDetailRecord {
@@ -855,6 +887,9 @@ impl App {
pending_tool_uses: Vec::new(),
queued_messages: VecDeque::new(),
queued_draft: None,
pending_steers: VecDeque::new(),
rejected_steers: VecDeque::new(),
submit_pending_steers_after_interrupt: false,
turn_started_at: None,
runtime_turn_id: None,
runtime_turn_status: None,
@@ -1828,6 +1863,68 @@ impl App {
self.queued_messages.len()
}
/// Park a composer input the user steered with Esc. Re-armed each call so
/// rapid Esc taps accumulate rather than overwriting each other.
pub fn push_pending_steer(&mut self, message: QueuedMessage) {
self.pending_steers.push_back(message);
self.submit_pending_steers_after_interrupt = true;
self.needs_redraw = true;
}
/// Drain the pending-steer queue and clear the resend flag. Returns the
/// messages in submit order (oldest first).
pub fn drain_pending_steers(&mut self) -> Vec<QueuedMessage> {
self.submit_pending_steers_after_interrupt = false;
if self.pending_steers.is_empty() {
return Vec::new();
}
self.needs_redraw = true;
self.pending_steers.drain(..).collect()
}
/// Decide how to route a fresh composer submit. Esc-to-steer goes through
/// [`Self::push_pending_steer`] instead; this is the Enter path.
///
/// Truth table (preserves the pre-refactor behaviour):
/// offline=F, busy=F → Immediate
/// offline=F, busy=T → Steer
/// offline=T, busy=F → Queue
/// offline=T, busy=T → Steer (in-flight turn still owns the wire; the
/// steer attempt falls back to queueing on send failure)
#[must_use]
pub fn decide_submit_disposition(&self) -> SubmitDisposition {
if self.is_loading {
SubmitDisposition::Steer
} else if self.offline_mode {
SubmitDisposition::Queue
} else {
SubmitDisposition::Immediate
}
}
/// Mark the in-flight streaming Assistant cell as interrupted: prepend
/// `[interrupted]` to whatever streamed so far (so the user can see what
/// was salvaged) and flip `streaming` off so the spinner halts. No-op if
/// no Assistant cell is currently streaming.
///
/// Deliberate divergence from openai/codex which discards partial output
/// on abort — V4 thinking is expensive and the user usually wants to see
/// what the model produced before steering.
pub fn finalize_streaming_assistant_as_interrupted(&mut self) {
let Some(index) = self.streaming_message_index.take() else {
return;
};
if let Some(HistoryCell::Assistant { content, streaming }) = self.history.get_mut(index) {
*streaming = false;
if content.is_empty() {
*content = "[interrupted]".to_string();
} else if !content.starts_with("[interrupted]") {
content.insert_str(0, "[interrupted] ");
}
}
self.bump_history_cell(index);
}
pub fn history_up(&mut self) {
if self.input_history.is_empty() {
return;
@@ -2371,6 +2468,166 @@ mod tests {
assert!(deadline > Instant::now(), "fresh deadline in the future");
}
// ---- Issue #122: Esc-to-steer + queue visibility ----
#[test]
fn submit_disposition_immediate_when_idle_and_online() {
let app = App::new(test_options(false), &Config::default());
assert!(!app.is_loading);
assert!(!app.offline_mode);
assert_eq!(
app.decide_submit_disposition(),
SubmitDisposition::Immediate
);
}
#[test]
fn submit_disposition_steer_when_busy_and_online() {
let mut app = App::new(test_options(false), &Config::default());
app.is_loading = true;
app.offline_mode = false;
assert_eq!(app.decide_submit_disposition(), SubmitDisposition::Steer);
}
#[test]
fn submit_disposition_queue_when_offline_and_idle() {
let mut app = App::new(test_options(false), &Config::default());
app.is_loading = false;
app.offline_mode = true;
assert_eq!(app.decide_submit_disposition(), SubmitDisposition::Queue);
}
#[test]
fn submit_disposition_offline_busy_still_steers() {
// In-flight turn owns the wire even in offline mode; steer attempt
// catches the send error and falls back to the queue.
let mut app = App::new(test_options(false), &Config::default());
app.is_loading = true;
app.offline_mode = true;
assert_eq!(app.decide_submit_disposition(), SubmitDisposition::Steer);
}
#[test]
fn push_pending_steer_arms_resend_flag() {
let mut app = App::new(test_options(false), &Config::default());
assert!(!app.submit_pending_steers_after_interrupt);
app.push_pending_steer(QueuedMessage::new("steer me".to_string(), None));
assert_eq!(app.pending_steers.len(), 1);
assert!(app.submit_pending_steers_after_interrupt);
}
#[test]
fn drain_pending_steers_clears_flag_and_returns_in_order() {
let mut app = App::new(test_options(false), &Config::default());
app.push_pending_steer(QueuedMessage::new("first".to_string(), None));
app.push_pending_steer(QueuedMessage::new("second".to_string(), None));
app.push_pending_steer(QueuedMessage::new("third".to_string(), None));
let drained = app.drain_pending_steers();
assert_eq!(drained.len(), 3);
assert_eq!(drained[0].display, "first");
assert_eq!(drained[2].display, "third");
assert!(app.pending_steers.is_empty());
assert!(!app.submit_pending_steers_after_interrupt);
}
#[test]
fn drain_pending_steers_when_empty_is_safe() {
let mut app = App::new(test_options(false), &Config::default());
// Flag-only set (someone armed it manually): drain still clears it.
app.submit_pending_steers_after_interrupt = true;
let drained = app.drain_pending_steers();
assert!(drained.is_empty());
assert!(!app.submit_pending_steers_after_interrupt);
}
#[test]
fn double_push_pending_steer_is_idempotent_on_flag() {
let mut app = App::new(test_options(false), &Config::default());
app.push_pending_steer(QueuedMessage::new("a".to_string(), None));
app.push_pending_steer(QueuedMessage::new("b".to_string(), None));
assert!(app.submit_pending_steers_after_interrupt);
assert_eq!(app.pending_steers.len(), 2);
}
#[test]
fn finalize_streaming_assistant_marks_existing_cell_interrupted() {
let mut app = App::new(test_options(false), &Config::default());
app.add_message(HistoryCell::Assistant {
content: "partial reply so far".to_string(),
streaming: true,
});
let idx = app.history.len() - 1;
app.streaming_message_index = Some(idx);
app.finalize_streaming_assistant_as_interrupted();
assert!(app.streaming_message_index.is_none());
match &app.history[idx] {
HistoryCell::Assistant { content, streaming } => {
assert!(content.starts_with("[interrupted]"), "got: {content}");
assert!(content.contains("partial reply so far"));
assert!(!*streaming);
}
other => panic!("expected Assistant cell, got {other:?}"),
}
}
#[test]
fn finalize_streaming_assistant_handles_empty_content() {
let mut app = App::new(test_options(false), &Config::default());
app.add_message(HistoryCell::Assistant {
content: String::new(),
streaming: true,
});
let idx = app.history.len() - 1;
app.streaming_message_index = Some(idx);
app.finalize_streaming_assistant_as_interrupted();
match &app.history[idx] {
HistoryCell::Assistant { content, streaming } => {
assert_eq!(content, "[interrupted]");
assert!(!*streaming);
}
other => panic!("expected Assistant cell, got {other:?}"),
}
}
#[test]
fn finalize_streaming_assistant_no_op_without_index() {
let mut app = App::new(test_options(false), &Config::default());
// No streaming index set; should not panic and should leave history unchanged.
let prev_len = app.history.len();
app.finalize_streaming_assistant_as_interrupted();
assert_eq!(app.history.len(), prev_len);
assert!(app.streaming_message_index.is_none());
}
#[test]
fn finalize_streaming_assistant_is_idempotent_on_double_call() {
let mut app = App::new(test_options(false), &Config::default());
app.add_message(HistoryCell::Assistant {
content: "something".to_string(),
streaming: true,
});
let idx = app.history.len() - 1;
app.streaming_message_index = Some(idx);
app.finalize_streaming_assistant_as_interrupted();
// Second call without resetting state must be safe.
app.finalize_streaming_assistant_as_interrupted();
match &app.history[idx] {
HistoryCell::Assistant { content, .. } => {
// Second call still finds index None — content unchanged from first.
assert!(content.starts_with("[interrupted] "));
assert_eq!(content.matches("[interrupted]").count(), 1);
}
other => panic!("expected Assistant cell, got {other:?}"),
}
}
#[test]
fn kill_and_yank_handle_multibyte_utf8() {
let mut app = App::new(test_options(false), &Config::default());
+105 -20
View File
@@ -66,7 +66,7 @@ use crate::tui::user_input::UserInputView;
use super::active_cell::ActiveCell;
use super::app::{
App, AppAction, AppMode, OnboardingState, QueuedMessage, SidebarFocus, StatusToastLevel,
TaskPanelEntry, ToolDetailRecord, TuiOptions,
SubmitDisposition, TaskPanelEntry, ToolDetailRecord, TuiOptions,
};
use super::approval::{
ApprovalMode, ApprovalRequest, ApprovalView, ElevationRequest, ElevationView, ReviewDecision,
@@ -582,6 +582,11 @@ async fn run_event_loop(
| crate::core::events::TurnOutcomeStatus::Failed
) {
app.finalize_active_cell_as_interrupted();
// Also mark the streaming Assistant cell (if any)
// so partial reasoning/text isn't left with a
// permanent spinner. Idempotent with the
// optimistic call in the Esc handler.
app.finalize_streaming_assistant_as_interrupted();
} else {
app.flush_active_cell();
}
@@ -642,6 +647,27 @@ async fn run_event_loop(
}
app.plan_tool_used_in_turn = false;
// Esc-to-steer (#122): the user interrupted with input
// pending. Merge every steered message into one fresh
// turn so the model sees a single coherent prompt.
if status == crate::core::events::TurnOutcomeStatus::Interrupted
&& app.submit_pending_steers_after_interrupt
{
if let Some(merged) = merge_pending_steers(&mut *app) {
queued_to_send = Some(merged);
}
} else if status == crate::core::events::TurnOutcomeStatus::Failed
&& !app.pending_steers.is_empty()
{
// Hard-fail recovery: if the engine failed before
// a clean Interrupted landed, demote pending
// steers to the visible queue so they're not
// silently lost. User can /queue to inspect.
for msg in app.drain_pending_steers() {
app.queue_message(msg);
}
}
if queued_to_send.is_none() {
queued_to_send = app.pop_queued_message();
}
@@ -1437,8 +1463,26 @@ async fn run_event_loop(
// engine's TurnComplete will resync with the real
// outcome. Fixes #5a (wave kept animating after Esc).
app.runtime_turn_status = None;
app.finalize_streaming_assistant_as_interrupted();
app.status_message = Some("Request cancelled".to_string());
}
EscapeAction::SteerAndAbort => {
if let Some(input) = app.submit_input() {
let queued = build_queued_message(app, input);
app.push_pending_steer(queued);
engine_handle.cancel();
app.is_loading = false;
app.streaming_state.reset();
app.runtime_turn_status = None;
app.finalize_streaming_assistant_as_interrupted();
let count = app.pending_steers.len();
app.status_message = Some(if count == 1 {
"Steering: aborting turn and resending input".to_string()
} else {
format!("Steering: aborting turn and resending {count} input(s)")
});
}
}
EscapeAction::DiscardQueuedDraft => {
app.queued_draft = None;
app.status_message = Some("Stopped editing queued message".to_string());
@@ -1990,6 +2034,9 @@ fn finalize_streaming_thinking_active_entry(
enum EscapeAction {
CloseSlashMenu,
CancelRequest,
/// Composer non-empty during a running turn — capture the input as a
/// pending steer, abort the turn, and re-submit on TurnComplete (#122).
SteerAndAbort,
DiscardQueuedDraft,
ClearInput,
Noop,
@@ -1999,7 +2046,11 @@ fn next_escape_action(app: &App, slash_menu_open: bool) -> EscapeAction {
if slash_menu_open {
EscapeAction::CloseSlashMenu
} else if app.is_loading {
EscapeAction::CancelRequest
if app.input.trim().is_empty() {
EscapeAction::CancelRequest
} else {
EscapeAction::SteerAndAbort
}
} else if app.queued_draft.is_some() && app.input.is_empty() {
EscapeAction::DiscardQueuedDraft
} else if !app.input.is_empty() {
@@ -2559,28 +2610,53 @@ async fn submit_or_steer_message(
engine_handle: &EngineHandle,
message: QueuedMessage,
) -> Result<()> {
if app.offline_mode && !app.is_loading {
app.queue_message(message);
app.status_message = Some(format!(
"Offline mode: queued {} message(s) - /queue to review",
app.queued_message_count()
));
return Ok(());
}
if app.is_loading {
if let Err(err) = steer_user_message(app, engine_handle, message.clone()).await {
match app.decide_submit_disposition() {
SubmitDisposition::Immediate => dispatch_user_message(app, engine_handle, message).await,
SubmitDisposition::Queue => {
app.queue_message(message);
app.status_message = Some(format!(
"Steer failed ({err}); queued {} message(s) - /queue to view/edit",
"Offline mode: queued {} message(s) - /queue to review",
app.queued_message_count()
));
Ok(())
}
SubmitDisposition::Steer => {
if let Err(err) = steer_user_message(app, engine_handle, message.clone()).await {
app.queue_message(message);
app.status_message = Some(format!(
"Steer failed ({err}); queued {} message(s) - /queue to view/edit",
app.queued_message_count()
));
}
Ok(())
}
Ok(())
} else {
dispatch_user_message(app, engine_handle, message).await
}
}
/// Drain `app.pending_steers` into a single `QueuedMessage` ready for
/// `dispatch_user_message`. Returns `None` if the queue was empty (caller
/// then falls back to `app.queued_messages`). Skill instruction is taken
/// from the first message that supplies one — multiple steers shouldn't
/// double-up the system framing.
fn merge_pending_steers(app: &mut App) -> Option<QueuedMessage> {
let drained = app.drain_pending_steers();
if drained.is_empty() {
return None;
}
if drained.len() == 1 {
return drained.into_iter().next();
}
let mut skill_instruction: Option<String> = None;
let mut bodies: Vec<String> = Vec::with_capacity(drained.len());
for msg in drained {
if skill_instruction.is_none() {
skill_instruction = msg.skill_instruction;
}
bodies.push(msg.display);
}
Some(QueuedMessage::new(bodies.join("\n\n"), skill_instruction))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PlanChoice {
AcceptAgent,
@@ -2740,12 +2816,21 @@ fn reconcile_subagent_activity_state(app: &mut App) {
/// Build the pending-input preview widget from current `App` state.
///
/// v0.6.6 ships the queued-messages half of #85: when the user types during
/// a running turn the message goes onto `app.queued_messages` and shows
/// here above the composer. The full steer/rejected-steer wiring lands in
/// a follow-up (TODO_BACKEND.md §4).
/// v0.6.6 (#122) wires all three buckets:
/// - `pending_steers` — typed during a running turn + Esc; held until the
/// abort lands and gets resubmitted as a fresh merged turn.
/// - `rejected_steers` — engine declined a mid-turn steer (scaffolding;
/// no engine path produces these yet but the bucket renders identically).
/// - `queued_messages` — Enter while busy (offline-mode FIFO); drained at
/// end-of-turn.
fn build_pending_input_preview(app: &App) -> PendingInputPreview {
let mut preview = PendingInputPreview::new();
preview.pending_steers = app
.pending_steers
.iter()
.map(|m| m.display.clone())
.collect();
preview.rejected_steers = app.rejected_steers.iter().cloned().collect();
preview.queued_messages = app
.queued_messages
.iter()
+117
View File
@@ -765,9 +765,14 @@ fn test_esc_priority_order_matches_cancel_stack() {
app.is_loading = true;
app.input = "draft".to_string();
app.mode = AppMode::Yolo;
// #122: typing during a running turn now steers instead of cancelling.
assert_eq!(next_escape_action(&app, false), EscapeAction::SteerAndAbort);
app.input.clear();
assert_eq!(next_escape_action(&app, false), EscapeAction::CancelRequest);
app.is_loading = false;
app.input = "draft".to_string();
assert_eq!(next_escape_action(&app, false), EscapeAction::ClearInput);
app.input.clear();
@@ -1964,3 +1969,115 @@ fn non_recoverable_engine_error_enters_offline_mode() {
"expected engine-error status, got {status:?}"
);
}
// ---- Issue #122: Esc-to-steer routing + steer merge ----
#[test]
fn next_escape_action_cancels_when_loading_with_empty_input() {
let mut app = create_test_app();
app.is_loading = true;
app.input.clear();
assert_eq!(next_escape_action(&app, false), EscapeAction::CancelRequest);
}
#[test]
fn next_escape_action_steers_when_loading_with_input() {
let mut app = create_test_app();
app.is_loading = true;
app.input = "hold on, look at this instead".to_string();
assert_eq!(next_escape_action(&app, false), EscapeAction::SteerAndAbort);
}
#[test]
fn next_escape_action_treats_whitespace_only_as_empty() {
let mut app = create_test_app();
app.is_loading = true;
app.input = " \n\t".to_string();
assert_eq!(next_escape_action(&app, false), EscapeAction::CancelRequest);
}
#[test]
fn next_escape_action_idle_with_input_clears() {
let mut app = create_test_app();
app.is_loading = false;
app.input = "draft".to_string();
assert_eq!(next_escape_action(&app, false), EscapeAction::ClearInput);
}
#[test]
fn next_escape_action_idle_empty_is_noop() {
let mut app = create_test_app();
app.is_loading = false;
app.input.clear();
assert_eq!(next_escape_action(&app, false), EscapeAction::Noop);
}
#[test]
fn next_escape_action_slash_menu_takes_priority() {
let mut app = create_test_app();
app.is_loading = true;
app.input = "anything".to_string();
assert_eq!(next_escape_action(&app, true), EscapeAction::CloseSlashMenu);
}
#[test]
fn merge_pending_steers_returns_none_when_empty() {
let mut app = create_test_app();
assert!(merge_pending_steers(&mut app).is_none());
assert!(!app.submit_pending_steers_after_interrupt);
}
#[test]
fn merge_pending_steers_passes_through_single_message() {
let mut app = create_test_app();
app.push_pending_steer(QueuedMessage::new(
"lone steer".to_string(),
Some("skill body".to_string()),
));
let merged = merge_pending_steers(&mut app).expect("merge yields a message");
assert_eq!(merged.display, "lone steer");
assert_eq!(merged.skill_instruction.as_deref(), Some("skill body"));
assert!(app.pending_steers.is_empty());
assert!(!app.submit_pending_steers_after_interrupt);
}
#[test]
fn merge_pending_steers_concatenates_multiple_with_blank_line() {
let mut app = create_test_app();
app.push_pending_steer(QueuedMessage::new("first".to_string(), None));
app.push_pending_steer(QueuedMessage::new("second".to_string(), None));
app.push_pending_steer(QueuedMessage::new("third".to_string(), None));
let merged = merge_pending_steers(&mut app).expect("merge yields a message");
assert_eq!(merged.display, "first\n\nsecond\n\nthird");
assert!(app.pending_steers.is_empty());
}
#[test]
fn merge_pending_steers_keeps_first_skill_instruction_only() {
let mut app = create_test_app();
app.push_pending_steer(QueuedMessage::new(
"a".to_string(),
Some("first skill".to_string()),
));
app.push_pending_steer(QueuedMessage::new(
"b".to_string(),
Some("second skill".to_string()),
));
let merged = merge_pending_steers(&mut app).expect("merge yields a message");
assert_eq!(merged.skill_instruction.as_deref(), Some("first skill"));
assert_eq!(merged.display, "a\n\nb");
}
#[test]
fn build_pending_input_preview_populates_all_three_buckets() {
let mut app = create_test_app();
app.push_pending_steer(QueuedMessage::new("steer-msg".to_string(), None));
app.rejected_steers.push_back("rejected-msg".to_string());
app.queue_message(QueuedMessage::new("queued-msg".to_string(), None));
let preview = build_pending_input_preview(&app);
assert_eq!(preview.pending_steers, vec!["steer-msg".to_string()]);
assert_eq!(preview.rejected_steers, vec!["rejected-msg".to_string()]);
assert_eq!(preview.queued_messages, vec!["queued-msg".to_string()]);
}