From b4867b835d84c8991572ace8d47703b268fbe4ed Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Wed, 6 May 2026 17:23:02 -0500 Subject: [PATCH] fix: smooth streaming and release UX slices * fix(streaming): drip text grapheme by grapheme * fix(snapshot): exclude generated artifacts by default * fix(tui): fill panel and footer backgrounds --- crates/tui/src/deepseek_theme.rs | 4 +- crates/tui/src/palette.rs | 2 + crates/tui/src/snapshot/repo.rs | 54 ++++++++ crates/tui/src/tui/streaming/chunking.rs | 131 ++++++++++++------ crates/tui/src/tui/streaming/commit_tick.rs | 144 +++++++++++--------- crates/tui/src/tui/streaming/mod.rs | 81 +++++++---- crates/tui/src/tui/ui.rs | 13 ++ crates/tui/src/tui/widgets/footer.rs | 24 +++- 8 files changed, 312 insertions(+), 141 deletions(-) diff --git a/crates/tui/src/deepseek_theme.rs b/crates/tui/src/deepseek_theme.rs index 9c4a96a9..46f4069f 100644 --- a/crates/tui/src/deepseek_theme.rs +++ b/crates/tui/src/deepseek_theme.rs @@ -62,7 +62,7 @@ impl Theme { section_borders: Borders::ALL, section_border_type: BorderType::Plain, section_border_color: palette::BORDER_COLOR, - section_bg: Color::Reset, + section_bg: palette::DEEPSEEK_INK, section_title_color: palette::DEEPSEEK_BLUE, // Horizontal padding only. `Padding::uniform(1)` ate two rows of // each sidebar panel — for compact terminals where Plan/Todos/Tasks @@ -147,7 +147,7 @@ mod tests { let theme = Theme::dark(); assert_eq!(theme.variant, Variant::Dark); assert_eq!(theme.section_border_color, palette::BORDER_COLOR); - assert_eq!(theme.section_bg, ratatui::style::Color::Reset); + assert_eq!(theme.section_bg, palette::DEEPSEEK_INK); assert_eq!(theme.section_title_color, palette::DEEPSEEK_BLUE); assert_eq!(theme.tool_title_color, palette::TEXT_SOFT); assert_eq!(theme.tool_value_color, palette::TEXT_MUTED); diff --git a/crates/tui/src/palette.rs b/crates/tui/src/palette.rs index cdb19fb1..6fbd13e0 100644 --- a/crates/tui/src/palette.rs +++ b/crates/tui/src/palette.rs @@ -109,6 +109,7 @@ pub struct UiTheme { pub composer_bg: Color, pub selection_bg: Color, pub header_bg: Color, + pub footer_bg: Color, /// Statusline mode colors (agent/yolo/plan) pub mode_agent: Color, pub mode_yolo: Color, @@ -128,6 +129,7 @@ pub const UI_THEME: UiTheme = UiTheme { composer_bg: DEEPSEEK_SLATE, selection_bg: SELECTION_BG, header_bg: DEEPSEEK_INK, + footer_bg: DEEPSEEK_INK, mode_agent: MODE_AGENT, mode_yolo: MODE_YOLO, mode_plan: MODE_PLAN, diff --git a/crates/tui/src/snapshot/repo.rs b/crates/tui/src/snapshot/repo.rs index be8d1eee..e7a56c65 100644 --- a/crates/tui/src/snapshot/repo.rs +++ b/crates/tui/src/snapshot/repo.rs @@ -54,6 +54,13 @@ node_modules/ target/ dist/ build/ +.build/ +.next/ +.nuxt/ +.svelte-kit/ +.turbo/ +.parcel-cache/ +vendor/ .cargo/ .rustup/ .npm/ @@ -63,6 +70,7 @@ build/ .cache/ .venv/ venv/ +.tox/ __pycache__/ *.pyc .mypy_cache/ @@ -72,6 +80,39 @@ __pycache__/ .m2/ .local/ .DS_Store + +# Binary and generated artifacts. Snapshots are source rollback checkpoints, +# not a full binary backup; keeping these out avoids side-repo bloat. +*.exe +*.dll +*.so +*.dylib +*.wasm +*.o +*.obj +*.class +*.pdb +*.dSYM +*.zip +*.tar +*.tar.gz +*.tgz +*.tar.bz2 +*.tar.xz +*.7z +*.rar +*.iso +*.dmg +*.bin +*.mp4 +*.mov +*.mkv +*.avi +*.webm +*.mp3 +*.wav +*.flac +*.aac "; impl SnapshotRepo { @@ -788,16 +829,21 @@ mod tests { let tmp = tempdir().unwrap(); let (repo, _home) = make_repo(tmp.path()); std::fs::create_dir_all(repo.work_tree().join("node_modules/pkg")).unwrap(); + std::fs::create_dir_all(repo.work_tree().join(".next/cache")).unwrap(); std::fs::create_dir_all(repo.work_tree().join("src")).unwrap(); std::fs::write( repo.work_tree().join("node_modules/pkg/index.js"), b"generated", ) .unwrap(); + std::fs::write(repo.work_tree().join(".next/cache/chunk.bin"), b"generated").unwrap(); + std::fs::write(repo.work_tree().join("debug.wasm"), b"binary").unwrap(); std::fs::write(repo.work_tree().join("src/main.rs"), b"fn main() {}").unwrap(); let excludes = std::fs::read_to_string(repo.git_dir().join("info/exclude")).unwrap(); assert!(excludes.contains("node_modules/")); + assert!(excludes.contains(".next/")); + assert!(excludes.contains("*.wasm")); let id = repo.snapshot("pre-turn:1").expect("snapshot"); let ls = run_git( @@ -815,6 +861,14 @@ mod tests { !names.contains("node_modules"), "node_modules should not be in snapshot: {names}", ); + assert!( + !names.contains(".next"), + ".next should not be in snapshot: {names}", + ); + assert!( + !names.contains("debug.wasm"), + "binary artifacts should not be in snapshot: {names}", + ); } #[test] diff --git a/crates/tui/src/tui/streaming/chunking.rs b/crates/tui/src/tui/streaming/chunking.rs index 6a196277..831608fc 100644 --- a/crates/tui/src/tui/streaming/chunking.rs +++ b/crates/tui/src/tui/streaming/chunking.rs @@ -7,13 +7,13 @@ //! # Mental model //! //! Two gears: -//! - [`ChunkingMode::Smooth`]: drain one line per commit tick (steady pacing). -//! - [`ChunkingMode::CatchUp`]: drain the entire queued backlog while pressure exists. +//! - [`ChunkingMode::Smooth`]: drain one display chunk per commit tick (steady pacing). +//! - [`ChunkingMode::CatchUp`]: drain a bounded burst while pressure exists. //! //! # Hysteresis //! //! - Enter `CatchUp` when `queued_lines >= ENTER_QUEUE_DEPTH_LINES` OR -//! the oldest queued line is at least [`ENTER_OLDEST_AGE`]. +//! the oldest queued chunk is at least [`ENTER_OLDEST_AGE`]. //! - Exit `CatchUp` only after pressure stays below [`EXIT_QUEUE_DEPTH_LINES`] //! AND [`EXIT_OLDEST_AGE`] for at least [`EXIT_HOLD`]. //! - After exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`] @@ -24,16 +24,16 @@ use std::time::Duration; use std::time::Instant; /// Queue-depth threshold that allows entering catch-up mode. -pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 8; +pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 160; -/// Oldest-line age threshold that allows entering catch-up mode. -pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120); +/// Oldest-chunk age threshold that allows entering catch-up mode. +pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(1_200); /// Queue-depth threshold used when evaluating catch-up exit hysteresis. -pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 2; +pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 32; -/// Oldest-line age threshold used when evaluating catch-up exit hysteresis. -pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40); +/// Oldest-chunk age threshold used when evaluating catch-up exit hysteresis. +pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(300); /// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode. pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250); @@ -42,14 +42,14 @@ pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250); pub(crate) const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250); /// Queue-depth cutoff that marks backlog as severe (bypasses re-entry hold). -pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 64; +pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 640; /// Oldest-line age cutoff that marks backlog as severe. -pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300); +pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(4_000); #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub enum ChunkingMode { - /// Drain one line per baseline commit tick. + /// Drain one display chunk per baseline commit tick. #[default] Smooth, /// Drain the queued backlog according to queue pressure. @@ -59,9 +59,9 @@ pub enum ChunkingMode { /// Captures queue pressure inputs used by adaptive chunking decisions. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub struct QueueSnapshot { - /// Number of queued stream lines waiting to be displayed. + /// Number of queued stream chunks waiting to be displayed. pub queued_lines: usize, - /// Age of the oldest queued line at decision time. + /// Age of the oldest queued chunk at decision time. pub oldest_age: Option, } @@ -272,33 +272,39 @@ mod tests { } #[test] - fn eight_line_burst_flips_to_catch_up_and_drains_full_backlog() { - // A burst of 8 queued lines crosses ENTER_QUEUE_DEPTH_LINES exactly. + fn deep_burst_flips_to_catch_up_and_drains_backlog() { + // A burst crossing ENTER_QUEUE_DEPTH_LINES enters CatchUp. With + // single-grapheme chunks, the threshold stays high enough that + // ordinary prose still drips in visibly before catch-up engages. // The policy should enter `CatchUp` and request a Batch drain matching // the queue depth. let mut policy = AdaptiveChunkingPolicy::new(); let now = Instant::now(); - let decision = policy.decide(snap(8, 10), now); + let decision = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 10), now); assert_eq!(decision.mode, ChunkingMode::CatchUp); assert!(decision.entered_catch_up); - assert_eq!(decision.drain_plan, DrainPlan::Batch(8)); + assert_eq!( + decision.drain_plan, + DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES) + ); // Larger backlog requested next tick: still CatchUp, batch grows to match. - let decision = policy.decide(snap(20, 30), now + Duration::from_millis(10)); + let larger_backlog = ENTER_QUEUE_DEPTH_LINES + 80; + let decision = policy.decide(snap(larger_backlog, 30), now + Duration::from_millis(10)); assert_eq!(decision.mode, ChunkingMode::CatchUp); assert!(!decision.entered_catch_up, "no second transition signal"); - assert_eq!(decision.drain_plan, DrainPlan::Batch(20)); + assert_eq!(decision.drain_plan, DrainPlan::Batch(larger_backlog)); } #[test] fn age_threshold_alone_triggers_catch_up() { - // Queue depth is small, but the oldest line has been waiting >=120 ms. + // Queue depth is small, but the oldest chunk has crossed the age threshold. // Either condition is sufficient to enter catch-up. let mut policy = AdaptiveChunkingPolicy::new(); let now = Instant::now(); - let decision = policy.decide(snap(2, 120), now); + let decision = policy.decide(snap(2, ENTER_OLDEST_AGE.as_millis() as u64), now); assert_eq!(decision.mode, ChunkingMode::CatchUp); assert!(decision.entered_catch_up); assert_eq!(decision.drain_plan, DrainPlan::Batch(2)); @@ -311,20 +317,29 @@ mod tests { let mut policy = AdaptiveChunkingPolicy::new(); let t0 = Instant::now(); - let _ = policy.decide(snap(10, 20), t0); + let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0); assert_eq!(policy.mode(), ChunkingMode::CatchUp); - // Pressure drops to (2 lines, 40 ms) — at the exit thresholds. + // Pressure drops to the exit thresholds. // Hold begins; not yet 250ms. - let pre_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(50)); + let pre_hold = policy.decide( + snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64), + t0 + Duration::from_millis(50), + ); assert_eq!(pre_hold.mode, ChunkingMode::CatchUp); // Still under hold. - let mid_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(200)); + let mid_hold = policy.decide( + snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64), + t0 + Duration::from_millis(200), + ); assert_eq!(mid_hold.mode, ChunkingMode::CatchUp); // Past EXIT_HOLD (250 ms) → return to Smooth. - let post_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(320)); + let post_hold = policy.decide( + snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64), + t0 + Duration::from_millis(320), + ); assert_eq!(post_hold.mode, ChunkingMode::Smooth); assert_eq!(post_hold.drain_plan, DrainPlan::Single); } @@ -335,7 +350,7 @@ mod tests { let mut policy = AdaptiveChunkingPolicy::new(); let now = Instant::now(); - let _ = policy.decide(snap(10, 20), now); + let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), now); assert_eq!(policy.mode(), ChunkingMode::CatchUp); let decision = policy.decide(empty_snap(), now + Duration::from_millis(10)); @@ -345,38 +360,53 @@ mod tests { #[test] fn reentry_hold_blocks_immediate_flip_back() { - // After exiting CatchUp via idle, an 8-line burst that arrives within + // After exiting CatchUp via idle, a threshold-sized burst that arrives within // the re-entry hold window should not immediately re-enter CatchUp. let mut policy = AdaptiveChunkingPolicy::new(); let t0 = Instant::now(); - let _ = policy.decide(snap(10, 20), t0); + let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0); let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10)); // Within REENTER_CATCH_UP_HOLD (250 ms): hold blocks re-entry. - let held = policy.decide(snap(8, 20), t0 + Duration::from_millis(100)); + let held = policy.decide( + snap(ENTER_QUEUE_DEPTH_LINES, 20), + t0 + Duration::from_millis(100), + ); assert_eq!(held.mode, ChunkingMode::Smooth); assert_eq!(held.drain_plan, DrainPlan::Single); // Past the hold: re-entry permitted. - let reentered = policy.decide(snap(8, 20), t0 + Duration::from_millis(400)); + let reentered = policy.decide( + snap(ENTER_QUEUE_DEPTH_LINES, 20), + t0 + Duration::from_millis(400), + ); assert_eq!(reentered.mode, ChunkingMode::CatchUp); - assert_eq!(reentered.drain_plan, DrainPlan::Batch(8)); + assert_eq!( + reentered.drain_plan, + DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES) + ); } #[test] fn severe_backlog_bypasses_reentry_hold() { - // Even within the hold window, a "severe" backlog (>=64 lines) bypasses + // Even within the hold window, a "severe" backlog bypasses // the gate so display lag doesn't unbounded-grow. let mut policy = AdaptiveChunkingPolicy::new(); let t0 = Instant::now(); - let _ = policy.decide(snap(10, 20), t0); + let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0); let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10)); - let severe = policy.decide(snap(64, 20), t0 + Duration::from_millis(100)); + let severe = policy.decide( + snap(SEVERE_QUEUE_DEPTH_LINES, 20), + t0 + Duration::from_millis(100), + ); assert_eq!(severe.mode, ChunkingMode::CatchUp); - assert_eq!(severe.drain_plan, DrainPlan::Batch(64)); + assert_eq!( + severe.drain_plan, + DrainPlan::Batch(SEVERE_QUEUE_DEPTH_LINES) + ); } #[test] @@ -386,19 +416,28 @@ mod tests { let t0 = Instant::now(); // Queue depth far above ENTER threshold. - let d1 = policy.decide(snap(20, 10), t0); + let d1 = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES + 80, 10), t0); assert_eq!(d1.mode, ChunkingMode::Smooth); assert!(!d1.entered_catch_up); assert_eq!(d1.drain_plan, DrainPlan::Single); // Oldest age far above ENTER threshold. - let d2 = policy.decide(snap(5, 500), t0 + Duration::from_millis(100)); + let d2 = policy.decide( + snap(5, ENTER_OLDEST_AGE.as_millis() as u64), + t0 + Duration::from_millis(100), + ); assert_eq!(d2.mode, ChunkingMode::Smooth); assert!(!d2.entered_catch_up); assert_eq!(d2.drain_plan, DrainPlan::Single); // Severe backlog — still Smooth. - let d3 = policy.decide(snap(80, 500), t0 + Duration::from_millis(200)); + let d3 = policy.decide( + snap( + SEVERE_QUEUE_DEPTH_LINES + 80, + SEVERE_OLDEST_AGE.as_millis() as u64, + ), + t0 + Duration::from_millis(200), + ); assert_eq!(d3.mode, ChunkingMode::Smooth); assert_eq!(d3.drain_plan, DrainPlan::Single); } @@ -410,14 +449,20 @@ mod tests { let t0 = Instant::now(); // Low motion blocks catch-up. - let d1 = policy.decide(snap(20, 10), t0); + let d1 = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES + 80, 10), t0); assert_eq!(d1.mode, ChunkingMode::Smooth); // Turn off low motion — next burst should enter CatchUp. policy.set_low_motion(false); - let d2 = policy.decide(snap(20, 10), t0 + Duration::from_millis(10)); + let d2 = policy.decide( + snap(ENTER_QUEUE_DEPTH_LINES + 80, 10), + t0 + Duration::from_millis(10), + ); assert_eq!(d2.mode, ChunkingMode::CatchUp); assert!(d2.entered_catch_up); - assert_eq!(d2.drain_plan, DrainPlan::Batch(20)); + assert_eq!( + d2.drain_plan, + DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES + 80) + ); } } diff --git a/crates/tui/src/tui/streaming/commit_tick.rs b/crates/tui/src/tui/streaming/commit_tick.rs index 65783fb1..6a6050e9 100644 --- a/crates/tui/src/tui/streaming/commit_tick.rs +++ b/crates/tui/src/tui/streaming/commit_tick.rs @@ -1,10 +1,9 @@ //! Commit-tick scheduler that drains a stream chunker according to policy. //! //! Bridges [`AdaptiveChunkingPolicy`] with a concrete [`StreamChunker`] queue. -//! Callers feed raw text deltas via [`StreamChunker::push_delta`] (only -//! newline-terminated text becomes queued lines), then call -//! [`run_commit_tick`] on every commit beat to obtain the text safe to flush -//! to the transcript on this beat. +//! Callers feed raw text deltas via [`StreamChunker::push_delta`], then call +//! [`run_commit_tick`] on every commit beat to obtain the next small text +//! slice to flush to the transcript on this beat. //! //! The chunker is the unit of streaming — one per active block (assistant / //! thinking). Tool output is unbuffered and bypasses this path. @@ -13,27 +12,29 @@ use std::collections::VecDeque; use std::time::Duration; use std::time::Instant; +use unicode_segmentation::UnicodeSegmentation; + use super::chunking::AdaptiveChunkingPolicy; use super::chunking::ChunkingDecision; use super::chunking::DrainPlan; use super::chunking::QueueSnapshot; -/// Buffers raw stream deltas and emits committed text in line units. -/// -/// Only the substring up to the *last* `\n` is committed; trailing partial -/// content stays in the buffer. This is what protects partial code fences -/// (` ``` `) and other line-sensitive markdown from rendering mid-state. +const GRAPHEMES_PER_MICRO_CHUNK: usize = 1; +const CATCH_UP_MAX_MICRO_CHUNKS: usize = 12; + +/// Buffers raw stream deltas and emits committed text in small display chunks. #[derive(Debug, Default)] pub struct StreamChunker { - /// Bytes received but not yet split into a complete line. + /// Bytes received but not yet split into display chunks. Normally empty; + /// retained so `drain_remaining` has a lossless place to pull from if we + /// ever decide to hold a tail for a future markdown-sensitive mode. pending: String, - /// Complete lines waiting to be flushed to the transcript. - /// Each entry preserves its trailing `\n` so reassembly is lossless. - queue: VecDeque, + /// Small grapheme-aligned chunks waiting to be flushed to the transcript. + queue: VecDeque, } #[derive(Debug, Clone)] -struct QueuedLine { +struct QueuedChunk { text: String, enqueued_at: Instant, } @@ -43,28 +44,21 @@ impl StreamChunker { Self::default() } - /// Append a raw model delta. Returns whether at least one new line was queued. + /// Append a raw model delta. Returns whether at least one new display chunk was queued. pub fn push_delta(&mut self, delta: &str) -> bool { if delta.is_empty() { return false; } self.pending.push_str(delta); - let Some(last_nl) = self.pending.rfind('\n') else { - return false; - }; - - // Drain everything up to and including the last newline into queued lines. - // Splitting by line keeps the chunker source-agnostic and lets the policy - // count "lines waiting" without peeking at text content. let now = Instant::now(); - let committed: String = self.pending.drain(..=last_nl).collect(); + let committed = std::mem::take(&mut self.pending); let mut produced = false; - for chunk in split_lines_keep_terminator(&committed) { + for chunk in split_into_micro_chunks(&committed) { if chunk.is_empty() { continue; } - self.queue.push_back(QueuedLine { + self.queue.push_back(QueuedChunk { text: chunk, enqueued_at: now, }); @@ -73,12 +67,12 @@ impl StreamChunker { produced } - /// Number of complete lines currently queued for commit. + /// Number of display chunks currently queued for commit. pub fn queued_lines(&self) -> usize { self.queue.len() } - /// Age of the oldest queued line, if any. + /// Age of the oldest queued chunk, if any. pub fn oldest_queued_age(&self, now: Instant) -> Option { self.queue .front() @@ -98,7 +92,7 @@ impl StreamChunker { } } - /// Drain `max_lines` complete lines and return them as concatenated text. + /// Drain `max_lines` queued chunks and return them as concatenated text. pub fn drain_lines(&mut self, max_lines: usize) -> String { let n = max_lines.min(self.queue.len()); let mut out = String::new(); @@ -159,7 +153,7 @@ pub fn run_commit_tick( let max = match decision.drain_plan { DrainPlan::Single => 1, - DrainPlan::Batch(n) => n, + DrainPlan::Batch(n) => n.min(CATCH_UP_MAX_MICRO_CHUNKS), }; // Drain through the chunker; an empty queue under Smooth produces "". @@ -172,24 +166,28 @@ pub fn run_commit_tick( } } -/// Split text into chunks, preserving each terminator. The final chunk is -/// included only if it ends with `\n` (this is enforced upstream in -/// `push_delta`, which only drains up through the last newline). -fn split_lines_keep_terminator(text: &str) -> Vec { +/// Split text into grapheme-aligned chunks. Newlines force a boundary so +/// markdown layout still settles quickly, but prose no longer waits for a full +/// line before becoming visible. +fn split_into_micro_chunks(text: &str) -> Vec { let mut out = Vec::new(); - let mut start = 0; - let bytes = text.as_bytes(); - for (i, &b) in bytes.iter().enumerate() { - if b == b'\n' { - out.push(text[start..=i].to_string()); - start = i + 1; + let mut current = String::new(); + let mut graphemes = 0usize; + + for grapheme in UnicodeSegmentation::graphemes(text, true) { + current.push_str(grapheme); + graphemes += 1; + + if grapheme == "\n" || graphemes >= GRAPHEMES_PER_MICRO_CHUNK { + out.push(std::mem::take(&mut current)); + graphemes = 0; } } - if start < text.len() { - // This branch is unreachable for inputs produced by `push_delta`, - // but stays defensive for direct callers. - out.push(text[start..].to_string()); + + if !current.is_empty() { + out.push(current); } + out } @@ -199,59 +197,69 @@ mod tests { use crate::tui::streaming::chunking::ChunkingMode; #[test] - fn partial_code_fence_is_held_until_newline() { - // Without the chunker, a stray ``` arriving mid-stream could render as - // an opened fence. The chunker must not commit anything until the - // line is terminated by `\n`. + fn prose_streams_before_newline() { let mut chunker = StreamChunker::new(); let mut policy = AdaptiveChunkingPolicy::new(); let now = Instant::now(); - // Partial fence + content; no newline → nothing committed yet. - chunker.push_delta("Here is code:\n"); - chunker.push_delta("```"); + chunker.push_delta("hello world"); let out = run_commit_tick(&mut policy, &mut chunker, now); - assert_eq!(out.committed_text, "Here is code:\n"); - assert!(!chunker.is_idle(), "partial fence still buffered"); + assert_eq!(out.committed_text, "h"); + assert!(!chunker.is_idle(), "remaining prose should keep dripping"); - // Close the fence line. - chunker.push_delta("rust\n"); let out = run_commit_tick(&mut policy, &mut chunker, now + Duration::from_millis(5)); - assert_eq!(out.committed_text, "```rust\n"); + assert_eq!(out.committed_text, "e"); } #[test] - fn smooth_burst_emits_one_line_per_tick() { + fn smooth_burst_emits_one_micro_chunk_per_tick() { let mut chunker = StreamChunker::new(); let mut policy = AdaptiveChunkingPolicy::new(); let t0 = Instant::now(); - chunker.push_delta("a\nb\nc\n"); - // Each tick under Smooth pulls exactly one line. + chunker.push_delta("abc"); + // Each tick under Smooth pulls exactly one grapheme. let out1 = run_commit_tick(&mut policy, &mut chunker, t0); assert_eq!(out1.decision.mode, ChunkingMode::Smooth); - assert_eq!(out1.committed_text, "a\n"); + assert_eq!(out1.committed_text, "a"); let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20)); - assert_eq!(out2.committed_text, "b\n"); + assert_eq!(out2.committed_text, "b"); let out3 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(40)); - assert_eq!(out3.committed_text, "c\n"); + assert_eq!(out3.committed_text, "c"); assert!(out3.is_idle); } #[test] - fn large_burst_drains_in_catch_up() { - // Eight lines arriving "at once" must trigger CatchUp on the first - // commit tick and drain the full backlog in one go. + fn smooth_stream_keeps_combining_marks_with_base_letter() { + let mut chunker = StreamChunker::new(); + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + chunker.push_delta("e\u{301}x"); + let out1 = run_commit_tick(&mut policy, &mut chunker, t0); + assert_eq!(out1.committed_text, "e\u{301}"); + let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20)); + assert_eq!(out2.committed_text, "x"); + } + + #[test] + fn large_burst_drains_in_catch_up_without_full_jump() { + // A large text burst arriving "at once" must trigger CatchUp on the first + // commit tick without dumping the full backlog in one jump. let mut chunker = StreamChunker::new(); let mut policy = AdaptiveChunkingPolicy::new(); let now = Instant::now(); - let burst = "1\n2\n3\n4\n5\n6\n7\n8\n"; - chunker.push_delta(burst); + let burst = "abcdefghijklmnopqrstuvwxyz".repeat(8); + let expected_prefix: String = burst + .chars() + .take(CATCH_UP_MAX_MICRO_CHUNKS * GRAPHEMES_PER_MICRO_CHUNK) + .collect(); + chunker.push_delta(&burst); let out = run_commit_tick(&mut policy, &mut chunker, now); assert_eq!(out.decision.mode, ChunkingMode::CatchUp); - assert_eq!(out.committed_text, burst); - assert!(out.is_idle); + assert_eq!(out.committed_text, expected_prefix); + assert!(!out.is_idle); } #[test] diff --git a/crates/tui/src/tui/streaming/mod.rs b/crates/tui/src/tui/streaming/mod.rs index 74e24373..bb24d150 100644 --- a/crates/tui/src/tui/streaming/mod.rs +++ b/crates/tui/src/tui/streaming/mod.rs @@ -1,11 +1,10 @@ #![allow(dead_code)] -//! Markdown stream collector for newline-gated rendering. +//! Markdown stream collector for live micro-chunk rendering. //! //! This module implements the pattern from codex-rs where: -//! - Streaming text is buffered until a newline is reached -//! - Only complete lines are committed to the UI -//! - This prevents visual flashing of partial words +//! - Streaming text is split into small grapheme-aligned chunks +//! - Commit ticks drip chunks into the transcript between provider deltas //! - Final content is emitted when the stream ends use ratatui::style::{Modifier, Style}; @@ -221,19 +220,17 @@ fn wrap_line(line: &str, width: usize) -> Vec { } } -/// Per-block streaming substate: line-buffer (newline gate) feeding a -/// collector + chunker/policy for two-gear pacing. +/// Per-block streaming substate: optional line-buffer feeding a collector + +/// chunker/policy for two-gear pacing. /// /// Pipeline: /// ```text /// raw delta -> LineBuffer.push -> take_committable -> collector + chunker -> commit tick /// ``` /// -/// The [`LineBuffer`] is upstream of the collector and chunker. It guarantees -/// that no partial multi-character markdown (e.g. an unfinished ``` fence) -/// reaches downstream consumers between deltas. Thinking blocks bypass the -/// gate because thinking is rendered live for responsiveness — its content -/// often arrives without newlines until a full paragraph is composed. +/// The [`LineBuffer`] remains available for line-sensitive modes. Normal +/// assistant prose and thinking blocks bypass it so text can stream in live +/// micro-chunks instead of waiting for newline boundaries. #[derive(Debug, Default)] struct BlockState { /// Newline gate: holds back trailing partial-line text between deltas. @@ -265,14 +262,14 @@ impl StreamingState { Self::default() } - /// Start a new text block. Assistant text is subject to the newline gate - /// so partial code fences and other line-sensitive markdown can never - /// briefly appear between deltas. + /// Start a new text block. Assistant prose streams live in micro-chunks so + /// users can visually track the answer as it forms instead of waiting for + /// a newline-terminated line. pub fn start_text(&mut self, index: usize, width: Option) { self.ensure_capacity(index); self.blocks[index] = Some(BlockState { line_buffer: LineBuffer::new(), - bypass_gate: false, + bypass_gate: true, collector: MarkdownStreamCollector::new(width, false), chunker: StreamChunker::new(), policy: AdaptiveChunkingPolicy::new(), @@ -298,10 +295,8 @@ impl StreamingState { /// Push content to a block. Routing depends on the block kind: /// - /// - Assistant text blocks: incoming bytes go through [`LineBuffer`] - /// first, so only newline-terminated prefixes reach the collector and - /// chunker. This is what protects partial code fences and other - /// line-sensitive markdown from briefly appearing between deltas. + /// - Assistant text blocks: incoming bytes normally bypass [`LineBuffer`] + /// and are split into small display chunks downstream. /// - Thinking blocks: bytes bypass the gate and go straight to the /// collector/chunker so reasoning stays visually live (long thoughts /// often have no intermediate newlines). @@ -333,14 +328,14 @@ impl StreamingState { return; } - block.collector.push(&downstream); - // The collector's own newline-gating is now redundant for gated - // blocks (LineBuffer already enforces it), but we keep the same - // call shape so the collector's bookkeeping (committed_line_count) - // stays consistent and the bypass path still benefits from it. - let committed = block.collector.commit_complete_text(); - if !committed.is_empty() { - block.chunker.push_delta(&committed); + if block.bypass_gate { + block.chunker.push_delta(&downstream); + } else { + block.collector.push(&downstream); + let committed = block.collector.commit_complete_text(); + if !committed.is_empty() { + block.chunker.push_delta(&committed); + } } } } @@ -556,4 +551,36 @@ mod tests { let result = wrap_line("This is a long line that should be wrapped", 20); assert!(result.len() > 1); } + + #[test] + fn assistant_text_streams_before_newline() { + let mut state = StreamingState::new(); + state.start_text(0, None); + state.push_content(0, "hello world"); + + assert_eq!(state.commit_text(0), "h"); + assert_eq!(state.commit_text(0), "e"); + assert!(state.has_pending_chunker_lines(0)); + } + + #[test] + fn thinking_text_streams_before_newline() { + let mut state = StreamingState::new(); + state.start_thinking(0, None); + state.push_content(0, "thinking deeply"); + + assert_eq!(state.commit_text(0), "t"); + assert_eq!(state.commit_text(0), "h"); + assert!(state.has_pending_chunker_lines(0)); + } + + #[test] + fn finalize_preserves_uncommitted_micro_chunks() { + let mut state = StreamingState::new(); + state.start_text(0, None); + state.push_content(0, "abc"); + assert_eq!(state.commit_text(0), "a"); + + assert_eq!(state.finalize_block_text(0), "bc"); + } } diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index b6789806..e4f851b3 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -1342,6 +1342,19 @@ async fn run_event_loop( } } } + if let Some(index) = app.streaming_message_index { + let committed = app.streaming_state.commit_text(0); + if !committed.is_empty() { + append_streaming_text(app, index, &committed); + transcript_batch_updated = true; + } + } else if let Some(entry_idx) = app.streaming_thinking_active_entry { + let committed = app.streaming_state.commit_text(0); + if !committed.is_empty() { + append_streaming_thinking(app, entry_idx, &committed); + transcript_batch_updated = true; + } + } if transcript_batch_updated { app.mark_history_updated(); } diff --git a/crates/tui/src/tui/widgets/footer.rs b/crates/tui/src/tui/widgets/footer.rs index 042dde6c..403dea84 100644 --- a/crates/tui/src/tui/widgets/footer.rs +++ b/crates/tui/src/tui/widgets/footer.rs @@ -39,6 +39,8 @@ pub struct FooterProps { pub text_hint_color: Color, /// Color used for steady secondary chips such as cost. pub text_muted_color: Color, + /// Background color for the full footer/status bar row. + pub footer_bg: Color, /// Status label like `"ready"`, `"thinking ⌫"`, `"working"`. When the /// label equals `"ready"` the footer hides the status segment entirely. pub state_label: String, @@ -281,6 +283,7 @@ impl FooterProps { text_dim_color: app.ui_theme.text_dim, text_hint_color: app.ui_theme.text_hint, text_muted_color: app.ui_theme.text_muted, + footer_bg: app.ui_theme.footer_bg, state_label: state_label.to_string(), state_color, coherence, @@ -593,7 +596,8 @@ impl Renderable for FooterWidget { all_spans.push(spacer_span); all_spans.extend(right_spans); - let paragraph = Paragraph::new(Line::from(all_spans)); + let paragraph = + Paragraph::new(Line::from(all_spans)).style(Style::default().bg(self.props.footer_bg)); paragraph.render(area, buf); } @@ -811,6 +815,7 @@ mod tests { app.ui_theme.text_dim = Color::Rgb(4, 5, 6); app.ui_theme.text_hint = Color::Rgb(7, 8, 9); app.ui_theme.text_muted = Color::Rgb(10, 11, 12); + app.ui_theme.footer_bg = Color::Rgb(13, 14, 15); let props = idle_props_for(&app); @@ -818,6 +823,23 @@ mod tests { assert_eq!(props.text_dim_color, Color::Rgb(4, 5, 6)); assert_eq!(props.text_hint_color, Color::Rgb(7, 8, 9)); assert_eq!(props.text_muted_color, Color::Rgb(10, 11, 12)); + assert_eq!(props.footer_bg, Color::Rgb(13, 14, 15)); + } + + #[test] + fn render_applies_footer_background_to_full_row() { + let mut app = make_app(); + app.ui_theme.footer_bg = Color::Rgb(13, 14, 15); + let props = idle_props_for(&app); + let widget = FooterWidget::new(props); + let area = ratatui::layout::Rect::new(0, 0, 60, 1); + let mut buf = ratatui::buffer::Buffer::empty(area); + + widget.render(area, &mut buf); + + for x in 0..area.width { + assert_eq!(buf[(x, 0)].bg, Color::Rgb(13, 14, 15)); + } } // ---- agents chip wording ----