CX#4: two-gear streaming chunker (Smooth ↔ CatchUp)

Splits crates/tui/src/tui/streaming.rs into a streaming/ module:
- streaming/mod.rs — StreamingState with per-block BlockState
- streaming/chunking.rs — policy state machine, 7 tests
- streaming/commit_tick.rs — StreamChunker queue + run_commit_tick

Thresholds match codex parity: ENTER_QUEUE_DEPTH=8, ENTER_OLDEST_AGE=120ms,
EXIT_QUEUE_DEPTH=2, EXIT_OLDEST_AGE=40ms, EXIT_HOLD=250ms.

894/894 tests pass; clippy -D warnings clean; fmt clean.
This commit is contained in:
Hunter Bown
2026-04-25 22:28:00 -05:00
parent d111680a3b
commit 1ad0c886b8
3 changed files with 773 additions and 43 deletions
+354
View File
@@ -0,0 +1,354 @@
//! Adaptive stream chunking policy for two-gear streaming.
//!
//! Ported from `codex-rs/tui/src/streaming/chunking.rs`, adapted for deepseek-tui's
//! text-based streaming pipeline. The policy is queue-pressure driven and
//! source-agnostic.
//!
//! # Mental model
//!
//! Two gears:
//! - [`ChunkingMode::Smooth`]: drain one line per commit tick (steady pacing).
//! - [`ChunkingMode::CatchUp`]: drain the entire queued backlog while pressure exists.
//!
//! # Hysteresis
//!
//! - Enter `CatchUp` when `queued_lines >= ENTER_QUEUE_DEPTH_LINES` OR
//! the oldest queued line is at least [`ENTER_OLDEST_AGE`].
//! - Exit `CatchUp` only after pressure stays below [`EXIT_QUEUE_DEPTH_LINES`]
//! AND [`EXIT_OLDEST_AGE`] for at least [`EXIT_HOLD`].
//! - After exit, suppress immediate re-entry for [`REENTER_CATCH_UP_HOLD`]
//! unless backlog is "severe" (queue >= [`SEVERE_QUEUE_DEPTH_LINES`] or
//! oldest >= [`SEVERE_OLDEST_AGE`]).
use std::time::Duration;
use std::time::Instant;
/// Queue-depth threshold that allows entering catch-up mode.
pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 8;
/// Oldest-line age threshold that allows entering catch-up mode.
pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120);
/// Queue-depth threshold used when evaluating catch-up exit hysteresis.
pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 2;
/// Oldest-line age threshold used when evaluating catch-up exit hysteresis.
pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40);
/// Minimum duration queue pressure must stay below exit thresholds to leave catch-up mode.
pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250);
/// Cooldown window after a catch-up exit that suppresses immediate re-entry.
pub(crate) const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250);
/// Queue-depth cutoff that marks backlog as severe (bypasses re-entry hold).
pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 64;
/// Oldest-line age cutoff that marks backlog as severe.
pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300);
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ChunkingMode {
/// Drain one line per baseline commit tick.
#[default]
Smooth,
/// Drain the queued backlog according to queue pressure.
CatchUp,
}
/// Captures queue pressure inputs used by adaptive chunking decisions.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct QueueSnapshot {
/// Number of queued stream lines waiting to be displayed.
pub queued_lines: usize,
/// Age of the oldest queued line at decision time.
pub oldest_age: Option<Duration>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DrainPlan {
/// Emit exactly one queued line.
Single,
/// Emit up to `usize` queued lines.
Batch(usize),
}
/// Represents one policy decision for a specific queue snapshot.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkingDecision {
/// Mode after applying hysteresis transitions for this decision.
pub mode: ChunkingMode,
/// Whether this decision transitioned from `Smooth` into `CatchUp`.
pub entered_catch_up: bool,
/// Drain plan to execute for the current commit tick.
pub drain_plan: DrainPlan,
}
/// Maintains adaptive chunking mode and hysteresis state across ticks.
#[derive(Debug, Default, Clone)]
pub struct AdaptiveChunkingPolicy {
mode: ChunkingMode,
below_exit_threshold_since: Option<Instant>,
last_catch_up_exit_at: Option<Instant>,
}
impl AdaptiveChunkingPolicy {
pub fn new() -> Self {
Self::default()
}
/// Returns the policy mode used by the most recent decision.
pub fn mode(&self) -> ChunkingMode {
self.mode
}
/// Resets state to baseline smooth mode.
pub fn reset(&mut self) {
self.mode = ChunkingMode::Smooth;
self.below_exit_threshold_since = None;
self.last_catch_up_exit_at = None;
}
/// Computes a drain decision from the current queue snapshot.
pub fn decide(&mut self, snapshot: QueueSnapshot, now: Instant) -> ChunkingDecision {
if snapshot.queued_lines == 0 {
self.note_catch_up_exit(now);
self.mode = ChunkingMode::Smooth;
self.below_exit_threshold_since = None;
return ChunkingDecision {
mode: self.mode,
entered_catch_up: false,
drain_plan: DrainPlan::Single,
};
}
let entered_catch_up = match self.mode {
ChunkingMode::Smooth => self.maybe_enter_catch_up(snapshot, now),
ChunkingMode::CatchUp => {
self.maybe_exit_catch_up(snapshot, now);
false
}
};
let drain_plan = match self.mode {
ChunkingMode::Smooth => DrainPlan::Single,
ChunkingMode::CatchUp => DrainPlan::Batch(snapshot.queued_lines.max(1)),
};
ChunkingDecision {
mode: self.mode,
entered_catch_up,
drain_plan,
}
}
fn maybe_enter_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) -> bool {
if !should_enter_catch_up(snapshot) {
return false;
}
if self.reentry_hold_active(now) && !is_severe_backlog(snapshot) {
return false;
}
self.mode = ChunkingMode::CatchUp;
self.below_exit_threshold_since = None;
self.last_catch_up_exit_at = None;
true
}
fn maybe_exit_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) {
if !should_exit_catch_up(snapshot) {
self.below_exit_threshold_since = None;
return;
}
match self.below_exit_threshold_since {
Some(since) if now.saturating_duration_since(since) >= EXIT_HOLD => {
self.mode = ChunkingMode::Smooth;
self.below_exit_threshold_since = None;
self.last_catch_up_exit_at = Some(now);
}
Some(_) => {}
None => {
self.below_exit_threshold_since = Some(now);
}
}
}
fn note_catch_up_exit(&mut self, now: Instant) {
if self.mode == ChunkingMode::CatchUp {
self.last_catch_up_exit_at = Some(now);
}
}
fn reentry_hold_active(&self, now: Instant) -> bool {
self.last_catch_up_exit_at
.is_some_and(|exit| now.saturating_duration_since(exit) < REENTER_CATCH_UP_HOLD)
}
}
/// Returns whether current queue pressure warrants entering catch-up mode.
fn should_enter_catch_up(snapshot: QueueSnapshot) -> bool {
snapshot.queued_lines >= ENTER_QUEUE_DEPTH_LINES
|| snapshot
.oldest_age
.is_some_and(|oldest| oldest >= ENTER_OLDEST_AGE)
}
/// Returns whether queue pressure is low enough to begin exit hysteresis.
fn should_exit_catch_up(snapshot: QueueSnapshot) -> bool {
snapshot.queued_lines <= EXIT_QUEUE_DEPTH_LINES
&& snapshot
.oldest_age
.is_some_and(|oldest| oldest <= EXIT_OLDEST_AGE)
}
/// Returns whether backlog is severe enough to bypass the re-entry hold.
fn is_severe_backlog(snapshot: QueueSnapshot) -> bool {
snapshot.queued_lines >= SEVERE_QUEUE_DEPTH_LINES
|| snapshot
.oldest_age
.is_some_and(|oldest| oldest >= SEVERE_OLDEST_AGE)
}
#[cfg(test)]
mod tests {
use super::*;
fn snap(queued_lines: usize, oldest_age_ms: u64) -> QueueSnapshot {
QueueSnapshot {
queued_lines,
oldest_age: Some(Duration::from_millis(oldest_age_ms)),
}
}
fn empty_snap() -> QueueSnapshot {
QueueSnapshot {
queued_lines: 0,
oldest_age: None,
}
}
#[test]
fn smooth_only_burst_emits_one_per_tick() {
// Five slowly-arriving lines, each well below enter thresholds, never
// flip the policy out of `Smooth`. Each decision should plan a single drain.
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
for i in 0..5 {
// 1 queued line, age 10 ms — far below ENTER thresholds.
let decision = policy.decide(snap(1, 10), t0 + Duration::from_millis(50 * i));
assert_eq!(decision.mode, ChunkingMode::Smooth);
assert!(!decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Single);
}
}
#[test]
fn eight_line_burst_flips_to_catch_up_and_drains_full_backlog() {
// A burst of 8 queued lines crosses ENTER_QUEUE_DEPTH_LINES exactly.
// The policy should enter `CatchUp` and request a Batch drain matching
// the queue depth.
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let decision = policy.decide(snap(8, 10), now);
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Batch(8));
// Larger backlog requested next tick: still CatchUp, batch grows to match.
let decision = policy.decide(snap(20, 30), now + Duration::from_millis(10));
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(!decision.entered_catch_up, "no second transition signal");
assert_eq!(decision.drain_plan, DrainPlan::Batch(20));
}
#[test]
fn age_threshold_alone_triggers_catch_up() {
// Queue depth is small, but the oldest line has been waiting >=120 ms.
// Either condition is sufficient to enter catch-up.
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let decision = policy.decide(snap(2, 120), now);
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Batch(2));
}
#[test]
fn catch_up_exits_after_low_activity_hold() {
// Enter CatchUp via depth burst, then drop pressure below exit
// thresholds. Policy must hold for >=EXIT_HOLD before returning to Smooth.
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
// Pressure drops to (2 lines, 40 ms) — at the exit thresholds.
// Hold begins; not yet 250ms.
let pre_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(50));
assert_eq!(pre_hold.mode, ChunkingMode::CatchUp);
// Still under hold.
let mid_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(200));
assert_eq!(mid_hold.mode, ChunkingMode::CatchUp);
// Past EXIT_HOLD (250 ms) → return to Smooth.
let post_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(320));
assert_eq!(post_hold.mode, ChunkingMode::Smooth);
assert_eq!(post_hold.drain_plan, DrainPlan::Single);
}
#[test]
fn idle_resets_to_smooth_immediately() {
// An empty queue forces Smooth regardless of prior mode.
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let _ = policy.decide(snap(10, 20), now);
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
let decision = policy.decide(empty_snap(), now + Duration::from_millis(10));
assert_eq!(decision.mode, ChunkingMode::Smooth);
assert_eq!(decision.drain_plan, DrainPlan::Single);
}
#[test]
fn reentry_hold_blocks_immediate_flip_back() {
// After exiting CatchUp via idle, an 8-line burst that arrives within
// the re-entry hold window should not immediately re-enter CatchUp.
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
// Within REENTER_CATCH_UP_HOLD (250 ms): hold blocks re-entry.
let held = policy.decide(snap(8, 20), t0 + Duration::from_millis(100));
assert_eq!(held.mode, ChunkingMode::Smooth);
assert_eq!(held.drain_plan, DrainPlan::Single);
// Past the hold: re-entry permitted.
let reentered = policy.decide(snap(8, 20), t0 + Duration::from_millis(400));
assert_eq!(reentered.mode, ChunkingMode::CatchUp);
assert_eq!(reentered.drain_plan, DrainPlan::Batch(8));
}
#[test]
fn severe_backlog_bypasses_reentry_hold() {
// Even within the hold window, a "severe" backlog (>=64 lines) bypasses
// the gate so display lag doesn't unbounded-grow.
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
let severe = policy.decide(snap(64, 20), t0 + Duration::from_millis(100));
assert_eq!(severe.mode, ChunkingMode::CatchUp);
assert_eq!(severe.drain_plan, DrainPlan::Batch(64));
}
}
+266
View File
@@ -0,0 +1,266 @@
//! Commit-tick scheduler that drains a stream chunker according to policy.
//!
//! Bridges [`AdaptiveChunkingPolicy`] with a concrete [`StreamChunker`] queue.
//! Callers feed raw text deltas via [`StreamChunker::push_delta`] (only
//! newline-terminated text becomes queued lines), then call
//! [`run_commit_tick`] on every commit beat to obtain the text safe to flush
//! to the transcript on this beat.
//!
//! The chunker is the unit of streaming — one per active block (assistant /
//! thinking). Tool output is unbuffered and bypasses this path.
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use super::chunking::AdaptiveChunkingPolicy;
use super::chunking::ChunkingDecision;
use super::chunking::DrainPlan;
use super::chunking::QueueSnapshot;
/// Buffers raw stream deltas and emits committed text in line units.
///
/// Only the substring up to the *last* `\n` is committed; trailing partial
/// content stays in the buffer. This is what protects partial code fences
/// (` ``` `) and other line-sensitive markdown from rendering mid-state.
#[derive(Debug, Default)]
pub struct StreamChunker {
/// Bytes received but not yet split into a complete line.
pending: String,
/// Complete lines waiting to be flushed to the transcript.
/// Each entry preserves its trailing `\n` so reassembly is lossless.
queue: VecDeque<QueuedLine>,
}
#[derive(Debug, Clone)]
struct QueuedLine {
text: String,
enqueued_at: Instant,
}
impl StreamChunker {
pub fn new() -> Self {
Self::default()
}
/// Append a raw model delta. Returns whether at least one new line was queued.
pub fn push_delta(&mut self, delta: &str) -> bool {
if delta.is_empty() {
return false;
}
self.pending.push_str(delta);
let Some(last_nl) = self.pending.rfind('\n') else {
return false;
};
// Drain everything up to and including the last newline into queued lines.
// Splitting by line keeps the chunker source-agnostic and lets the policy
// count "lines waiting" without peeking at text content.
let now = Instant::now();
let committed: String = self.pending.drain(..=last_nl).collect();
let mut produced = false;
for chunk in split_lines_keep_terminator(&committed) {
if chunk.is_empty() {
continue;
}
self.queue.push_back(QueuedLine {
text: chunk,
enqueued_at: now,
});
produced = true;
}
produced
}
/// Number of complete lines currently queued for commit.
pub fn queued_lines(&self) -> usize {
self.queue.len()
}
/// Age of the oldest queued line, if any.
pub fn oldest_queued_age(&self, now: Instant) -> Option<Duration> {
self.queue
.front()
.map(|q| now.saturating_duration_since(q.enqueued_at))
}
/// Whether the queue is empty AND no buffered partial line remains.
pub fn is_idle(&self) -> bool {
self.queue.is_empty() && self.pending.is_empty()
}
/// Snapshot for policy decisions.
pub fn snapshot(&self, now: Instant) -> QueueSnapshot {
QueueSnapshot {
queued_lines: self.queue.len(),
oldest_age: self.oldest_queued_age(now),
}
}
/// Drain `max_lines` complete lines and return them as concatenated text.
pub fn drain_lines(&mut self, max_lines: usize) -> String {
let n = max_lines.min(self.queue.len());
let mut out = String::new();
for queued in self.queue.drain(..n) {
out.push_str(&queued.text);
}
out
}
/// Drain any remaining pending bytes (called at stream finalize).
/// This includes both queued complete lines AND the tail partial line.
pub fn drain_remaining(&mut self) -> String {
let mut out = String::new();
while let Some(q) = self.queue.pop_front() {
out.push_str(&q.text);
}
if !self.pending.is_empty() {
out.push_str(&self.pending);
self.pending.clear();
}
out
}
/// Reset internal state.
pub fn reset(&mut self) {
self.pending.clear();
self.queue.clear();
}
}
/// One commit-tick decision plus the text that should be flushed on this tick.
pub struct CommitTickOutput {
pub committed_text: String,
pub decision: ChunkingDecision,
pub is_idle: bool,
}
/// Run a single commit tick: ask the policy, drain the chunker accordingly.
pub fn run_commit_tick(
policy: &mut AdaptiveChunkingPolicy,
chunker: &mut StreamChunker,
now: Instant,
) -> CommitTickOutput {
let snapshot = chunker.snapshot(now);
let prior_mode = policy.mode();
let decision = policy.decide(snapshot, now);
if decision.mode != prior_mode {
tracing::trace!(
prior_mode = ?prior_mode,
new_mode = ?decision.mode,
queued_lines = snapshot.queued_lines,
oldest_queued_age_ms = snapshot.oldest_age.map(|age| age.as_millis() as u64),
entered_catch_up = decision.entered_catch_up,
"stream chunking mode transition"
);
}
let max = match decision.drain_plan {
DrainPlan::Single => 1,
DrainPlan::Batch(n) => n,
};
// Drain through the chunker; an empty queue under Smooth produces "".
let committed_text = chunker.drain_lines(max);
CommitTickOutput {
committed_text,
decision,
is_idle: chunker.is_idle(),
}
}
/// Split text into chunks, preserving each terminator. The final chunk is
/// included only if it ends with `\n` (this is enforced upstream in
/// `push_delta`, which only drains up through the last newline).
fn split_lines_keep_terminator(text: &str) -> Vec<String> {
let mut out = Vec::new();
let mut start = 0;
let bytes = text.as_bytes();
for (i, &b) in bytes.iter().enumerate() {
if b == b'\n' {
out.push(text[start..=i].to_string());
start = i + 1;
}
}
if start < text.len() {
// This branch is unreachable for inputs produced by `push_delta`,
// but stays defensive for direct callers.
out.push(text[start..].to_string());
}
out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tui::streaming::chunking::ChunkingMode;
#[test]
fn partial_code_fence_is_held_until_newline() {
// Without the chunker, a stray ``` arriving mid-stream could render as
// an opened fence. The chunker must not commit anything until the
// line is terminated by `\n`.
let mut chunker = StreamChunker::new();
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
// Partial fence + content; no newline → nothing committed yet.
chunker.push_delta("Here is code:\n");
chunker.push_delta("```");
let out = run_commit_tick(&mut policy, &mut chunker, now);
assert_eq!(out.committed_text, "Here is code:\n");
assert!(!chunker.is_idle(), "partial fence still buffered");
// Close the fence line.
chunker.push_delta("rust\n");
let out = run_commit_tick(&mut policy, &mut chunker, now + Duration::from_millis(5));
assert_eq!(out.committed_text, "```rust\n");
}
#[test]
fn smooth_burst_emits_one_line_per_tick() {
let mut chunker = StreamChunker::new();
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
chunker.push_delta("a\nb\nc\n");
// Each tick under Smooth pulls exactly one line.
let out1 = run_commit_tick(&mut policy, &mut chunker, t0);
assert_eq!(out1.decision.mode, ChunkingMode::Smooth);
assert_eq!(out1.committed_text, "a\n");
let out2 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(20));
assert_eq!(out2.committed_text, "b\n");
let out3 = run_commit_tick(&mut policy, &mut chunker, t0 + Duration::from_millis(40));
assert_eq!(out3.committed_text, "c\n");
assert!(out3.is_idle);
}
#[test]
fn large_burst_drains_in_catch_up() {
// Eight lines arriving "at once" must trigger CatchUp on the first
// commit tick and drain the full backlog in one go.
let mut chunker = StreamChunker::new();
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let burst = "1\n2\n3\n4\n5\n6\n7\n8\n";
chunker.push_delta(burst);
let out = run_commit_tick(&mut policy, &mut chunker, now);
assert_eq!(out.decision.mode, ChunkingMode::CatchUp);
assert_eq!(out.committed_text, burst);
assert!(out.is_idle);
}
#[test]
fn finalize_drains_partial_tail() {
// The final, possibly-incomplete line must be flushed by drain_remaining.
let mut chunker = StreamChunker::new();
chunker.push_delta("done\nno-newline-here");
let drained = chunker.drain_remaining();
assert_eq!(drained, "done\nno-newline-here");
assert!(chunker.is_idle());
}
}
@@ -11,9 +11,16 @@
use ratatui::style::{Modifier, Style};
use ratatui::text::{Line, Span};
use std::time::Instant;
use unicode_width::UnicodeWidthStr;
use crate::palette;
pub mod chunking;
pub mod commit_tick;
pub use chunking::{AdaptiveChunkingPolicy, ChunkingMode};
pub use commit_tick::{StreamChunker, run_commit_tick};
/// Collects streaming text and commits complete lines.
#[derive(Debug, Clone)]
pub struct MarkdownStreamCollector {
@@ -29,6 +36,14 @@ pub struct MarkdownStreamCollector {
is_thinking: bool,
}
impl Default for MarkdownStreamCollector {
fn default() -> Self {
// `is_streaming: true` matches `MarkdownStreamCollector::new` so a
// freshly-default block behaves like a freshly-started stream.
Self::new(None, false)
}
}
impl MarkdownStreamCollector {
/// Create a new collector
pub fn new(width: Option<usize>, is_thinking: bool) -> Self {
@@ -205,11 +220,20 @@ fn wrap_line(line: &str, width: usize) -> Vec<String> {
}
}
/// Per-block streaming substate: collector for newline-gating + chunker/policy
/// for two-gear pacing.
#[derive(Debug, Default)]
struct BlockState {
collector: MarkdownStreamCollector,
chunker: StreamChunker,
policy: AdaptiveChunkingPolicy,
}
/// State for managing multiple stream collectors (one per content block)
#[derive(Debug, Clone, Default)]
#[derive(Debug, Default)]
pub struct StreamingState {
/// Collectors for each content block by index
collectors: Vec<Option<MarkdownStreamCollector>>,
/// Per-block state by index (collector + chunker + policy).
blocks: Vec<Option<BlockState>>,
/// Whether any stream is currently active
pub is_active: bool,
/// Accumulated text for display
@@ -227,66 +251,153 @@ impl StreamingState {
/// Start a new text block
pub fn start_text(&mut self, index: usize, width: Option<usize>) {
self.ensure_capacity(index);
self.collectors[index] = Some(MarkdownStreamCollector::new(width, false));
self.blocks[index] = Some(BlockState {
collector: MarkdownStreamCollector::new(width, false),
chunker: StreamChunker::new(),
policy: AdaptiveChunkingPolicy::new(),
});
self.is_active = true;
}
/// Start a new thinking block
pub fn start_thinking(&mut self, index: usize, width: Option<usize>) {
self.ensure_capacity(index);
self.collectors[index] = Some(MarkdownStreamCollector::new(width, true));
self.blocks[index] = Some(BlockState {
collector: MarkdownStreamCollector::new(width, true),
chunker: StreamChunker::new(),
policy: AdaptiveChunkingPolicy::new(),
});
self.is_active = true;
}
/// Push content to a block
/// Push content to a block. The text is buffered in the collector and
/// any newline-complete portion is forwarded to the chunker, which decides
/// what (if anything) becomes visible on the next [`Self::commit_text`] tick.
pub fn push_content(&mut self, index: usize, content: &str) {
if let Some(Some(collector)) = self.collectors.get_mut(index) {
collector.push(content);
if let Some(Some(block)) = self.blocks.get_mut(index) {
block.collector.push(content);
// Update accumulated text
if collector.is_thinking {
if block.collector.is_thinking {
self.accumulated_thinking.push_str(content);
} else {
self.accumulated_text.push_str(content);
}
// Forward newline-complete bytes to the chunker. Partial trailing
// content stays in the collector buffer (this is what protects
// partial code fences and other line-sensitive markdown from
// becoming briefly visible).
let committed = block.collector.commit_complete_text();
if !committed.is_empty() {
block.chunker.push_delta(&committed);
}
}
}
/// Get newly committed lines from a block
/// Get newly committed lines from a block. (Legacy entry point that maps
/// onto the chunker.)
pub fn commit_lines(&mut self, index: usize) -> Vec<Line<'static>> {
if let Some(Some(collector)) = self.collectors.get_mut(index) {
collector.commit_complete_lines()
} else {
Vec::new()
let text = self.commit_text(index);
if text.is_empty() {
return Vec::new();
}
// Re-render the text through the same path the collector used.
let style = if self
.blocks
.get(index)
.and_then(|b| b.as_ref())
.is_some_and(|b| b.collector.is_thinking)
{
Style::default()
.fg(palette::STATUS_WARNING)
.add_modifier(Modifier::DIM | Modifier::ITALIC)
} else {
Style::default()
};
let mut lines = Vec::new();
for line in text.lines() {
lines.push(Line::from(Span::styled(line.to_string(), style)));
}
if text.ends_with('\n') {
lines.push(Line::from(""));
}
lines
}
/// Get newly committed raw text from a block.
/// Run one commit-tick of the chunker policy and return any text safe to
/// flush to the transcript on this tick. May be empty (Smooth-mode tick
/// against an empty queue) or contain anywhere from one line up to the
/// full backlog (CatchUp-mode burst drain).
pub fn commit_text(&mut self, index: usize) -> String {
if let Some(Some(collector)) = self.collectors.get_mut(index) {
collector.commit_complete_text()
if let Some(Some(block)) = self.blocks.get_mut(index) {
let now = Instant::now();
let out = run_commit_tick(&mut block.policy, &mut block.chunker, now);
out.committed_text
} else {
String::new()
}
}
/// Finalize a block and get remaining lines
pub fn finalize_block(&mut self, index: usize) -> Vec<Line<'static>> {
if let Some(Some(collector)) = self.collectors.get_mut(index) {
let lines = collector.finalize();
// Check if all blocks are done
self.check_active();
lines
} else {
Vec::new()
}
/// Inspect the current chunking mode for a block (testing/observability).
pub fn chunking_mode(&self, index: usize) -> Option<ChunkingMode> {
self.blocks
.get(index)
.and_then(|b| b.as_ref())
.map(|b| b.policy.mode())
}
/// Finalize a block and get remaining raw text.
/// Whether the chunker has queued content waiting to be flushed by the
/// next commit tick. Useful for callers that want to drive an extra tick
/// while the queue drains under Smooth-mode pacing.
pub fn has_pending_chunker_lines(&self, index: usize) -> bool {
self.blocks
.get(index)
.and_then(|b| b.as_ref())
.is_some_and(|b| b.chunker.queued_lines() > 0)
}
/// Finalize a block and get remaining lines
pub fn finalize_block(&mut self, index: usize) -> Vec<Line<'static>> {
let text = self.finalize_block_text(index);
if text.is_empty() {
return Vec::new();
}
let style = if self
.blocks
.get(index)
.and_then(|b| b.as_ref())
.is_some_and(|b| b.collector.is_thinking)
{
Style::default()
.fg(palette::STATUS_WARNING)
.add_modifier(Modifier::DIM | Modifier::ITALIC)
} else {
Style::default()
};
let mut lines = Vec::new();
for line in text.lines() {
lines.push(Line::from(Span::styled(line.to_string(), style)));
}
if text.ends_with('\n') {
lines.push(Line::from(""));
}
lines
}
/// Finalize a block and get remaining raw text. Drains any chunker
/// backlog plus any unterminated partial line in the collector.
pub fn finalize_block_text(&mut self, index: usize) -> String {
if let Some(Some(collector)) = self.collectors.get_mut(index) {
let text = collector.finalize_text();
if let Some(Some(block)) = self.blocks.get_mut(index) {
// First, push any tail buffered in the collector through (it may
// not be newline-terminated; finalize_text returns it raw).
let tail = block.collector.finalize_text();
// Any whole-line text held by the chunker is safe to emit now.
let mut out = block.chunker.drain_remaining();
if !tail.is_empty() {
out.push_str(&tail);
}
self.check_active();
text
out
} else {
String::new()
}
@@ -295,12 +406,11 @@ impl StreamingState {
/// Finalize all blocks
pub fn finalize_all(&mut self) -> Vec<(usize, Vec<Line<'static>>)> {
let mut result = Vec::new();
for (i, collector) in self.collectors.iter_mut().enumerate() {
if let Some(c) = collector {
let lines = c.finalize();
if !lines.is_empty() {
result.push((i, lines));
}
let len = self.blocks.len();
for i in 0..len {
let lines = self.finalize_block(i);
if !lines.is_empty() {
result.push((i, lines));
}
}
self.is_active = false;
@@ -309,22 +419,22 @@ impl StreamingState {
/// Check if any stream is still active
fn check_active(&mut self) {
self.is_active = self.collectors.iter().any(|c| {
c.as_ref()
.is_some_and(MarkdownStreamCollector::is_streaming)
self.is_active = self.blocks.iter().any(|b| {
b.as_ref()
.is_some_and(|state| state.collector.is_streaming())
});
}
/// Ensure capacity for the given index
fn ensure_capacity(&mut self, index: usize) {
while self.collectors.len() <= index {
self.collectors.push(None);
while self.blocks.len() <= index {
self.blocks.push(None);
}
}
/// Reset the streaming state
pub fn reset(&mut self) {
self.collectors.clear();
self.blocks.clear();
self.is_active = false;
self.accumulated_text.clear();
self.accumulated_thinking.clear();