diff --git a/crates/tui/src/tui/streaming/chunking.rs b/crates/tui/src/tui/streaming/chunking.rs new file mode 100644 index 00000000..37209ba8 --- /dev/null +++ b/crates/tui/src/tui/streaming/chunking.rs @@ -0,0 +1,354 @@ +//! Adaptive stream chunking policy for two-gear streaming. +//! +//! Ported from `codex-rs/tui/src/streaming/chunking.rs`, adapted for deepseek-tui's +//! text-based streaming pipeline. The policy is queue-pressure driven and +//! source-agnostic. +//! +//! # Mental model +//! +//! Two gears: +//! - [`ChunkingMode::Smooth`]: drain one line per commit tick (steady pacing). +//! - [`ChunkingMode::CatchUp`]: drain the entire queued backlog while pressure exists. +//! +//! # Hysteresis +//! +//! - Enter `CatchUp` when `queued_lines >= ENTER_QUEUE_DEPTH_LINES` OR +//! the oldest queued line is at least [`ENTER_OLDEST_AGE`]. +//! - Exit `CatchUp` only after pressure stays below [`EXIT_QUEUE_DEPTH_LINES`] +//! AND [`EXIT_OLDEST_AGE`] for at least [`EXIT_HOLD`]. +//! - After exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`] +//! unless backlog is "severe" (queue >= [`SEVERE_QUEUE_DEPTH_LINES`] or +//! oldest >= [`SEVERE_OLDEST_AGE`]). + +use std::time::Duration; +use std::time::Instant; + +/// Queue-depth threshold that allows entering catch-up mode. +pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 8; + +/// Oldest-line age threshold that allows entering catch-up mode. +pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120); + +/// Queue-depth threshold used when evaluating catch-up exit hysteresis. +pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 2; + +/// Oldest-line age threshold used when evaluating catch-up exit hysteresis. +pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40); + +/// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode. +pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250); + +/// Cooldown window after a catch-up exit that suppresses immediate re-entry. +pub(crate) const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250); + +/// Queue-depth cutoff that marks backlog as severe (bypasses re-entry hold). +pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 64; + +/// Oldest-line age cutoff that marks backlog as severe. +pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300); + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum ChunkingMode { + /// Drain one line per baseline commit tick. + #[default] + Smooth, + /// Drain the queued backlog according to queue pressure. + CatchUp, +} + +/// Captures queue pressure inputs used by adaptive chunking decisions. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct QueueSnapshot { + /// Number of queued stream lines waiting to be displayed. + pub queued_lines: usize, + /// Age of the oldest queued line at decision time. + pub oldest_age: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum DrainPlan { + /// Emit exactly one queued line. + Single, + /// Emit up to `usize` queued lines. + Batch(usize), +} + +/// Represents one policy decision for a specific queue snapshot. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct ChunkingDecision { + /// Mode after applying hysteresis transitions for this decision. + pub mode: ChunkingMode, + /// Whether this decision transitioned from `Smooth` into `CatchUp`. + pub entered_catch_up: bool, + /// Drain plan to execute for the current commit tick. + pub drain_plan: DrainPlan, +} + +/// Maintains adaptive chunking mode and hysteresis state across ticks. +#[derive(Debug, Default, Clone)] +pub struct AdaptiveChunkingPolicy { + mode: ChunkingMode, + below_exit_threshold_since: Option, + last_catch_up_exit_at: Option, +} + +impl AdaptiveChunkingPolicy { + pub fn new() -> Self { + Self::default() + } + + /// Returns the policy mode used by the most recent decision. + pub fn mode(&self) -> ChunkingMode { + self.mode + } + + /// Resets state to baseline smooth mode. + pub fn reset(&mut self) { + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = None; + } + + /// Computes a drain decision from the current queue snapshot. + pub fn decide(&mut self, snapshot: QueueSnapshot, now: Instant) -> ChunkingDecision { + if snapshot.queued_lines == 0 { + self.note_catch_up_exit(now); + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + return ChunkingDecision { + mode: self.mode, + entered_catch_up: false, + drain_plan: DrainPlan::Single, + }; + } + + let entered_catch_up = match self.mode { + ChunkingMode::Smooth => self.maybe_enter_catch_up(snapshot, now), + ChunkingMode::CatchUp => { + self.maybe_exit_catch_up(snapshot, now); + false + } + }; + + let drain_plan = match self.mode { + ChunkingMode::Smooth => DrainPlan::Single, + ChunkingMode::CatchUp => DrainPlan::Batch(snapshot.queued_lines.max(1)), + }; + + ChunkingDecision { + mode: self.mode, + entered_catch_up, + drain_plan, + } + } + + fn maybe_enter_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) -> bool { + if !should_enter_catch_up(snapshot) { + return false; + } + if self.reentry_hold_active(now) && !is_severe_backlog(snapshot) { + return false; + } + self.mode = ChunkingMode::CatchUp; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = None; + true + } + + fn maybe_exit_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) { + if !should_exit_catch_up(snapshot) { + self.below_exit_threshold_since = None; + return; + } + + match self.below_exit_threshold_since { + Some(since) if now.saturating_duration_since(since) >= EXIT_HOLD => { + self.mode = ChunkingMode::Smooth; + self.below_exit_threshold_since = None; + self.last_catch_up_exit_at = Some(now); + } + Some(_) => {} + None => { + self.below_exit_threshold_since = Some(now); + } + } + } + + fn note_catch_up_exit(&mut self, now: Instant) { + if self.mode == ChunkingMode::CatchUp { + self.last_catch_up_exit_at = Some(now); + } + } + + fn reentry_hold_active(&self, now: Instant) -> bool { + self.last_catch_up_exit_at + .is_some_and(|exit| now.saturating_duration_since(exit) < REENTER_CATCH_UP_HOLD) + } +} + +/// Returns whether current queue pressure warrants entering catch-up mode. +fn should_enter_catch_up(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines >= ENTER_QUEUE_DEPTH_LINES + || snapshot + .oldest_age + .is_some_and(|oldest| oldest >= ENTER_OLDEST_AGE) +} + +/// Returns whether queue pressure is low enough to begin exit hysteresis. +fn should_exit_catch_up(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines <= EXIT_QUEUE_DEPTH_LINES + && snapshot + .oldest_age + .is_some_and(|oldest| oldest <= EXIT_OLDEST_AGE) +} + +/// Returns whether backlog is severe enough to bypass the re-entry hold. +fn is_severe_backlog(snapshot: QueueSnapshot) -> bool { + snapshot.queued_lines >= SEVERE_QUEUE_DEPTH_LINES + || snapshot + .oldest_age + .is_some_and(|oldest| oldest >= SEVERE_OLDEST_AGE) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn snap(queued_lines: usize, oldest_age_ms: u64) -> QueueSnapshot { + QueueSnapshot { + queued_lines, + oldest_age: Some(Duration::from_millis(oldest_age_ms)), + } + } + + fn empty_snap() -> QueueSnapshot { + QueueSnapshot { + queued_lines: 0, + oldest_age: None, + } + } + + #[test] + fn smooth_only_burst_emits_one_per_tick() { + // Five slowly-arriving lines, each well below enter thresholds, never + // flip the policy out of `Smooth`. Each decision should plan a single drain. + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + for i in 0..5 { + // 1 queued line, age 10 ms — far below ENTER thresholds. + let decision = policy.decide(snap(1, 10), t0 + Duration::from_millis(50 * i)); + assert_eq!(decision.mode, ChunkingMode::Smooth); + assert!(!decision.entered_catch_up); + assert_eq!(decision.drain_plan, DrainPlan::Single); + } + } + + #[test] + fn eight_line_burst_flips_to_catch_up_and_drains_full_backlog() { + // A burst of 8 queued lines crosses ENTER_QUEUE_DEPTH_LINES exactly. + // The policy should enter `CatchUp` and request a Batch drain matching + // the queue depth. + let mut policy = AdaptiveChunkingPolicy::new(); + let now = Instant::now(); + + let decision = policy.decide(snap(8, 10), now); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert!(decision.entered_catch_up); + assert_eq!(decision.drain_plan, DrainPlan::Batch(8)); + + // Larger backlog requested next tick: still CatchUp, batch grows to match. + let decision = policy.decide(snap(20, 30), now + Duration::from_millis(10)); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert!(!decision.entered_catch_up, "no second transition signal"); + assert_eq!(decision.drain_plan, DrainPlan::Batch(20)); + } + + #[test] + fn age_threshold_alone_triggers_catch_up() { + // Queue depth is small, but the oldest line has been waiting >=120 ms. + // Either condition is sufficient to enter catch-up. + let mut policy = AdaptiveChunkingPolicy::new(); + let now = Instant::now(); + + let decision = policy.decide(snap(2, 120), now); + assert_eq!(decision.mode, ChunkingMode::CatchUp); + assert!(decision.entered_catch_up); + assert_eq!(decision.drain_plan, DrainPlan::Batch(2)); + } + + #[test] + fn catch_up_exits_after_low_activity_hold() { + // Enter CatchUp via depth burst, then drop pressure below exit + // thresholds. Policy must hold for >=EXIT_HOLD before returning to Smooth. + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + let _ = policy.decide(snap(10, 20), t0); + assert_eq!(policy.mode(), ChunkingMode::CatchUp); + + // Pressure drops to (2 lines, 40 ms) — at the exit thresholds. + // Hold begins; not yet 250ms. + let pre_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(50)); + assert_eq!(pre_hold.mode, ChunkingMode::CatchUp); + + // Still under hold. + let mid_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(200)); + assert_eq!(mid_hold.mode, ChunkingMode::CatchUp); + + // Past EXIT_HOLD (250 ms) → return to Smooth. + let post_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(320)); + assert_eq!(post_hold.mode, ChunkingMode::Smooth); + assert_eq!(post_hold.drain_plan, DrainPlan::Single); + } + + #[test] + fn idle_resets_to_smooth_immediately() { + // An empty queue forces Smooth regardless of prior mode. + let mut policy = AdaptiveChunkingPolicy::new(); + let now = Instant::now(); + + let _ = policy.decide(snap(10, 20), now); + assert_eq!(policy.mode(), ChunkingMode::CatchUp); + + let decision = policy.decide(empty_snap(), now + Duration::from_millis(10)); + assert_eq!(decision.mode, ChunkingMode::Smooth); + assert_eq!(decision.drain_plan, DrainPlan::Single); + } + + #[test] + fn reentry_hold_blocks_immediate_flip_back() { + // After exiting CatchUp via idle, an 8-line burst that arrives within + // the re-entry hold window should not immediately re-enter CatchUp. + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + let _ = policy.decide(snap(10, 20), t0); + let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10)); + + // Within REENTER_CATCH_UP_HOLD (250 ms): hold blocks re-entry. + let held = policy.decide(snap(8, 20), t0 + Duration::from_millis(100)); + assert_eq!(held.mode, ChunkingMode::Smooth); + assert_eq!(held.drain_plan, DrainPlan::Single); + + // Past the hold: re-entry permitted. + let reentered = policy.decide(snap(8, 20), t0 + Duration::from_millis(400)); + assert_eq!(reentered.mode, ChunkingMode::CatchUp); + assert_eq!(reentered.drain_plan, DrainPlan::Batch(8)); + } + + #[test] + fn severe_backlog_bypasses_reentry_hold() { + // Even within the hold window, a "severe" backlog (>=64 lines) bypasses + // the gate so display lag doesn't unbounded-grow. + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + let _ = policy.decide(snap(10, 20), t0); + let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10)); + + let severe = policy.decide(snap(64, 20), t0 + Duration::from_millis(100)); + assert_eq!(severe.mode, ChunkingMode::CatchUp); + assert_eq!(severe.drain_plan, DrainPlan::Batch(64)); + } +} diff --git a/crates/tui/src/tui/streaming/commit_tick.rs b/crates/tui/src/tui/streaming/commit_tick.rs new file mode 100644 index 00000000..65783fb1 --- /dev/null +++ b/crates/tui/src/tui/streaming/commit_tick.rs @@ -0,0 +1,266 @@ +//! Commit-tick scheduler that drains a stream chunker according to policy. +//! +//! Bridges [`AdaptiveChunkingPolicy`] with a concrete [`StreamChunker`] queue. +//! Callers feed raw text deltas via [`StreamChunker::push_delta`] (only +//! newline-terminated text becomes queued lines), then call +//! [`run_commit_tick`] on every commit beat to obtain the text safe to flush +//! to the transcript on this beat. +//! +//! The chunker is the unit of streaming — one per active block (assistant / +//! thinking). Tool output is unbuffered and bypasses this path. + +use std::collections::VecDeque; +use std::time::Duration; +use std::time::Instant; + +use super::chunking::AdaptiveChunkingPolicy; +use super::chunking::ChunkingDecision; +use super::chunking::DrainPlan; +use super::chunking::QueueSnapshot; + +/// Buffers raw stream deltas and emits committed text in line units. +/// +/// Only the substring up to the *last* `\n` is committed; trailing partial +/// content stays in the buffer. This is what protects partial code fences +/// (` ``` `) and other line-sensitive markdown from rendering mid-state. +#[derive(Debug, Default)] +pub struct StreamChunker { + /// Bytes received but not yet split into a complete line. + pending: String, + /// Complete lines waiting to be flushed to the transcript. + /// Each entry preserves its trailing `\n` so reassembly is lossless. + queue: VecDeque, +} + +#[derive(Debug, Clone)] +struct QueuedLine { + text: String, + enqueued_at: Instant, +} + +impl StreamChunker { + pub fn new() -> Self { + Self::default() + } + + /// Append a raw model delta. Returns whether at least one new line was queued. + pub fn push_delta(&mut self, delta: &str) -> bool { + if delta.is_empty() { + return false; + } + self.pending.push_str(delta); + + let Some(last_nl) = self.pending.rfind('\n') else { + return false; + }; + + // Drain everything up to and including the last newline into queued lines. + // Splitting by line keeps the chunker source-agnostic and lets the policy + // count "lines waiting" without peeking at text content. + let now = Instant::now(); + let committed: String = self.pending.drain(..=last_nl).collect(); + let mut produced = false; + for chunk in split_lines_keep_terminator(&committed) { + if chunk.is_empty() { + continue; + } + self.queue.push_back(QueuedLine { + text: chunk, + enqueued_at: now, + }); + produced = true; + } + produced + } + + /// Number of complete lines currently queued for commit. + pub fn queued_lines(&self) -> usize { + self.queue.len() + } + + /// Age of the oldest queued line, if any. + pub fn oldest_queued_age(&self, now: Instant) -> Option { + self.queue + .front() + .map(|q| now.saturating_duration_since(q.enqueued_at)) + } + + /// Whether the queue is empty AND no buffered partial line remains. + pub fn is_idle(&self) -> bool { + self.queue.is_empty() && self.pending.is_empty() + } + + /// Snapshot for policy decisions. + pub fn snapshot(&self, now: Instant) -> QueueSnapshot { + QueueSnapshot { + queued_lines: self.queue.len(), + oldest_age: self.oldest_queued_age(now), + } + } + + /// Drain `max_lines` complete lines and return them as concatenated text. + pub fn drain_lines(&mut self, max_lines: usize) -> String { + let n = max_lines.min(self.queue.len()); + let mut out = String::new(); + for queued in self.queue.drain(..n) { + out.push_str(&queued.text); + } + out + } + + /// Drain any remaining pending bytes (called at stream finalize). + /// This includes both queued complete lines AND the tail partial line. + pub fn drain_remaining(&mut self) -> String { + let mut out = String::new(); + while let Some(q) = self.queue.pop_front() { + out.push_str(&q.text); + } + if !self.pending.is_empty() { + out.push_str(&self.pending); + self.pending.clear(); + } + out + } + + /// Reset internal state. + pub fn reset(&mut self) { + self.pending.clear(); + self.queue.clear(); + } +} + +/// One commit-tick decision plus the text that should be flushed on this tick. +pub struct CommitTickOutput { + pub committed_text: String, + pub decision: ChunkingDecision, + pub is_idle: bool, +} + +/// Run a single commit tick: ask the policy, drain the chunker accordingly. +pub fn run_commit_tick( + policy: &mut AdaptiveChunkingPolicy, + chunker: &mut StreamChunker, + now: Instant, +) -> CommitTickOutput { + let snapshot = chunker.snapshot(now); + let prior_mode = policy.mode(); + let decision = policy.decide(snapshot, now); + + if decision.mode != prior_mode { + tracing::trace!( + prior_mode = ?prior_mode, + new_mode = ?decision.mode, + queued_lines = snapshot.queued_lines, + oldest_queued_age_ms = snapshot.oldest_age.map(|age| age.as_millis() as u64), + entered_catch_up = decision.entered_catch_up, + "stream chunking mode transition" + ); + } + + let max = match decision.drain_plan { + DrainPlan::Single => 1, + DrainPlan::Batch(n) => n, + }; + + // Drain through the chunker; an empty queue under Smooth produces "". + let committed_text = chunker.drain_lines(max); + + CommitTickOutput { + committed_text, + decision, + is_idle: chunker.is_idle(), + } +} + +/// Split text into chunks, preserving each terminator. The final chunk is +/// included only if it ends with `\n` (this is enforced upstream in +/// `push_delta`, which only drains up through the last newline). +fn split_lines_keep_terminator(text: &str) -> Vec { + let mut out = Vec::new(); + let mut start = 0; + let bytes = text.as_bytes(); + for (i, &b) in bytes.iter().enumerate() { + if b == b'\n' { + out.push(text[start..=i].to_string()); + start = i + 1; + } + } + if start < text.len() { + // This branch is unreachable for inputs produced by `push_delta`, + // but stays defensive for direct callers. + out.push(text[start..].to_string()); + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tui::streaming::chunking::ChunkingMode; + + #[test] + fn partial_code_fence_is_held_until_newline() { + // Without the chunker, a stray ``` arriving mid-stream could render as + // an opened fence. The chunker must not commit anything until the + // line is terminated by `\n`. + let mut chunker = StreamChunker::new(); + let mut policy = AdaptiveChunkingPolicy::new(); + let now = Instant::now(); + + // Partial fence + content; no newline → nothing committed yet. + chunker.push_delta("Here is code:\n"); + chunker.push_delta("```"); + let out = run_commit_tick(&mut policy, &mut chunker, now); + assert_eq!(out.committed_text, "Here is code:\n"); + assert!(!chunker.is_idle(), "partial fence still buffered"); + + // Close the fence line. + chunker.push_delta("rust\n"); + let out = run_commit_tick(&mut policy, &mut chunker, now + Duration::from_millis(5)); + assert_eq!(out.committed_text, "```rust\n"); + } + + #[test] + fn smooth_burst_emits_one_line_per_tick() { + let mut chunker = StreamChunker::new(); + let mut policy = AdaptiveChunkingPolicy::new(); + let t0 = Instant::now(); + + chunker.push_delta("a\nb\nc\n"); + // Each tick under Smooth pulls exactly one line. + let out1 = run_commit_tick(&mut policy, &mut chunker, t0); + assert_eq!(out1.decision.mode, ChunkingMode::Smooth); + assert_eq!(out1.committed_text, "a\n"); + let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20)); + assert_eq!(out2.committed_text, "b\n"); + let out3 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(40)); + assert_eq!(out3.committed_text, "c\n"); + assert!(out3.is_idle); + } + + #[test] + fn large_burst_drains_in_catch_up() { + // Eight lines arriving "at once" must trigger CatchUp on the first + // commit tick and drain the full backlog in one go. + let mut chunker = StreamChunker::new(); + let mut policy = AdaptiveChunkingPolicy::new(); + let now = Instant::now(); + + let burst = "1\n2\n3\n4\n5\n6\n7\n8\n"; + chunker.push_delta(burst); + let out = run_commit_tick(&mut policy, &mut chunker, now); + assert_eq!(out.decision.mode, ChunkingMode::CatchUp); + assert_eq!(out.committed_text, burst); + assert!(out.is_idle); + } + + #[test] + fn finalize_drains_partial_tail() { + // The final, possibly-incomplete line must be flushed by drain_remaining. + let mut chunker = StreamChunker::new(); + chunker.push_delta("done\nno-newline-here"); + let drained = chunker.drain_remaining(); + assert_eq!(drained, "done\nno-newline-here"); + assert!(chunker.is_idle()); + } +} diff --git a/crates/tui/src/tui/streaming.rs b/crates/tui/src/tui/streaming/mod.rs similarity index 61% rename from crates/tui/src/tui/streaming.rs rename to crates/tui/src/tui/streaming/mod.rs index 1406e823..cf3a5be3 100644 --- a/crates/tui/src/tui/streaming.rs +++ b/crates/tui/src/tui/streaming/mod.rs @@ -11,9 +11,16 @@ use ratatui::style::{Modifier, Style}; use ratatui::text::{Line, Span}; +use std::time::Instant; use unicode_width::UnicodeWidthStr; use crate::palette; + +pub mod chunking; +pub mod commit_tick; + +pub use chunking::{AdaptiveChunkingPolicy, ChunkingMode}; +pub use commit_tick::{StreamChunker, run_commit_tick}; /// Collects streaming text and commits complete lines. #[derive(Debug, Clone)] pub struct MarkdownStreamCollector { @@ -29,6 +36,14 @@ pub struct MarkdownStreamCollector { is_thinking: bool, } +impl Default for MarkdownStreamCollector { + fn default() -> Self { + // `is_streaming: true` matches `MarkdownStreamCollector::new` so a + // freshly-default block behaves like a freshly-started stream. + Self::new(None, false) + } +} + impl MarkdownStreamCollector { /// Create a new collector pub fn new(width: Option, is_thinking: bool) -> Self { @@ -205,11 +220,20 @@ fn wrap_line(line: &str, width: usize) -> Vec { } } +/// Per-block streaming substate: collector for newline-gating + chunker/policy +/// for two-gear pacing. +#[derive(Debug, Default)] +struct BlockState { + collector: MarkdownStreamCollector, + chunker: StreamChunker, + policy: AdaptiveChunkingPolicy, +} + /// State for managing multiple stream collectors (one per content block) -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default)] pub struct StreamingState { - /// Collectors for each content block by index - collectors: Vec>, + /// Per-block state by index (collector + chunker + policy). + blocks: Vec>, /// Whether any stream is currently active pub is_active: bool, /// Accumulated text for display @@ -227,66 +251,153 @@ impl StreamingState { /// Start a new text block pub fn start_text(&mut self, index: usize, width: Option) { self.ensure_capacity(index); - self.collectors[index] = Some(MarkdownStreamCollector::new(width, false)); + self.blocks[index] = Some(BlockState { + collector: MarkdownStreamCollector::new(width, false), + chunker: StreamChunker::new(), + policy: AdaptiveChunkingPolicy::new(), + }); self.is_active = true; } /// Start a new thinking block pub fn start_thinking(&mut self, index: usize, width: Option) { self.ensure_capacity(index); - self.collectors[index] = Some(MarkdownStreamCollector::new(width, true)); + self.blocks[index] = Some(BlockState { + collector: MarkdownStreamCollector::new(width, true), + chunker: StreamChunker::new(), + policy: AdaptiveChunkingPolicy::new(), + }); self.is_active = true; } - /// Push content to a block + /// Push content to a block. The text is buffered in the collector and + /// any newline-complete portion is forwarded to the chunker, which decides + /// what (if anything) becomes visible on the next [`Self::commit_text`] tick. pub fn push_content(&mut self, index: usize, content: &str) { - if let Some(Some(collector)) = self.collectors.get_mut(index) { - collector.push(content); + if let Some(Some(block)) = self.blocks.get_mut(index) { + block.collector.push(content); // Update accumulated text - if collector.is_thinking { + if block.collector.is_thinking { self.accumulated_thinking.push_str(content); } else { self.accumulated_text.push_str(content); } + + // Forward newline-complete bytes to the chunker. Partial trailing + // content stays in the collector buffer (this is what protects + // partial code fences and other line-sensitive markdown from + // becoming briefly visible). + let committed = block.collector.commit_complete_text(); + if !committed.is_empty() { + block.chunker.push_delta(&committed); + } } } - /// Get newly committed lines from a block + /// Get newly committed lines from a block. (Legacy entry point that maps + /// onto the chunker.) pub fn commit_lines(&mut self, index: usize) -> Vec> { - if let Some(Some(collector)) = self.collectors.get_mut(index) { - collector.commit_complete_lines() - } else { - Vec::new() + let text = self.commit_text(index); + if text.is_empty() { + return Vec::new(); } + // Re-render the text through the same path the collector used. + let style = if self + .blocks + .get(index) + .and_then(|b| b.as_ref()) + .is_some_and(|b| b.collector.is_thinking) + { + Style::default() + .fg(palette::STATUS_WARNING) + .add_modifier(Modifier::DIM | Modifier::ITALIC) + } else { + Style::default() + }; + let mut lines = Vec::new(); + for line in text.lines() { + lines.push(Line::from(Span::styled(line.to_string(), style))); + } + if text.ends_with('\n') { + lines.push(Line::from("")); + } + lines } - /// Get newly committed raw text from a block. + /// Run one commit-tick of the chunker policy and return any text safe to + /// flush to the transcript on this tick. May be empty (Smooth-mode tick + /// against an empty queue) or contain anywhere from one line up to the + /// full backlog (CatchUp-mode burst drain). pub fn commit_text(&mut self, index: usize) -> String { - if let Some(Some(collector)) = self.collectors.get_mut(index) { - collector.commit_complete_text() + if let Some(Some(block)) = self.blocks.get_mut(index) { + let now = Instant::now(); + let out = run_commit_tick(&mut block.policy, &mut block.chunker, now); + out.committed_text } else { String::new() } } - /// Finalize a block and get remaining lines - pub fn finalize_block(&mut self, index: usize) -> Vec> { - if let Some(Some(collector)) = self.collectors.get_mut(index) { - let lines = collector.finalize(); - // Check if all blocks are done - self.check_active(); - lines - } else { - Vec::new() - } + /// Inspect the current chunking mode for a block (testing/observability). + pub fn chunking_mode(&self, index: usize) -> Option { + self.blocks + .get(index) + .and_then(|b| b.as_ref()) + .map(|b| b.policy.mode()) } - /// Finalize a block and get remaining raw text. + /// Whether the chunker has queued content waiting to be flushed by the + /// next commit tick. Useful for callers that want to drive an extra tick + /// while the queue drains under Smooth-mode pacing. + pub fn has_pending_chunker_lines(&self, index: usize) -> bool { + self.blocks + .get(index) + .and_then(|b| b.as_ref()) + .is_some_and(|b| b.chunker.queued_lines() > 0) + } + + /// Finalize a block and get remaining lines + pub fn finalize_block(&mut self, index: usize) -> Vec> { + let text = self.finalize_block_text(index); + if text.is_empty() { + return Vec::new(); + } + let style = if self + .blocks + .get(index) + .and_then(|b| b.as_ref()) + .is_some_and(|b| b.collector.is_thinking) + { + Style::default() + .fg(palette::STATUS_WARNING) + .add_modifier(Modifier::DIM | Modifier::ITALIC) + } else { + Style::default() + }; + let mut lines = Vec::new(); + for line in text.lines() { + lines.push(Line::from(Span::styled(line.to_string(), style))); + } + if text.ends_with('\n') { + lines.push(Line::from("")); + } + lines + } + + /// Finalize a block and get remaining raw text. Drains any chunker + /// backlog plus any unterminated partial line in the collector. pub fn finalize_block_text(&mut self, index: usize) -> String { - if let Some(Some(collector)) = self.collectors.get_mut(index) { - let text = collector.finalize_text(); + if let Some(Some(block)) = self.blocks.get_mut(index) { + // First, push any tail buffered in the collector through (it may + // not be newline-terminated; finalize_text returns it raw). + let tail = block.collector.finalize_text(); + // Any whole-line text held by the chunker is safe to emit now. + let mut out = block.chunker.drain_remaining(); + if !tail.is_empty() { + out.push_str(&tail); + } self.check_active(); - text + out } else { String::new() } @@ -295,12 +406,11 @@ impl StreamingState { /// Finalize all blocks pub fn finalize_all(&mut self) -> Vec<(usize, Vec>)> { let mut result = Vec::new(); - for (i, collector) in self.collectors.iter_mut().enumerate() { - if let Some(c) = collector { - let lines = c.finalize(); - if !lines.is_empty() { - result.push((i, lines)); - } + let len = self.blocks.len(); + for i in 0..len { + let lines = self.finalize_block(i); + if !lines.is_empty() { + result.push((i, lines)); } } self.is_active = false; @@ -309,22 +419,22 @@ impl StreamingState { /// Check if any stream is still active fn check_active(&mut self) { - self.is_active = self.collectors.iter().any(|c| { - c.as_ref() - .is_some_and(MarkdownStreamCollector::is_streaming) + self.is_active = self.blocks.iter().any(|b| { + b.as_ref() + .is_some_and(|state| state.collector.is_streaming()) }); } /// Ensure capacity for the given index fn ensure_capacity(&mut self, index: usize) { - while self.collectors.len() <= index { - self.collectors.push(None); + while self.blocks.len() <= index { + self.blocks.push(None); } } /// Reset the streaming state pub fn reset(&mut self) { - self.collectors.clear(); + self.blocks.clear(); self.is_active = false; self.accumulated_text.clear(); self.accumulated_thinking.clear();