fix: smooth streaming and release UX slices
* fix(streaming): drip text grapheme by grapheme * fix(snapshot): exclude generated artifacts by default * fix(tui): fill panel and footer backgrounds
This commit is contained in:
@@ -62,7 +62,7 @@ impl Theme {
|
||||
section_borders: Borders::ALL,
|
||||
section_border_type: BorderType::Plain,
|
||||
section_border_color: palette::BORDER_COLOR,
|
||||
section_bg: Color::Reset,
|
||||
section_bg: palette::DEEPSEEK_INK,
|
||||
section_title_color: palette::DEEPSEEK_BLUE,
|
||||
// Horizontal padding only. `Padding::uniform(1)` ate two rows of
|
||||
// each sidebar panel — for compact terminals where Plan/Todos/Tasks
|
||||
@@ -147,7 +147,7 @@ mod tests {
|
||||
let theme = Theme::dark();
|
||||
assert_eq!(theme.variant, Variant::Dark);
|
||||
assert_eq!(theme.section_border_color, palette::BORDER_COLOR);
|
||||
assert_eq!(theme.section_bg, ratatui::style::Color::Reset);
|
||||
assert_eq!(theme.section_bg, palette::DEEPSEEK_INK);
|
||||
assert_eq!(theme.section_title_color, palette::DEEPSEEK_BLUE);
|
||||
assert_eq!(theme.tool_title_color, palette::TEXT_SOFT);
|
||||
assert_eq!(theme.tool_value_color, palette::TEXT_MUTED);
|
||||
|
||||
@@ -109,6 +109,7 @@ pub struct UiTheme {
|
||||
pub composer_bg: Color,
|
||||
pub selection_bg: Color,
|
||||
pub header_bg: Color,
|
||||
pub footer_bg: Color,
|
||||
/// Statusline mode colors (agent/yolo/plan)
|
||||
pub mode_agent: Color,
|
||||
pub mode_yolo: Color,
|
||||
@@ -128,6 +129,7 @@ pub const UI_THEME: UiTheme = UiTheme {
|
||||
composer_bg: DEEPSEEK_SLATE,
|
||||
selection_bg: SELECTION_BG,
|
||||
header_bg: DEEPSEEK_INK,
|
||||
footer_bg: DEEPSEEK_INK,
|
||||
mode_agent: MODE_AGENT,
|
||||
mode_yolo: MODE_YOLO,
|
||||
mode_plan: MODE_PLAN,
|
||||
|
||||
@@ -54,6 +54,13 @@ node_modules/
|
||||
target/
|
||||
dist/
|
||||
build/
|
||||
.build/
|
||||
.next/
|
||||
.nuxt/
|
||||
.svelte-kit/
|
||||
.turbo/
|
||||
.parcel-cache/
|
||||
vendor/
|
||||
.cargo/
|
||||
.rustup/
|
||||
.npm/
|
||||
@@ -63,6 +70,7 @@ build/
|
||||
.cache/
|
||||
.venv/
|
||||
venv/
|
||||
.tox/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.mypy_cache/
|
||||
@@ -72,6 +80,39 @@ __pycache__/
|
||||
.m2/
|
||||
.local/
|
||||
.DS_Store
|
||||
|
||||
# Binary and generated artifacts. Snapshots are source rollback checkpoints,
|
||||
# not a full binary backup; keeping these out avoids side-repo bloat.
|
||||
*.exe
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
*.wasm
|
||||
*.o
|
||||
*.obj
|
||||
*.class
|
||||
*.pdb
|
||||
*.dSYM
|
||||
*.zip
|
||||
*.tar
|
||||
*.tar.gz
|
||||
*.tgz
|
||||
*.tar.bz2
|
||||
*.tar.xz
|
||||
*.7z
|
||||
*.rar
|
||||
*.iso
|
||||
*.dmg
|
||||
*.bin
|
||||
*.mp4
|
||||
*.mov
|
||||
*.mkv
|
||||
*.avi
|
||||
*.webm
|
||||
*.mp3
|
||||
*.wav
|
||||
*.flac
|
||||
*.aac
|
||||
";
|
||||
|
||||
impl SnapshotRepo {
|
||||
@@ -788,16 +829,21 @@ mod tests {
|
||||
let tmp = tempdir().unwrap();
|
||||
let (repo, _home) = make_repo(tmp.path());
|
||||
std::fs::create_dir_all(repo.work_tree().join("node_modules/pkg")).unwrap();
|
||||
std::fs::create_dir_all(repo.work_tree().join(".next/cache")).unwrap();
|
||||
std::fs::create_dir_all(repo.work_tree().join("src")).unwrap();
|
||||
std::fs::write(
|
||||
repo.work_tree().join("node_modules/pkg/index.js"),
|
||||
b"generated",
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::write(repo.work_tree().join(".next/cache/chunk.bin"), b"generated").unwrap();
|
||||
std::fs::write(repo.work_tree().join("debug.wasm"), b"binary").unwrap();
|
||||
std::fs::write(repo.work_tree().join("src/main.rs"), b"fn main() {}").unwrap();
|
||||
|
||||
let excludes = std::fs::read_to_string(repo.git_dir().join("info/exclude")).unwrap();
|
||||
assert!(excludes.contains("node_modules/"));
|
||||
assert!(excludes.contains(".next/"));
|
||||
assert!(excludes.contains("*.wasm"));
|
||||
|
||||
let id = repo.snapshot("pre-turn:1").expect("snapshot");
|
||||
let ls = run_git(
|
||||
@@ -815,6 +861,14 @@ mod tests {
|
||||
!names.contains("node_modules"),
|
||||
"node_modules should not be in snapshot: {names}",
|
||||
);
|
||||
assert!(
|
||||
!names.contains(".next"),
|
||||
".next should not be in snapshot: {names}",
|
||||
);
|
||||
assert!(
|
||||
!names.contains("debug.wasm"),
|
||||
"binary artifacts should not be in snapshot: {names}",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -7,13 +7,13 @@
|
||||
//! # Mental model
|
||||
//!
|
||||
//! Two gears:
|
||||
//! - [`ChunkingMode::Smooth`]: drain one line per commit tick (steady pacing).
|
||||
//! - [`ChunkingMode::CatchUp`]: drain the entire queued backlog while pressure exists.
|
||||
//! - [`ChunkingMode::Smooth`]: drain one display chunk per commit tick (steady pacing).
|
||||
//! - [`ChunkingMode::CatchUp`]: drain a bounded burst while pressure exists.
|
||||
//!
|
||||
//! # Hysteresis
|
||||
//!
|
||||
//! - Enter `CatchUp` when `queued_lines >= ENTER_QUEUE_DEPTH_LINES` OR
|
||||
//! the oldest queued line is at least [`ENTER_OLDEST_AGE`].
|
||||
//! the oldest queued chunk is at least [`ENTER_OLDEST_AGE`].
|
||||
//! - Exit `CatchUp` only after pressure stays below [`EXIT_QUEUE_DEPTH_LINES`]
|
||||
//! AND [`EXIT_OLDEST_AGE`] for at least [`EXIT_HOLD`].
|
||||
//! - After exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`]
|
||||
@@ -24,16 +24,16 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
/// Queue-depth threshold that allows entering catch-up mode.
|
||||
pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 8;
|
||||
pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 160;
|
||||
|
||||
/// Oldest-line age threshold that allows entering catch-up mode.
|
||||
pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120);
|
||||
/// Oldest-chunk age threshold that allows entering catch-up mode.
|
||||
pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(1_200);
|
||||
|
||||
/// Queue-depth threshold used when evaluating catch-up exit hysteresis.
|
||||
pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 2;
|
||||
pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 32;
|
||||
|
||||
/// Oldest-line age threshold used when evaluating catch-up exit hysteresis.
|
||||
pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40);
|
||||
/// Oldest-chunk age threshold used when evaluating catch-up exit hysteresis.
|
||||
pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(300);
|
||||
|
||||
/// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode.
|
||||
pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250);
|
||||
@@ -42,14 +42,14 @@ pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250);
|
||||
pub(crate) const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Queue-depth cutoff that marks backlog as severe (bypasses re-entry hold).
|
||||
pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 64;
|
||||
pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 640;
|
||||
|
||||
/// Oldest-line age cutoff that marks backlog as severe.
|
||||
pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300);
|
||||
pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(4_000);
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub enum ChunkingMode {
|
||||
/// Drain one line per baseline commit tick.
|
||||
/// Drain one display chunk per baseline commit tick.
|
||||
#[default]
|
||||
Smooth,
|
||||
/// Drain the queued backlog according to queue pressure.
|
||||
@@ -59,9 +59,9 @@ pub enum ChunkingMode {
|
||||
/// Captures queue pressure inputs used by adaptive chunking decisions.
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub struct QueueSnapshot {
|
||||
/// Number of queued stream lines waiting to be displayed.
|
||||
/// Number of queued stream chunks waiting to be displayed.
|
||||
pub queued_lines: usize,
|
||||
/// Age of the oldest queued line at decision time.
|
||||
/// Age of the oldest queued chunk at decision time.
|
||||
pub oldest_age: Option<Duration>,
|
||||
}
|
||||
|
||||
@@ -272,33 +272,39 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn eight_line_burst_flips_to_catch_up_and_drains_full_backlog() {
|
||||
// A burst of 8 queued lines crosses ENTER_QUEUE_DEPTH_LINES exactly.
|
||||
fn deep_burst_flips_to_catch_up_and_drains_backlog() {
|
||||
// A burst crossing ENTER_QUEUE_DEPTH_LINES enters CatchUp. With
|
||||
// single-grapheme chunks, the threshold stays high enough that
|
||||
// ordinary prose still drips in visibly before catch-up engages.
|
||||
// The policy should enter `CatchUp` and request a Batch drain matching
|
||||
// the queue depth.
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let decision = policy.decide(snap(8, 10), now);
|
||||
let decision = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 10), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert!(decision.entered_catch_up);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(8));
|
||||
assert_eq!(
|
||||
decision.drain_plan,
|
||||
DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES)
|
||||
);
|
||||
|
||||
// Larger backlog requested next tick: still CatchUp, batch grows to match.
|
||||
let decision = policy.decide(snap(20, 30), now + Duration::from_millis(10));
|
||||
let larger_backlog = ENTER_QUEUE_DEPTH_LINES + 80;
|
||||
let decision = policy.decide(snap(larger_backlog, 30), now + Duration::from_millis(10));
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert!(!decision.entered_catch_up, "no second transition signal");
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(20));
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(larger_backlog));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn age_threshold_alone_triggers_catch_up() {
|
||||
// Queue depth is small, but the oldest line has been waiting >=120 ms.
|
||||
// Queue depth is small, but the oldest chunk has crossed the age threshold.
|
||||
// Either condition is sufficient to enter catch-up.
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let decision = policy.decide(snap(2, 120), now);
|
||||
let decision = policy.decide(snap(2, ENTER_OLDEST_AGE.as_millis() as u64), now);
|
||||
assert_eq!(decision.mode, ChunkingMode::CatchUp);
|
||||
assert!(decision.entered_catch_up);
|
||||
assert_eq!(decision.drain_plan, DrainPlan::Batch(2));
|
||||
@@ -311,20 +317,29 @@ mod tests {
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let _ = policy.decide(snap(10, 20), t0);
|
||||
let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0);
|
||||
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
|
||||
|
||||
// Pressure drops to (2 lines, 40 ms) — at the exit thresholds.
|
||||
// Pressure drops to the exit thresholds.
|
||||
// Hold begins; not yet 250ms.
|
||||
let pre_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(50));
|
||||
let pre_hold = policy.decide(
|
||||
snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64),
|
||||
t0 + Duration::from_millis(50),
|
||||
);
|
||||
assert_eq!(pre_hold.mode, ChunkingMode::CatchUp);
|
||||
|
||||
// Still under hold.
|
||||
let mid_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(200));
|
||||
let mid_hold = policy.decide(
|
||||
snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64),
|
||||
t0 + Duration::from_millis(200),
|
||||
);
|
||||
assert_eq!(mid_hold.mode, ChunkingMode::CatchUp);
|
||||
|
||||
// Past EXIT_HOLD (250 ms) → return to Smooth.
|
||||
let post_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(320));
|
||||
let post_hold = policy.decide(
|
||||
snap(EXIT_QUEUE_DEPTH_LINES, EXIT_OLDEST_AGE.as_millis() as u64),
|
||||
t0 + Duration::from_millis(320),
|
||||
);
|
||||
assert_eq!(post_hold.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(post_hold.drain_plan, DrainPlan::Single);
|
||||
}
|
||||
@@ -335,7 +350,7 @@ mod tests {
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let _ = policy.decide(snap(10, 20), now);
|
||||
let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), now);
|
||||
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
|
||||
|
||||
let decision = policy.decide(empty_snap(), now + Duration::from_millis(10));
|
||||
@@ -345,38 +360,53 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn reentry_hold_blocks_immediate_flip_back() {
|
||||
// After exiting CatchUp via idle, an 8-line burst that arrives within
|
||||
// After exiting CatchUp via idle, a threshold-sized burst that arrives within
|
||||
// the re-entry hold window should not immediately re-enter CatchUp.
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let _ = policy.decide(snap(10, 20), t0);
|
||||
let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0);
|
||||
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
|
||||
|
||||
// Within REENTER_CATCH_UP_HOLD (250 ms): hold blocks re-entry.
|
||||
let held = policy.decide(snap(8, 20), t0 + Duration::from_millis(100));
|
||||
let held = policy.decide(
|
||||
snap(ENTER_QUEUE_DEPTH_LINES, 20),
|
||||
t0 + Duration::from_millis(100),
|
||||
);
|
||||
assert_eq!(held.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(held.drain_plan, DrainPlan::Single);
|
||||
|
||||
// Past the hold: re-entry permitted.
|
||||
let reentered = policy.decide(snap(8, 20), t0 + Duration::from_millis(400));
|
||||
let reentered = policy.decide(
|
||||
snap(ENTER_QUEUE_DEPTH_LINES, 20),
|
||||
t0 + Duration::from_millis(400),
|
||||
);
|
||||
assert_eq!(reentered.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(reentered.drain_plan, DrainPlan::Batch(8));
|
||||
assert_eq!(
|
||||
reentered.drain_plan,
|
||||
DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn severe_backlog_bypasses_reentry_hold() {
|
||||
// Even within the hold window, a "severe" backlog (>=64 lines) bypasses
|
||||
// Even within the hold window, a "severe" backlog bypasses
|
||||
// the gate so display lag doesn't unbounded-grow.
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let t0 = Instant::now();
|
||||
|
||||
let _ = policy.decide(snap(10, 20), t0);
|
||||
let _ = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES, 20), t0);
|
||||
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
|
||||
|
||||
let severe = policy.decide(snap(64, 20), t0 + Duration::from_millis(100));
|
||||
let severe = policy.decide(
|
||||
snap(SEVERE_QUEUE_DEPTH_LINES, 20),
|
||||
t0 + Duration::from_millis(100),
|
||||
);
|
||||
assert_eq!(severe.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(severe.drain_plan, DrainPlan::Batch(64));
|
||||
assert_eq!(
|
||||
severe.drain_plan,
|
||||
DrainPlan::Batch(SEVERE_QUEUE_DEPTH_LINES)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -386,19 +416,28 @@ mod tests {
|
||||
let t0 = Instant::now();
|
||||
|
||||
// Queue depth far above ENTER threshold.
|
||||
let d1 = policy.decide(snap(20, 10), t0);
|
||||
let d1 = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES + 80, 10), t0);
|
||||
assert_eq!(d1.mode, ChunkingMode::Smooth);
|
||||
assert!(!d1.entered_catch_up);
|
||||
assert_eq!(d1.drain_plan, DrainPlan::Single);
|
||||
|
||||
// Oldest age far above ENTER threshold.
|
||||
let d2 = policy.decide(snap(5, 500), t0 + Duration::from_millis(100));
|
||||
let d2 = policy.decide(
|
||||
snap(5, ENTER_OLDEST_AGE.as_millis() as u64),
|
||||
t0 + Duration::from_millis(100),
|
||||
);
|
||||
assert_eq!(d2.mode, ChunkingMode::Smooth);
|
||||
assert!(!d2.entered_catch_up);
|
||||
assert_eq!(d2.drain_plan, DrainPlan::Single);
|
||||
|
||||
// Severe backlog — still Smooth.
|
||||
let d3 = policy.decide(snap(80, 500), t0 + Duration::from_millis(200));
|
||||
let d3 = policy.decide(
|
||||
snap(
|
||||
SEVERE_QUEUE_DEPTH_LINES + 80,
|
||||
SEVERE_OLDEST_AGE.as_millis() as u64,
|
||||
),
|
||||
t0 + Duration::from_millis(200),
|
||||
);
|
||||
assert_eq!(d3.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(d3.drain_plan, DrainPlan::Single);
|
||||
}
|
||||
@@ -410,14 +449,20 @@ mod tests {
|
||||
let t0 = Instant::now();
|
||||
|
||||
// Low motion blocks catch-up.
|
||||
let d1 = policy.decide(snap(20, 10), t0);
|
||||
let d1 = policy.decide(snap(ENTER_QUEUE_DEPTH_LINES + 80, 10), t0);
|
||||
assert_eq!(d1.mode, ChunkingMode::Smooth);
|
||||
|
||||
// Turn off low motion — next burst should enter CatchUp.
|
||||
policy.set_low_motion(false);
|
||||
let d2 = policy.decide(snap(20, 10), t0 + Duration::from_millis(10));
|
||||
let d2 = policy.decide(
|
||||
snap(ENTER_QUEUE_DEPTH_LINES + 80, 10),
|
||||
t0 + Duration::from_millis(10),
|
||||
);
|
||||
assert_eq!(d2.mode, ChunkingMode::CatchUp);
|
||||
assert!(d2.entered_catch_up);
|
||||
assert_eq!(d2.drain_plan, DrainPlan::Batch(20));
|
||||
assert_eq!(
|
||||
d2.drain_plan,
|
||||
DrainPlan::Batch(ENTER_QUEUE_DEPTH_LINES + 80)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Commit-tick scheduler that drains a stream chunker according to policy.
|
||||
//!
|
||||
//! Bridges [`AdaptiveChunkingPolicy`] with a concrete [`StreamChunker`] queue.
|
||||
//! Callers feed raw text deltas via [`StreamChunker::push_delta`] (only
|
||||
//! newline-terminated text becomes queued lines), then call
|
||||
//! [`run_commit_tick`] on every commit beat to obtain the text safe to flush
|
||||
//! to the transcript on this beat.
|
||||
//! Callers feed raw text deltas via [`StreamChunker::push_delta`], then call
|
||||
//! [`run_commit_tick`] on every commit beat to obtain the next small text
|
||||
//! slice to flush to the transcript on this beat.
|
||||
//!
|
||||
//! The chunker is the unit of streaming — one per active block (assistant /
|
||||
//! thinking). Tool output is unbuffered and bypasses this path.
|
||||
@@ -13,27 +12,29 @@ use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use unicode_segmentation::UnicodeSegmentation;
|
||||
|
||||
use super::chunking::AdaptiveChunkingPolicy;
|
||||
use super::chunking::ChunkingDecision;
|
||||
use super::chunking::DrainPlan;
|
||||
use super::chunking::QueueSnapshot;
|
||||
|
||||
/// Buffers raw stream deltas and emits committed text in line units.
|
||||
///
|
||||
/// Only the substring up to the *last* `\n` is committed; trailing partial
|
||||
/// content stays in the buffer. This is what protects partial code fences
|
||||
/// (` ``` `) and other line-sensitive markdown from rendering mid-state.
|
||||
const GRAPHEMES_PER_MICRO_CHUNK: usize = 1;
|
||||
const CATCH_UP_MAX_MICRO_CHUNKS: usize = 12;
|
||||
|
||||
/// Buffers raw stream deltas and emits committed text in small display chunks.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct StreamChunker {
|
||||
/// Bytes received but not yet split into a complete line.
|
||||
/// Bytes received but not yet split into display chunks. Normally empty;
|
||||
/// retained so `drain_remaining` has a lossless place to pull from if we
|
||||
/// ever decide to hold a tail for a future markdown-sensitive mode.
|
||||
pending: String,
|
||||
/// Complete lines waiting to be flushed to the transcript.
|
||||
/// Each entry preserves its trailing `\n` so reassembly is lossless.
|
||||
queue: VecDeque<QueuedLine>,
|
||||
/// Small grapheme-aligned chunks waiting to be flushed to the transcript.
|
||||
queue: VecDeque<QueuedChunk>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct QueuedLine {
|
||||
struct QueuedChunk {
|
||||
text: String,
|
||||
enqueued_at: Instant,
|
||||
}
|
||||
@@ -43,28 +44,21 @@ impl StreamChunker {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Append a raw model delta. Returns whether at least one new line was queued.
|
||||
/// Append a raw model delta. Returns whether at least one new display chunk was queued.
|
||||
pub fn push_delta(&mut self, delta: &str) -> bool {
|
||||
if delta.is_empty() {
|
||||
return false;
|
||||
}
|
||||
self.pending.push_str(delta);
|
||||
|
||||
let Some(last_nl) = self.pending.rfind('\n') else {
|
||||
return false;
|
||||
};
|
||||
|
||||
// Drain everything up to and including the last newline into queued lines.
|
||||
// Splitting by line keeps the chunker source-agnostic and lets the policy
|
||||
// count "lines waiting" without peeking at text content.
|
||||
let now = Instant::now();
|
||||
let committed: String = self.pending.drain(..=last_nl).collect();
|
||||
let committed = std::mem::take(&mut self.pending);
|
||||
let mut produced = false;
|
||||
for chunk in split_lines_keep_terminator(&committed) {
|
||||
for chunk in split_into_micro_chunks(&committed) {
|
||||
if chunk.is_empty() {
|
||||
continue;
|
||||
}
|
||||
self.queue.push_back(QueuedLine {
|
||||
self.queue.push_back(QueuedChunk {
|
||||
text: chunk,
|
||||
enqueued_at: now,
|
||||
});
|
||||
@@ -73,12 +67,12 @@ impl StreamChunker {
|
||||
produced
|
||||
}
|
||||
|
||||
/// Number of complete lines currently queued for commit.
|
||||
/// Number of display chunks currently queued for commit.
|
||||
pub fn queued_lines(&self) -> usize {
|
||||
self.queue.len()
|
||||
}
|
||||
|
||||
/// Age of the oldest queued line, if any.
|
||||
/// Age of the oldest queued chunk, if any.
|
||||
pub fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
|
||||
self.queue
|
||||
.front()
|
||||
@@ -98,7 +92,7 @@ impl StreamChunker {
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain `max_lines` complete lines and return them as concatenated text.
|
||||
/// Drain `max_lines` queued chunks and return them as concatenated text.
|
||||
pub fn drain_lines(&mut self, max_lines: usize) -> String {
|
||||
let n = max_lines.min(self.queue.len());
|
||||
let mut out = String::new();
|
||||
@@ -159,7 +153,7 @@ pub fn run_commit_tick(
|
||||
|
||||
let max = match decision.drain_plan {
|
||||
DrainPlan::Single => 1,
|
||||
DrainPlan::Batch(n) => n,
|
||||
DrainPlan::Batch(n) => n.min(CATCH_UP_MAX_MICRO_CHUNKS),
|
||||
};
|
||||
|
||||
// Drain through the chunker; an empty queue under Smooth produces "".
|
||||
@@ -172,24 +166,28 @@ pub fn run_commit_tick(
|
||||
}
|
||||
}
|
||||
|
||||
/// Split text into chunks, preserving each terminator. The final chunk is
|
||||
/// included only if it ends with `\n` (this is enforced upstream in
|
||||
/// `push_delta`, which only drains up through the last newline).
|
||||
fn split_lines_keep_terminator(text: &str) -> Vec<String> {
|
||||
/// Split text into grapheme-aligned chunks. Newlines force a boundary so
|
||||
/// markdown layout still settles quickly, but prose no longer waits for a full
|
||||
/// line before becoming visible.
|
||||
fn split_into_micro_chunks(text: &str) -> Vec<String> {
|
||||
let mut out = Vec::new();
|
||||
let mut start = 0;
|
||||
let bytes = text.as_bytes();
|
||||
for (i, &b) in bytes.iter().enumerate() {
|
||||
if b == b'\n' {
|
||||
out.push(text[start..=i].to_string());
|
||||
start = i + 1;
|
||||
let mut current = String::new();
|
||||
let mut graphemes = 0usize;
|
||||
|
||||
for grapheme in UnicodeSegmentation::graphemes(text, true) {
|
||||
current.push_str(grapheme);
|
||||
graphemes += 1;
|
||||
|
||||
if grapheme == "\n" || graphemes >= GRAPHEMES_PER_MICRO_CHUNK {
|
||||
out.push(std::mem::take(&mut current));
|
||||
graphemes = 0;
|
||||
}
|
||||
}
|
||||
if start < text.len() {
|
||||
// This branch is unreachable for inputs produced by `push_delta`,
|
||||
// but stays defensive for direct callers.
|
||||
out.push(text[start..].to_string());
|
||||
|
||||
if !current.is_empty() {
|
||||
out.push(current);
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
@@ -199,59 +197,69 @@ mod tests {
|
||||
use crate::tui::streaming::chunking::ChunkingMode;
|
||||
|
||||
#[test]
|
||||
fn partial_code_fence_is_held_until_newline() {
|
||||
// Without the chunker, a stray ``` arriving mid-stream could render as
|
||||
// an opened fence. The chunker must not commit anything until the
|
||||
// line is terminated by `\n`.
|
||||
fn prose_streams_before_newline() {
|
||||
let mut chunker = StreamChunker::new();
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let now = Instant::now();
|
||||
|
||||
// Partial fence + content; no newline → nothing committed yet.
|
||||
chunker.push_delta("Here is code:\n");
|
||||
chunker.push_delta("```");
|
||||
chunker.push_delta("hello world");
|
||||
let out = run_commit_tick(&mut policy, &mut chunker, now);
|
||||
assert_eq!(out.committed_text, "Here is code:\n");
|
||||
assert!(!chunker.is_idle(), "partial fence still buffered");
|
||||
assert_eq!(out.committed_text, "h");
|
||||
assert!(!chunker.is_idle(), "remaining prose should keep dripping");
|
||||
|
||||
// Close the fence line.
|
||||
chunker.push_delta("rust\n");
|
||||
let out = run_commit_tick(&mut policy, &mut chunker, now + Duration::from_millis(5));
|
||||
assert_eq!(out.committed_text, "```rust\n");
|
||||
assert_eq!(out.committed_text, "e");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smooth_burst_emits_one_line_per_tick() {
|
||||
fn smooth_burst_emits_one_micro_chunk_per_tick() {
|
||||
let mut chunker = StreamChunker::new();
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let t0 = Instant::now();
|
||||
|
||||
chunker.push_delta("a\nb\nc\n");
|
||||
// Each tick under Smooth pulls exactly one line.
|
||||
chunker.push_delta("abc");
|
||||
// Each tick under Smooth pulls exactly one grapheme.
|
||||
let out1 = run_commit_tick(&mut policy, &mut chunker, t0);
|
||||
assert_eq!(out1.decision.mode, ChunkingMode::Smooth);
|
||||
assert_eq!(out1.committed_text, "a\n");
|
||||
assert_eq!(out1.committed_text, "a");
|
||||
let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20));
|
||||
assert_eq!(out2.committed_text, "b\n");
|
||||
assert_eq!(out2.committed_text, "b");
|
||||
let out3 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(40));
|
||||
assert_eq!(out3.committed_text, "c\n");
|
||||
assert_eq!(out3.committed_text, "c");
|
||||
assert!(out3.is_idle);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn large_burst_drains_in_catch_up() {
|
||||
// Eight lines arriving "at once" must trigger CatchUp on the first
|
||||
// commit tick and drain the full backlog in one go.
|
||||
fn smooth_stream_keeps_combining_marks_with_base_letter() {
|
||||
let mut chunker = StreamChunker::new();
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let t0 = Instant::now();
|
||||
|
||||
chunker.push_delta("e\u{301}x");
|
||||
let out1 = run_commit_tick(&mut policy, &mut chunker, t0);
|
||||
assert_eq!(out1.committed_text, "e\u{301}");
|
||||
let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20));
|
||||
assert_eq!(out2.committed_text, "x");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn large_burst_drains_in_catch_up_without_full_jump() {
|
||||
// A large text burst arriving "at once" must trigger CatchUp on the first
|
||||
// commit tick without dumping the full backlog in one jump.
|
||||
let mut chunker = StreamChunker::new();
|
||||
let mut policy = AdaptiveChunkingPolicy::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let burst = "1\n2\n3\n4\n5\n6\n7\n8\n";
|
||||
chunker.push_delta(burst);
|
||||
let burst = "abcdefghijklmnopqrstuvwxyz".repeat(8);
|
||||
let expected_prefix: String = burst
|
||||
.chars()
|
||||
.take(CATCH_UP_MAX_MICRO_CHUNKS * GRAPHEMES_PER_MICRO_CHUNK)
|
||||
.collect();
|
||||
chunker.push_delta(&burst);
|
||||
let out = run_commit_tick(&mut policy, &mut chunker, now);
|
||||
assert_eq!(out.decision.mode, ChunkingMode::CatchUp);
|
||||
assert_eq!(out.committed_text, burst);
|
||||
assert!(out.is_idle);
|
||||
assert_eq!(out.committed_text, expected_prefix);
|
||||
assert!(!out.is_idle);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
//! Markdown stream collector for newline-gated rendering.
|
||||
//! Markdown stream collector for live micro-chunk rendering.
|
||||
//!
|
||||
//! This module implements the pattern from codex-rs where:
|
||||
//! - Streaming text is buffered until a newline is reached
|
||||
//! - Only complete lines are committed to the UI
|
||||
//! - This prevents visual flashing of partial words
|
||||
//! - Streaming text is split into small grapheme-aligned chunks
|
||||
//! - Commit ticks drip chunks into the transcript between provider deltas
|
||||
//! - Final content is emitted when the stream ends
|
||||
|
||||
use ratatui::style::{Modifier, Style};
|
||||
@@ -221,19 +220,17 @@ fn wrap_line(line: &str, width: usize) -> Vec<String> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-block streaming substate: line-buffer (newline gate) feeding a
|
||||
/// collector + chunker/policy for two-gear pacing.
|
||||
/// Per-block streaming substate: optional line-buffer feeding a collector +
|
||||
/// chunker/policy for two-gear pacing.
|
||||
///
|
||||
/// Pipeline:
|
||||
/// ```text
|
||||
/// raw delta -> LineBuffer.push -> take_committable -> collector + chunker -> commit tick
|
||||
/// ```
|
||||
///
|
||||
/// The [`LineBuffer`] is upstream of the collector and chunker. It guarantees
|
||||
/// that no partial multi-character markdown (e.g. an unfinished ``` fence)
|
||||
/// reaches downstream consumers between deltas. Thinking blocks bypass the
|
||||
/// gate because thinking is rendered live for responsiveness — its content
|
||||
/// often arrives without newlines until a full paragraph is composed.
|
||||
/// The [`LineBuffer`] remains available for line-sensitive modes. Normal
|
||||
/// assistant prose and thinking blocks bypass it so text can stream in live
|
||||
/// micro-chunks instead of waiting for newline boundaries.
|
||||
#[derive(Debug, Default)]
|
||||
struct BlockState {
|
||||
/// Newline gate: holds back trailing partial-line text between deltas.
|
||||
@@ -265,14 +262,14 @@ impl StreamingState {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Start a new text block. Assistant text is subject to the newline gate
|
||||
/// so partial code fences and other line-sensitive markdown can never
|
||||
/// briefly appear between deltas.
|
||||
/// Start a new text block. Assistant prose streams live in micro-chunks so
|
||||
/// users can visually track the answer as it forms instead of waiting for
|
||||
/// a newline-terminated line.
|
||||
pub fn start_text(&mut self, index: usize, width: Option<usize>) {
|
||||
self.ensure_capacity(index);
|
||||
self.blocks[index] = Some(BlockState {
|
||||
line_buffer: LineBuffer::new(),
|
||||
bypass_gate: false,
|
||||
bypass_gate: true,
|
||||
collector: MarkdownStreamCollector::new(width, false),
|
||||
chunker: StreamChunker::new(),
|
||||
policy: AdaptiveChunkingPolicy::new(),
|
||||
@@ -298,10 +295,8 @@ impl StreamingState {
|
||||
|
||||
/// Push content to a block. Routing depends on the block kind:
|
||||
///
|
||||
/// - Assistant text blocks: incoming bytes go through [`LineBuffer`]
|
||||
/// first, so only newline-terminated prefixes reach the collector and
|
||||
/// chunker. This is what protects partial code fences and other
|
||||
/// line-sensitive markdown from briefly appearing between deltas.
|
||||
/// - Assistant text blocks: incoming bytes normally bypass [`LineBuffer`]
|
||||
/// and are split into small display chunks downstream.
|
||||
/// - Thinking blocks: bytes bypass the gate and go straight to the
|
||||
/// collector/chunker so reasoning stays visually live (long thoughts
|
||||
/// often have no intermediate newlines).
|
||||
@@ -333,14 +328,14 @@ impl StreamingState {
|
||||
return;
|
||||
}
|
||||
|
||||
block.collector.push(&downstream);
|
||||
// The collector's own newline-gating is now redundant for gated
|
||||
// blocks (LineBuffer already enforces it), but we keep the same
|
||||
// call shape so the collector's bookkeeping (committed_line_count)
|
||||
// stays consistent and the bypass path still benefits from it.
|
||||
let committed = block.collector.commit_complete_text();
|
||||
if !committed.is_empty() {
|
||||
block.chunker.push_delta(&committed);
|
||||
if block.bypass_gate {
|
||||
block.chunker.push_delta(&downstream);
|
||||
} else {
|
||||
block.collector.push(&downstream);
|
||||
let committed = block.collector.commit_complete_text();
|
||||
if !committed.is_empty() {
|
||||
block.chunker.push_delta(&committed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -556,4 +551,36 @@ mod tests {
|
||||
let result = wrap_line("This is a long line that should be wrapped", 20);
|
||||
assert!(result.len() > 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn assistant_text_streams_before_newline() {
|
||||
let mut state = StreamingState::new();
|
||||
state.start_text(0, None);
|
||||
state.push_content(0, "hello world");
|
||||
|
||||
assert_eq!(state.commit_text(0), "h");
|
||||
assert_eq!(state.commit_text(0), "e");
|
||||
assert!(state.has_pending_chunker_lines(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thinking_text_streams_before_newline() {
|
||||
let mut state = StreamingState::new();
|
||||
state.start_thinking(0, None);
|
||||
state.push_content(0, "thinking deeply");
|
||||
|
||||
assert_eq!(state.commit_text(0), "t");
|
||||
assert_eq!(state.commit_text(0), "h");
|
||||
assert!(state.has_pending_chunker_lines(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn finalize_preserves_uncommitted_micro_chunks() {
|
||||
let mut state = StreamingState::new();
|
||||
state.start_text(0, None);
|
||||
state.push_content(0, "abc");
|
||||
assert_eq!(state.commit_text(0), "a");
|
||||
|
||||
assert_eq!(state.finalize_block_text(0), "bc");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1342,6 +1342,19 @@ async fn run_event_loop(
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(index) = app.streaming_message_index {
|
||||
let committed = app.streaming_state.commit_text(0);
|
||||
if !committed.is_empty() {
|
||||
append_streaming_text(app, index, &committed);
|
||||
transcript_batch_updated = true;
|
||||
}
|
||||
} else if let Some(entry_idx) = app.streaming_thinking_active_entry {
|
||||
let committed = app.streaming_state.commit_text(0);
|
||||
if !committed.is_empty() {
|
||||
append_streaming_thinking(app, entry_idx, &committed);
|
||||
transcript_batch_updated = true;
|
||||
}
|
||||
}
|
||||
if transcript_batch_updated {
|
||||
app.mark_history_updated();
|
||||
}
|
||||
|
||||
@@ -39,6 +39,8 @@ pub struct FooterProps {
|
||||
pub text_hint_color: Color,
|
||||
/// Color used for steady secondary chips such as cost.
|
||||
pub text_muted_color: Color,
|
||||
/// Background color for the full footer/status bar row.
|
||||
pub footer_bg: Color,
|
||||
/// Status label like `"ready"`, `"thinking ⌫"`, `"working"`. When the
|
||||
/// label equals `"ready"` the footer hides the status segment entirely.
|
||||
pub state_label: String,
|
||||
@@ -281,6 +283,7 @@ impl FooterProps {
|
||||
text_dim_color: app.ui_theme.text_dim,
|
||||
text_hint_color: app.ui_theme.text_hint,
|
||||
text_muted_color: app.ui_theme.text_muted,
|
||||
footer_bg: app.ui_theme.footer_bg,
|
||||
state_label: state_label.to_string(),
|
||||
state_color,
|
||||
coherence,
|
||||
@@ -593,7 +596,8 @@ impl Renderable for FooterWidget {
|
||||
all_spans.push(spacer_span);
|
||||
all_spans.extend(right_spans);
|
||||
|
||||
let paragraph = Paragraph::new(Line::from(all_spans));
|
||||
let paragraph =
|
||||
Paragraph::new(Line::from(all_spans)).style(Style::default().bg(self.props.footer_bg));
|
||||
paragraph.render(area, buf);
|
||||
}
|
||||
|
||||
@@ -811,6 +815,7 @@ mod tests {
|
||||
app.ui_theme.text_dim = Color::Rgb(4, 5, 6);
|
||||
app.ui_theme.text_hint = Color::Rgb(7, 8, 9);
|
||||
app.ui_theme.text_muted = Color::Rgb(10, 11, 12);
|
||||
app.ui_theme.footer_bg = Color::Rgb(13, 14, 15);
|
||||
|
||||
let props = idle_props_for(&app);
|
||||
|
||||
@@ -818,6 +823,23 @@ mod tests {
|
||||
assert_eq!(props.text_dim_color, Color::Rgb(4, 5, 6));
|
||||
assert_eq!(props.text_hint_color, Color::Rgb(7, 8, 9));
|
||||
assert_eq!(props.text_muted_color, Color::Rgb(10, 11, 12));
|
||||
assert_eq!(props.footer_bg, Color::Rgb(13, 14, 15));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn render_applies_footer_background_to_full_row() {
|
||||
let mut app = make_app();
|
||||
app.ui_theme.footer_bg = Color::Rgb(13, 14, 15);
|
||||
let props = idle_props_for(&app);
|
||||
let widget = FooterWidget::new(props);
|
||||
let area = ratatui::layout::Rect::new(0, 0, 60, 1);
|
||||
let mut buf = ratatui::buffer::Buffer::empty(area);
|
||||
|
||||
widget.render(area, &mut buf);
|
||||
|
||||
for x in 0..area.width {
|
||||
assert_eq!(buf[(x, 0)].bg, Color::Rgb(13, 14, 15));
|
||||
}
|
||||
}
|
||||
|
||||
// ---- agents chip wording ----
|
||||
|
||||
Reference in New Issue
Block a user