Add capacity memory controller and smoother TUI streaming

This commit is contained in:
Hunter Bown
2026-02-17 16:09:07 -06:00
parent 87884a1e84
commit 1a04659a95
23 changed files with 2965 additions and 156 deletions
+1 -1
View File
@@ -103,7 +103,7 @@ Config lives at `~/.deepseek/config.toml`:
```toml
api_key = "sk-..."
default_text_model = "deepseek-reasoner" # optional (or "deepseek-chat")
allow_shell = false # optional
allow_shell = true # optional (sandboxed by default)
max_subagents = 3 # optional (1-20)
```
+19 -1
View File
@@ -37,7 +37,7 @@ memory_path = "~/.deepseek/memory.md"
# ─────────────────────────────────────────────────────────────────────────────────
# Security
# ─────────────────────────────────────────────────────────────────────────────────
allow_shell = false
allow_shell = true
approval_policy = "on-request" # on-request | untrusted | never
sandbox_mode = "workspace-write" # read-only | workspace-write | danger-full-access | external-sandbox
max_subagents = 5 # optional (1-20)
@@ -83,6 +83,24 @@ exponential_base = 2.0
# model = "deepseek-chat" # Model to use for summarization
# cache_summary = true # Cache the summary block
# ─────────────────────────────────────────────────────────────────────────────────
# Capacity Controller (runtime pressure guardrails)
# ─────────────────────────────────────────────────────────────────────────────────
[capacity]
enabled = true
low_risk_max = 0.34
medium_risk_max = 0.62
severe_min_slack = -0.25
severe_violation_ratio = 0.40
refresh_cooldown_turns = 2
replan_cooldown_turns = 5
max_replay_per_turn = 1
min_turns_before_guardrail = 2
profile_window = 8
deepseek_v3_2_chat_prior = 3.9
deepseek_v3_2_reasoner_prior = 4.1
fallback_default_prior = 3.8
# ─────────────────────────────────────────────────────────────────────────────────
# Profile Example (for multiple environments)
# ─────────────────────────────────────────────────────────────────────────────────
+1 -1
View File
@@ -154,7 +154,7 @@ Responses API (with automatic fallback if needed).
### Crash Recovery + Offline Queue
1. Before sending user input, the TUI writes a checkpoint snapshot to `~/.deepseek/sessions/checkpoints/latest.json`
2. If the process crashes mid-turn, startup restores that checkpoint automatically (unless explicit `--resume` is used)
2. Startup remains fresh by default; prior sessions are resumed explicitly via `--resume`/`--continue` (or `Ctrl+R` in TUI)
3. While degraded/offline, new prompts are queued in-memory and mirrored to `~/.deepseek/sessions/checkpoints/offline_queue.json`
4. Queue edits (`/queue ...`) are persisted continuously so drafts and queued prompts survive restarts
5. Successful turn completion clears the active checkpoint and writes a durable session snapshot
+30 -1
View File
@@ -56,6 +56,19 @@ These override config values:
- `DEEPSEEK_MAX_SUBAGENTS` (clamped to `1..=20`)
- `DEEPSEEK_TASKS_DIR` (runtime task queue/artifact storage, default `~/.deepseek/tasks`)
- `DEEPSEEK_ALLOW_INSECURE_HTTP` (`1`/`true` allows non-local `http://` base URLs; default is reject)
- `DEEPSEEK_CAPACITY_ENABLED`
- `DEEPSEEK_CAPACITY_LOW_RISK_MAX`
- `DEEPSEEK_CAPACITY_MEDIUM_RISK_MAX`
- `DEEPSEEK_CAPACITY_SEVERE_MIN_SLACK`
- `DEEPSEEK_CAPACITY_SEVERE_VIOLATION_RATIO`
- `DEEPSEEK_CAPACITY_REFRESH_COOLDOWN_TURNS`
- `DEEPSEEK_CAPACITY_REPLAN_COOLDOWN_TURNS`
- `DEEPSEEK_CAPACITY_MAX_REPLAY_PER_TURN`
- `DEEPSEEK_CAPACITY_MIN_TURNS_BEFORE_GUARDRAIL`
- `DEEPSEEK_CAPACITY_PROFILE_WINDOW`
- `DEEPSEEK_CAPACITY_PRIOR_CHAT`
- `DEEPSEEK_CAPACITY_PRIOR_REASONER`
- `DEEPSEEK_CAPACITY_PRIOR_FALLBACK`
## Settings File (Persistent UI Preferences)
@@ -84,7 +97,7 @@ Common settings keys:
- `api_key` (string, required): must be non-empty (or set `DEEPSEEK_API_KEY`).
- `base_url` (string, optional): defaults to `https://api.deepseek.com` (OpenAI-compatible Responses API).
- `default_text_model` (string, optional): defaults to `deepseek-reasoner`. Supported IDs are `deepseek-reasoner` and `deepseek-chat`.
- `allow_shell` (bool, optional): defaults to `false`.
- `allow_shell` (bool, optional): defaults to `true` (sandboxed).
- `approval_policy` (string, optional): `on-request`, `untrusted`, or `never`. Runtime `/set approval_mode` also accepts `on-request` and `untrusted` aliases.
- `sandbox_mode` (string, optional): `read-only`, `workspace-write`, `danger-full-access`, `external-sandbox`.
- `managed_config_path` (string, optional): managed config file loaded after user/env config.
@@ -100,6 +113,20 @@ Common settings keys:
- `[retry].initial_delay` (float seconds, default `1.0`)
- `[retry].max_delay` (float seconds, default `60.0`)
- `[retry].exponential_base` (float, default `2.0`)
- `capacity.*` (optional): runtime context-capacity controller:
- `[capacity].enabled` (bool, default `true`)
- `[capacity].low_risk_max` (float, default `0.34`)
- `[capacity].medium_risk_max` (float, default `0.62`)
- `[capacity].severe_min_slack` (float, default `-0.25`)
- `[capacity].severe_violation_ratio` (float, default `0.40`)
- `[capacity].refresh_cooldown_turns` (int, default `2`)
- `[capacity].replan_cooldown_turns` (int, default `5`)
- `[capacity].max_replay_per_turn` (int, default `1`)
- `[capacity].min_turns_before_guardrail` (int, default `2`)
- `[capacity].profile_window` (int, default `8`)
- `[capacity].deepseek_v3_2_chat_prior` (float, default `3.9`)
- `[capacity].deepseek_v3_2_reasoner_prior` (float, default `4.1`)
- `[capacity].fallback_default_prior` (float, default `3.8`)
- `tui.alternate_screen` (string, optional): `auto`, `always`, or `never`. `auto` disables the alternate screen in Zellij; `--no-alt-screen` forces inline mode.
- `hooks` (optional): lifecycle hooks configuration (see `config.example.toml`).
- `features.*` (optional): feature flag overrides (see below).
@@ -154,6 +181,8 @@ allowed_sandbox_modes = ["read-only", "workspace-write"]
If configured values violate requirements, startup fails with a descriptive error.
See `docs/capacity_controller.md` for formulas, intervention behavior, and telemetry.
## Notes On `deepseek doctor`
`deepseek doctor` now follows the same config resolution rules as the rest of the CLI.
+3 -3
View File
@@ -51,11 +51,11 @@ Actions:
Expected behavior:
- Checkpoint stored at `~/.deepseek/sessions/checkpoints/latest.json`
- Startup auto-restores checkpoint when no explicit `--resume` target is supplied
- Startup begins a fresh session unless `--resume`/`--continue` is supplied
Actions:
1. Start TUI normally and verify "Recovered checkpoint session" status
2. If automatic recovery fails, inspect checkpoint JSON for schema mismatch
1. Resume prior work explicitly via `deepseek --resume <id>` or `Ctrl+R` in TUI
2. If checkpoint inspection is needed, inspect `latest.json` for schema mismatch/details
3. If schema is newer than binary supports, upgrade binary or remove stale checkpoint
## Incident: Persistent State Schema Errors
+136
View File
@@ -0,0 +1,136 @@
# Capacity Controller
`deepseek-tui` includes a capacity-aware context controller that keeps the active prompt context within a coherent operating range while preserving the full history on disk.
## Policy Overview
Each checkpoint computes:
- `H_hat` (runtime pressure proxy)
- `C_hat` (model capacity prior)
- `slack = C_hat - H_hat`
- a dynamic slack profile over the last `N` observations (`N = profile_window`, default `8`)
### Runtime Pressure Proxy (`H_hat`)
- `action_complexity_bits = log2(1 + action_count_this_turn)`
- `tool_complexity_bits = log2(1 + tool_calls_recent_window)`
- `ref_complexity_bits = log2(1 + unique_reference_ids_recent_window)`
- `context_pressure_bits = 6.0 * context_used_ratio`
Formula:
`H_hat = 0.35*action_complexity_bits + 0.30*tool_complexity_bits + 0.20*ref_complexity_bits + 0.15*context_pressure_bits`
### Capacity Prior (`C_hat`)
Per-model priors:
- `deepseek_v3_2_chat = 3.9`
- `deepseek_v3_2_reasoner = 4.1`
- fallback `3.8`
### Failure Probability
Using rolling profile fields:
- `final_slack`
- `min_slack`
- `violation_ratio`
- `slack_volatility`
- `slack_drop`
Formula:
`z = -1.65*final_slack - 0.85*min_slack + 1.35*violation_ratio + 0.70*slack_volatility + 0.28*slack_drop - 0.12`
`p_fail = sigmoid(z)` clamped to `[0,1]`.
Risk bands:
- low: `p_fail <= low_risk_max`
- medium: `low_risk_max < p_fail <= medium_risk_max`
- high: otherwise
Action mapping:
- low -> `NoIntervention`
- medium -> `TargetedContextRefresh`
- high + severe dynamics (`min_slack <= severe_min_slack` or `violation_ratio >= severe_violation_ratio`) -> `VerifyAndReplan`
- otherwise high -> `VerifyWithToolReplay`
## Checkpoints
The engine evaluates controller policy at:
1. Pre-request checkpoint (before `MessageRequest` assembly).
2. Post-tool checkpoint (after tool result append).
3. Error-escalation checkpoint (tool error streak path).
## Interventions
### `TargetedContextRefresh`
- Runs compaction (`compact_messages_safe`) when possible.
- Falls back to local trim if compaction path fails.
- Persists canonical state.
- Replaces long-tail active context with compact canonical prompt + memory pointer.
### `VerifyWithToolReplay`
- Replays one read-only critical tool call from recent turn context.
- Appends verification note with pass/fail + diff summary.
- On replay conflict/error, marks escalation candidate and disables replay for current turn.
### `VerifyAndReplan`
- Persists canonical snapshot.
- Clears volatile prompt tail while preserving latest user ask and latest verification note.
- Injects canonical replan instruction into system prompt.
- Continues turn loop from compact canonical state.
## Safety Controls
- Max one intervention per turn.
- Cooldowns for refresh and replan.
- Replay budget per turn (`max_replay_per_turn`).
- Fail-open behavior when controller inputs are unavailable.
- Compaction/replay failures are logged; turn continues.
## Memory Store
Path:
- `DEEPSEEK_CAPACITY_MEMORY_DIR` (if set)
- otherwise `~/.deepseek/memory/<session_id>.jsonl`
- fallback: `<workspace>/.deepseek/memory/<session_id>.jsonl` when home path is unavailable/unwritable
Record fields:
- `id`, `ts`, `turn_index`, `action_trigger`
- `h_hat`, `c_hat`, `slack`, `risk_band`
- `canonical_state`
- `source_message_ids`
- optional `replay_info`
Loader utility supports fetching last `K` snapshots for rehydration.
## Configuration
`[capacity]` keys:
- `enabled`
- `low_risk_max`
- `medium_risk_max`
- `severe_min_slack`
- `severe_violation_ratio`
- `refresh_cooldown_turns`
- `replan_cooldown_turns`
- `max_replay_per_turn`
- `min_turns_before_guardrail`
- `profile_window`
- `deepseek_v3_2_chat_prior`
- `deepseek_v3_2_reasoner_prior`
- `fallback_default_prior`
Equivalent environment overrides are available with `DEEPSEEK_CAPACITY_*`.
+20 -4
View File
@@ -37,7 +37,10 @@ pub fn clear(app: &mut App) -> CommandResult {
app.history.clear();
app.mark_history_updated();
app.api_messages.clear();
app.system_prompt = None;
app.transcript_selection.clear();
app.queued_messages.clear();
app.queued_draft = None;
app.total_conversation_tokens = 0;
let todos_cleared = app.clear_todos();
app.tool_log.clear();
@@ -49,11 +52,21 @@ pub fn clear(app: &mut App) -> CommandResult {
app.last_exec_wait_command = None;
app.last_prompt_tokens = None;
app.last_completion_tokens = None;
if todos_cleared {
CommandResult::message("Conversation cleared")
app.current_session_id = None;
let message = if todos_cleared {
"Conversation cleared".to_string()
} else {
CommandResult::message("Conversation cleared (plan state busy; run /clear again if needed)")
}
"Conversation cleared (plan state busy; run /clear again if needed)".to_string()
};
CommandResult::with_message_and_action(
message,
AppAction::SyncSession {
messages: Vec::new(),
system_prompt: None,
model: app.model.clone(),
workspace: app.workspace.clone(),
},
)
}
/// Exit the application
@@ -265,6 +278,7 @@ mod tests {
});
app.total_conversation_tokens = 100;
app.tool_log.push("test".to_string());
app.current_session_id = Some("existing-session".to_string());
let result = clear(&mut app);
assert!(result.message.is_some());
@@ -274,6 +288,8 @@ mod tests {
assert!(app.tool_log.is_empty());
assert!(app.tool_cells.is_empty());
assert!(app.tool_details_by_cell.is_empty());
assert!(app.current_session_id.is_none());
assert!(matches!(result.action, Some(AppAction::SyncSession { .. })));
}
#[test]
+3 -8
View File
@@ -3,7 +3,7 @@
//! Debug commands: tokens, cost, system, context, undo, retry
use super::CommandResult;
use crate::compaction::estimate_tokens;
use crate::compaction::estimate_input_tokens_conservative;
use crate::models::{DEFAULT_CONTEXT_WINDOW_TOKENS, SystemPrompt, context_window_for_model};
use crate::tui::app::{App, AppAction};
use crate::tui::history::HistoryCell;
@@ -78,16 +78,15 @@ pub fn system_prompt(app: &mut App) -> CommandResult {
/// Show context window usage
pub fn context(app: &mut App) -> CommandResult {
let mut total_chars = estimate_message_chars(&app.api_messages);
let mut estimated_tokens = estimate_tokens(&app.api_messages);
let estimated_tokens =
estimate_input_tokens_conservative(&app.api_messages, app.system_prompt.as_ref());
// System prompt
if let Some(SystemPrompt::Text(text)) = &app.system_prompt {
total_chars += text.len();
estimated_tokens = estimated_tokens.saturating_add(estimate_text_tokens(text));
} else if let Some(SystemPrompt::Blocks(blocks)) = &app.system_prompt {
for block in blocks {
total_chars += block.text.len();
estimated_tokens = estimated_tokens.saturating_add(estimate_text_tokens(&block.text));
}
}
@@ -114,10 +113,6 @@ pub fn context(app: &mut App) -> CommandResult {
))
}
fn estimate_text_tokens(text: &str) -> usize {
text.chars().count().div_ceil(4)
}
#[cfg(test)]
mod tests {
use super::*;
+65 -2
View File
@@ -44,6 +44,11 @@ const KEEP_RECENT_MESSAGES: usize = 4;
const RECENT_WORKING_SET_WINDOW: usize = 12;
const MAX_WORKING_SET_PATHS: usize = 24;
const MIN_SUMMARIZE_MESSAGES: usize = 6;
// Per-block character cap applied (via `truncate_chars`) to each text block
// before it is appended to the summarization transcript in `create_summary`.
const SUMMARY_TEXT_SNIPPET_CHARS: usize = 800;
// Per-block character cap applied to each tool-result block in the same
// transcript; tool output is abbreviated more aggressively than text.
const SUMMARY_TOOL_RESULT_SNIPPET_CHARS: usize = 240;
// If the assembled transcript exceeds this many characters, `create_summary`
// replaces it with head + omission marker + tail.
const SUMMARY_INPUT_MAX_CHARS: usize = 24_000;
// Number of leading characters kept when the transcript is over the cap.
const SUMMARY_INPUT_HEAD_CHARS: usize = 14_000;
// Number of trailing characters kept when the transcript is over the cap.
const SUMMARY_INPUT_TAIL_CHARS: usize = 6_000;
#[derive(Debug, Clone, Default)]
struct CompactionPlan {
@@ -466,6 +471,35 @@ pub fn estimate_tokens(messages: &[Message]) -> usize {
messages.iter().map(estimate_tokens_for_message).sum()
}
/// Estimates tokens for free text at a pessimistic one token per three
/// characters (Unicode scalar values, not bytes), rounding up.
fn estimate_text_tokens_conservative(text: &str) -> usize {
    let char_count = text.chars().count();
    char_count.div_ceil(3)
}
/// Conservative token estimate for an optional system prompt.
///
/// A missing prompt costs nothing; a block-structured prompt is the sum of
/// the per-block text estimates.
fn estimate_system_tokens_conservative(system: Option<&SystemPrompt>) -> usize {
    let Some(prompt) = system else {
        return 0;
    };
    match prompt {
        SystemPrompt::Text(body) => estimate_text_tokens_conservative(body),
        SystemPrompt::Blocks(parts) => parts
            .iter()
            .map(|part| estimate_text_tokens_conservative(&part.text))
            .sum(),
    }
}
/// Conservative estimate for full request input tokens (messages + system + framing).
///
/// The result is the sum of three saturating terms:
/// - the baseline `estimate_tokens` figure for `messages`, scaled by 3/2 and rounded up;
/// - the conservative system-prompt estimate;
/// - a framing allowance of 12 tokens per message plus a flat 48.
#[must_use]
pub fn estimate_input_tokens_conservative(
    messages: &[Message],
    system: Option<&SystemPrompt>,
) -> usize {
    let padded_message_tokens = estimate_tokens(messages).saturating_mul(3).div_ceil(2);
    let framing_tokens = messages.len().saturating_mul(12).saturating_add(48);

    padded_message_tokens
        .saturating_add(estimate_system_tokens_conservative(system))
        .saturating_add(framing_tokens)
}
pub fn should_compact(
messages: &[Message],
config: &CompactionConfig,
@@ -527,6 +561,22 @@ fn truncate_chars(text: &str, max_chars: usize) -> &str {
}
}
/// Returns an owned copy of the last `max_chars` characters of `text`.
///
/// Characters are Unicode scalar values, so the cut always lands on a valid
/// UTF-8 boundary. The whole string is returned when it has `max_chars`
/// characters or fewer; `max_chars == 0` yields an empty string.
fn tail_chars(text: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // Scan from the end: the byte offset of the `max_chars`-th character
    // counted from the back is exactly where the tail starts. If the string
    // is shorter than that, there is nothing to drop.
    match text.char_indices().rev().nth(max_chars - 1) {
        Some((tail_start, _)) => text[tail_start..].to_owned(),
        None => text.to_owned(),
    }
}
/// Result of a compaction operation with metadata.
#[derive(Debug)]
pub struct CompactionResult {
@@ -707,13 +757,14 @@ async fn create_summary(
for block in &msg.content {
match block {
ContentBlock::Text { text, .. } => {
let _ = write!(conversation_text, "{role}: {text}\n\n");
let snippet = truncate_chars(text, SUMMARY_TEXT_SNIPPET_CHARS);
let _ = write!(conversation_text, "{role}: {snippet}\n\n");
}
ContentBlock::ToolUse { name, .. } => {
let _ = write!(conversation_text, "{role}: [Used tool: {name}]\n\n");
}
ContentBlock::ToolResult { content, .. } => {
let snippet = truncate_chars(content, 500);
let snippet = truncate_chars(content, SUMMARY_TOOL_RESULT_SNIPPET_CHARS);
let _ = write!(conversation_text, "Tool result: {}\n\n", snippet);
}
ContentBlock::Thinking { .. } => {
@@ -723,6 +774,17 @@ async fn create_summary(
}
}
let conversation_chars = conversation_text.chars().count();
if conversation_chars > SUMMARY_INPUT_MAX_CHARS {
let head = truncate_chars(&conversation_text, SUMMARY_INPUT_HEAD_CHARS).to_string();
let tail = tail_chars(&conversation_text, SUMMARY_INPUT_TAIL_CHARS);
let omitted = conversation_chars
.saturating_sub(head.chars().count())
.saturating_sub(tail.chars().count());
conversation_text =
format!("{head}\n\n[... {omitted} characters omitted before summary ...]\n\n{tail}");
}
let request = MessageRequest {
model: model.to_string(),
messages: vec![Message {
@@ -731,6 +793,7 @@ async fn create_summary(
text: format!(
"Summarize the following conversation in a concise but comprehensive way. \
Preserve key information, decisions made, and any important context. \
Tool outputs may be abbreviated. \
Keep it under 500 words.\n\n---\n\n{conversation_text}"
),
cache_control: None,
+150 -1
View File
@@ -57,6 +57,24 @@ pub struct RetryPolicy {
pub exponential_base: f64,
}
/// Capacity-controller config loaded from config files/environment.
///
/// Mirrors the `[capacity]` table of the config file; each field can also be
/// set through a `DEEPSEEK_CAPACITY_*` environment variable in
/// `apply_env_overrides`. `None` means "not configured"; the controller's
/// built-in defaults are substituted when the effective config is built.
#[derive(Debug, Clone, Deserialize)]
pub struct CapacityConfig {
/// Master switch for the controller.
pub enabled: Option<bool>,
/// Upper `p_fail` bound of the low-risk band; validated to lie in `[0.0, 1.0]`
/// and to be `<= medium_risk_max` when both are set.
pub low_risk_max: Option<f64>,
/// Upper `p_fail` bound of the medium-risk band; validated to lie in `[0.0, 1.0]`.
pub medium_risk_max: Option<f64>,
/// Minimum-slack threshold used for the "severe" classification.
pub severe_min_slack: Option<f64>,
/// Violation-ratio threshold used for the "severe" classification;
/// validated to lie in `[0.0, 1.0]`.
pub severe_violation_ratio: Option<f64>,
/// Cooldown, in turns, between context-refresh interventions.
pub refresh_cooldown_turns: Option<u64>,
/// Cooldown, in turns, between replan interventions.
pub replan_cooldown_turns: Option<u64>,
/// Tool-replay budget per turn.
pub max_replay_per_turn: Option<usize>,
/// Turn index below which the controller never intervenes.
pub min_turns_before_guardrail: Option<u64>,
/// Size of the rolling observation window.
pub profile_window: Option<usize>,
/// Capacity prior for the `deepseek_v3_2_chat` model key.
pub deepseek_v3_2_chat_prior: Option<f64>,
/// Capacity prior for the `deepseek_v3_2_reasoner` model key.
pub deepseek_v3_2_reasoner_prior: Option<f64>,
/// Capacity prior used for any model without a dedicated entry.
pub fallback_default_prior: Option<f64>,
}
impl RetryPolicy {
/// Compute the backoff delay for a retry attempt.
#[must_use]
@@ -89,6 +107,7 @@ pub struct Config {
pub requirements_path: Option<String>,
pub max_subagents: Option<usize>,
pub retry: Option<RetryConfig>,
pub capacity: Option<CapacityConfig>,
pub features: Option<FeaturesToml>,
/// TUI configuration (alternate screen, etc.)
@@ -203,6 +222,36 @@ impl Config {
);
}
}
if let Some(capacity) = &self.capacity {
if let Some(v) = capacity.low_risk_max
&& !(0.0..=1.0).contains(&v)
{
anyhow::bail!(
"Invalid capacity.low_risk_max '{v}': expected a value in [0.0, 1.0]."
);
}
if let Some(v) = capacity.medium_risk_max
&& !(0.0..=1.0).contains(&v)
{
anyhow::bail!(
"Invalid capacity.medium_risk_max '{v}': expected a value in [0.0, 1.0]."
);
}
if let (Some(low), Some(medium)) = (capacity.low_risk_max, capacity.medium_risk_max)
&& low > medium
{
anyhow::bail!(
"Invalid capacity thresholds: low_risk_max ({low}) must be <= medium_risk_max ({medium})."
);
}
if let Some(v) = capacity.severe_violation_ratio
&& !(0.0..=1.0).contains(&v)
{
anyhow::bail!(
"Invalid capacity.severe_violation_ratio '{v}': expected a value in [0.0, 1.0]."
);
}
}
Ok(())
}
@@ -285,7 +334,7 @@ impl Config {
/// Return whether shell execution is allowed.
#[must_use]
pub fn allow_shell(&self) -> bool {
self.allow_shell.unwrap_or(false)
self.allow_shell.unwrap_or(true)
}
/// Return the maximum number of concurrent sub-agents.
@@ -479,6 +528,105 @@ fn apply_env_overrides(config: &mut Config) {
{
config.max_subagents = Some(parsed.clamp(1, MAX_SUBAGENTS));
}
let capacity = config.capacity.get_or_insert(CapacityConfig {
enabled: None,
low_risk_max: None,
medium_risk_max: None,
severe_min_slack: None,
severe_violation_ratio: None,
refresh_cooldown_turns: None,
replan_cooldown_turns: None,
max_replay_per_turn: None,
min_turns_before_guardrail: None,
profile_window: None,
deepseek_v3_2_chat_prior: None,
deepseek_v3_2_reasoner_prior: None,
fallback_default_prior: None,
});
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_ENABLED") {
let val = value.trim().to_ascii_lowercase();
capacity.enabled = Some(matches!(val.as_str(), "1" | "true" | "yes" | "on"));
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_LOW_RISK_MAX")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.low_risk_max = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MEDIUM_RISK_MAX")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.medium_risk_max = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_SEVERE_MIN_SLACK")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.severe_min_slack = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_SEVERE_VIOLATION_RATIO")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.severe_violation_ratio = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_REFRESH_COOLDOWN_TURNS")
&& let Ok(parsed) = value.parse::<u64>()
{
capacity.refresh_cooldown_turns = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_REPLAN_COOLDOWN_TURNS")
&& let Ok(parsed) = value.parse::<u64>()
{
capacity.replan_cooldown_turns = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MAX_REPLAY_PER_TURN")
&& let Ok(parsed) = value.parse::<usize>()
{
capacity.max_replay_per_turn = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MIN_TURNS_BEFORE_GUARDRAIL")
&& let Ok(parsed) = value.parse::<u64>()
{
capacity.min_turns_before_guardrail = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PROFILE_WINDOW")
&& let Ok(parsed) = value.parse::<usize>()
{
capacity.profile_window = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_CHAT")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.deepseek_v3_2_chat_prior = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_REASONER")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.deepseek_v3_2_reasoner_prior = Some(parsed);
}
if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_FALLBACK")
&& let Ok(parsed) = value.parse::<f64>()
{
capacity.fallback_default_prior = Some(parsed);
}
if config.capacity.as_ref().is_some_and(|c| {
c.enabled.is_none()
&& c.low_risk_max.is_none()
&& c.medium_risk_max.is_none()
&& c.severe_min_slack.is_none()
&& c.severe_violation_ratio.is_none()
&& c.refresh_cooldown_turns.is_none()
&& c.replan_cooldown_turns.is_none()
&& c.max_replay_per_turn.is_none()
&& c.min_turns_before_guardrail.is_none()
&& c.profile_window.is_none()
&& c.deepseek_v3_2_chat_prior.is_none()
&& c.deepseek_v3_2_reasoner_prior.is_none()
&& c.fallback_default_prior.is_none()
}) {
config.capacity = None;
}
}
fn normalize_model_config(config: &mut Config) {
@@ -549,6 +697,7 @@ fn merge_config(base: Config, override_cfg: Config) -> Config {
requirements_path: override_cfg.requirements_path.or(base.requirements_path),
max_subagents: override_cfg.max_subagents.or(base.max_subagents),
retry: override_cfg.retry.or(base.retry),
capacity: override_cfg.capacity.or(base.capacity),
tui: override_cfg.tui.or(base.tui),
hooks: override_cfg.hooks.or(base.hooks),
features: merge_features(base.features, override_cfg.features),
+610
View File
@@ -0,0 +1,610 @@
//! Capacity-aware guardrail controller for context pressure management.
use std::collections::{HashMap, VecDeque};
/// Fully-resolved settings for the capacity controller.
///
/// Unlike the optional, file-facing config, every field here carries a
/// concrete value; [`Default`] supplies the built-in baseline.
#[derive(Debug, Clone, PartialEq)]
pub struct CapacityControllerConfig {
    /// When `false`, no observations are recorded and no action is proposed.
    pub enabled: bool,
    /// `p_fail` at or below this value is classified as low risk.
    pub low_risk_max: f64,
    /// `p_fail` at or below this value (and above `low_risk_max`) is medium risk.
    pub medium_risk_max: f64,
    /// A profile `min_slack` at or below this marks the snapshot as severe.
    pub severe_min_slack: f64,
    /// A profile `violation_ratio` at or above this marks the snapshot as severe.
    pub severe_violation_ratio: f64,
    /// Turns that must pass after a context refresh before another is allowed.
    pub refresh_cooldown_turns: u64,
    /// Turns that must pass after a replan before another is allowed.
    pub replan_cooldown_turns: u64,
    /// Maximum tool replays permitted within one turn.
    pub max_replay_per_turn: usize,
    /// Turn index below which the controller never intervenes.
    pub min_turns_before_guardrail: u64,
    /// Capacity passed to the rolling observation windows.
    pub profile_window: usize,
    /// Capacity prior per normalized model key.
    pub model_priors: HashMap<String, f64>,
    /// Capacity prior for models with no entry in `model_priors`.
    pub fallback_default: f64,
}

impl Default for CapacityControllerConfig {
    /// Built-in baseline: controller enabled, with priors for the two known
    /// model keys.
    fn default() -> Self {
        Self {
            enabled: true,
            low_risk_max: 0.34,
            medium_risk_max: 0.62,
            severe_min_slack: -0.25,
            severe_violation_ratio: 0.40,
            refresh_cooldown_turns: 2,
            replan_cooldown_turns: 5,
            max_replay_per_turn: 1,
            min_turns_before_guardrail: 2,
            profile_window: 8,
            model_priors: HashMap::from([
                (String::from("deepseek_v3_2_chat"), 3.9),
                (String::from("deepseek_v3_2_reasoner"), 4.1),
            ]),
            fallback_default: 3.8,
        }
    }
}
impl CapacityControllerConfig {
    /// Build effective capacity config from app config.
    ///
    /// Starts from [`Default`] and overlays every value present in
    /// `config.capacity`; when that section is absent the defaults are
    /// returned unchanged. A configured `profile_window` below 2 is raised
    /// to 2.
    #[must_use]
    pub fn from_app_config(config: &crate::config::Config) -> Self {
        let defaults = Self::default();
        let Some(overrides) = config.capacity.as_ref() else {
            return defaults;
        };

        // Per-model priors replace the matching default entry.
        let mut model_priors = defaults.model_priors;
        if let Some(prior) = overrides.deepseek_v3_2_chat_prior {
            model_priors.insert("deepseek_v3_2_chat".to_string(), prior);
        }
        if let Some(prior) = overrides.deepseek_v3_2_reasoner_prior {
            model_priors.insert("deepseek_v3_2_reasoner".to_string(), prior);
        }

        Self {
            enabled: overrides.enabled.unwrap_or(defaults.enabled),
            low_risk_max: overrides.low_risk_max.unwrap_or(defaults.low_risk_max),
            medium_risk_max: overrides
                .medium_risk_max
                .unwrap_or(defaults.medium_risk_max),
            severe_min_slack: overrides
                .severe_min_slack
                .unwrap_or(defaults.severe_min_slack),
            severe_violation_ratio: overrides
                .severe_violation_ratio
                .unwrap_or(defaults.severe_violation_ratio),
            refresh_cooldown_turns: overrides
                .refresh_cooldown_turns
                .unwrap_or(defaults.refresh_cooldown_turns),
            replan_cooldown_turns: overrides
                .replan_cooldown_turns
                .unwrap_or(defaults.replan_cooldown_turns),
            max_replay_per_turn: overrides
                .max_replay_per_turn
                .unwrap_or(defaults.max_replay_per_turn),
            min_turns_before_guardrail: overrides
                .min_turns_before_guardrail
                .unwrap_or(defaults.min_turns_before_guardrail),
            profile_window: overrides
                .profile_window
                .map_or(defaults.profile_window, |window| window.max(2)),
            model_priors,
            fallback_default: overrides
                .fallback_default_prior
                .unwrap_or(defaults.fallback_default),
        }
    }
}
/// Intervention the controller can select for a checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GuardrailAction {
    /// Leave the turn untouched.
    NoIntervention,
    /// Refresh the active context.
    TargetedContextRefresh,
    /// Verify state by replaying a tool call.
    VerifyWithToolReplay,
    /// Verify state and replan.
    VerifyAndReplan,
}

impl GuardrailAction {
    /// Stable snake_case identifier for this action.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::NoIntervention => "no_intervention",
            Self::TargetedContextRefresh => "targeted_context_refresh",
            Self::VerifyWithToolReplay => "verify_with_tool_replay",
            Self::VerifyAndReplan => "verify_and_replan",
        }
    }
}
/// Coarse bucket for the estimated failure probability.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RiskBand {
    /// Failure probability at or below the low-risk threshold.
    Low,
    /// Failure probability above the low threshold, at or below the medium one.
    Medium,
    /// Failure probability above the medium threshold.
    High,
}

impl RiskBand {
    /// Stable lowercase identifier for this band.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Low => "low",
            Self::Medium => "medium",
            Self::High => "high",
        }
    }
}
/// Input used to observe current turn pressure.
///
/// One value is supplied per checkpoint; the controller turns it into a
/// `CapacitySnapshot`.
#[derive(Debug, Clone)]
pub struct CapacityObservationInput {
/// Turn the observation belongs to; copied verbatim into the snapshot.
pub turn_index: u64,
/// Model identifier; normalized and used to look up the capacity prior.
pub model: String,
/// Contributes `log2(1 + n)` to the pressure proxy with weight 0.35.
pub action_count_this_turn: usize,
/// Contributes `log2(1 + n)` to the pressure proxy with weight 0.30.
pub tool_calls_recent_window: usize,
/// Contributes `log2(1 + n)` to the pressure proxy with weight 0.20.
pub unique_reference_ids_recent_window: usize,
/// Clamped to `[0.0, 2.0]`, scaled by 6.0, and weighted 0.15 in the
/// pressure proxy. Presumably the fraction of the context window in use
/// — confirm against the caller.
pub context_used_ratio: f64,
}
/// Rolling slack profile.
///
/// Produced by `compute_profile` from the controller's slack window. The
/// exact definition of each statistic lives in that helper (not shown
/// here); the notes below only describe how the controller consumes them.
#[derive(Debug, Clone, Copy, Default)]
pub struct DynamicSlackProfile {
/// Enters the failure logit with coefficient -1.65. Presumably the most
/// recent slack in the window — confirm in `compute_profile`.
pub final_slack: f64,
/// Enters the failure logit with coefficient -0.85; also compared against
/// `severe_min_slack` for the severe flag.
pub min_slack: f64,
/// Enters the failure logit with coefficient +1.35; also compared against
/// `severe_violation_ratio` for the severe flag.
pub violation_ratio: f64,
/// Enters the failure logit with coefficient +0.70.
pub slack_volatility: f64,
/// Enters the failure logit with coefficient +0.28.
pub slack_drop: f64,
}
/// Per-checkpoint capacity snapshot.
///
/// Built by the controller's `observe` step and cached as its latest snapshot.
#[derive(Debug, Clone)]
pub struct CapacitySnapshot {
/// Turn index copied from the observation input.
pub turn_index: u64,
/// Runtime pressure proxy: weighted sum of the action, tool, reference and
/// context-pressure terms.
pub h_hat: f64,
/// Capacity prior for the observed model (fallback prior if unknown).
pub c_hat: f64,
/// `c_hat - h_hat`.
pub slack: f64,
/// Rolling profile over the slack window, including this observation.
pub profile: DynamicSlackProfile,
/// `sigmoid(z)` of the profile-based logit, clamped to `[0.0, 1.0]`.
pub p_fail: f64,
/// Band obtained by comparing `p_fail` with the low/medium thresholds.
pub risk_band: RiskBand,
/// `true` when `profile.min_slack <= severe_min_slack` or
/// `profile.violation_ratio >= severe_violation_ratio`.
pub severe: bool,
}
/// Full controller decision including reason and block flags.
#[derive(Debug, Clone)]
pub struct CapacityDecision {
/// Action the caller should carry out.
pub action: GuardrailAction,
/// Machine-readable tag explaining the outcome, e.g.
/// `"refresh_cooldown_active"` or `"policy_selected_action"`.
pub reason: String,
/// `true` when the policy proposed an intervention but a gate (one
/// intervention per turn, cooldown, or replay limits) suppressed it.
pub cooldown_blocked: bool,
}
/// Mutable bookkeeping behind the controller's cooldown and per-turn gates.
#[derive(Debug, Clone, Default)]
struct GuardrailRuntimeState {
// Turn of the most recent applied `TargetedContextRefresh`.
last_refresh_turn: Option<u64>,
// Turn of the most recent applied `VerifyAndReplan`.
last_replan_turn: Option<u64>,
// Replays applied so far; reset to 0 by `mark_turn_start` on a new turn.
replay_count_this_turn: usize,
// Turn on which a replay failed; replay is refused for that turn.
replay_disabled_turn: Option<u64>,
// Turn on which any intervention was applied; limits to one per turn.
intervention_applied_turn: Option<u64>,
}
/// Capacity controller.
///
/// Holds the effective config, the rolling observation windows, and the
/// gate state used when deciding on interventions.
#[derive(Debug, Clone)]
pub struct CapacityController {
// Effective settings; fixed at construction.
config: CapacityControllerConfig,
// Slack value of each recent observation; source of the rolling profile.
slack_window: VecDeque<f64>,
// `tool_calls_recent_window` of each recent observation.
recent_tool_counts: VecDeque<usize>,
// `unique_reference_ids_recent_window` of each recent observation.
recent_ref_counts: VecDeque<usize>,
// Cooldown / per-turn gate bookkeeping.
state: GuardrailRuntimeState,
// Most recent snapshot produced by an observation, if any.
last_snapshot: Option<CapacitySnapshot>,
}
impl CapacityController {
#[must_use]
pub fn new(config: CapacityControllerConfig) -> Self {
Self {
config,
slack_window: VecDeque::new(),
recent_tool_counts: VecDeque::new(),
recent_ref_counts: VecDeque::new(),
state: GuardrailRuntimeState::default(),
last_snapshot: None,
}
}
#[must_use]
pub fn config(&self) -> &CapacityControllerConfig {
&self.config
}
pub fn observe_pre_turn(
&mut self,
input: CapacityObservationInput,
) -> Option<CapacitySnapshot> {
self.observe(input)
}
pub fn observe_post_tool(
&mut self,
input: CapacityObservationInput,
) -> Option<CapacitySnapshot> {
self.observe(input)
}
/// Decide intervention from the latest snapshot, with cooldown and safety gates.
#[must_use]
pub fn decide(
&mut self,
turn_index: u64,
snapshot: Option<&CapacitySnapshot>,
) -> CapacityDecision {
if !self.config.enabled {
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "capacity_controller_disabled".to_string(),
cooldown_blocked: false,
};
}
let Some(snapshot) = snapshot else {
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "missing_capacity_data_fail_open".to_string(),
cooldown_blocked: false,
};
};
if turn_index < self.config.min_turns_before_guardrail {
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "min_turns_before_guardrail_not_reached".to_string(),
cooldown_blocked: false,
};
}
let proposed = decide_policy(&self.config, snapshot);
if proposed == GuardrailAction::NoIntervention {
return CapacityDecision {
action: proposed,
reason: "low_risk_no_intervention".to_string(),
cooldown_blocked: false,
};
}
if self
.state
.intervention_applied_turn
.is_some_and(|t| t == turn_index)
{
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "intervention_already_applied_this_turn".to_string(),
cooldown_blocked: true,
};
}
match proposed {
GuardrailAction::TargetedContextRefresh => {
if self
.state
.last_refresh_turn
.is_some_and(|last| turn_index <= last + self.config.refresh_cooldown_turns)
{
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "refresh_cooldown_active".to_string(),
cooldown_blocked: true,
};
}
}
GuardrailAction::VerifyWithToolReplay => {
if self
.state
.replay_disabled_turn
.is_some_and(|t| t == turn_index)
{
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "replay_disabled_for_turn".to_string(),
cooldown_blocked: true,
};
}
if self.state.replay_count_this_turn >= self.config.max_replay_per_turn {
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "max_replay_per_turn_reached".to_string(),
cooldown_blocked: true,
};
}
}
GuardrailAction::VerifyAndReplan => {
if self
.state
.last_replan_turn
.is_some_and(|last| turn_index <= last + self.config.replan_cooldown_turns)
{
return CapacityDecision {
action: GuardrailAction::NoIntervention,
reason: "replan_cooldown_active".to_string(),
cooldown_blocked: true,
};
}
}
GuardrailAction::NoIntervention => {}
}
CapacityDecision {
action: proposed,
reason: "policy_selected_action".to_string(),
cooldown_blocked: false,
}
}
pub fn mark_turn_start(&mut self, turn_index: u64) {
let new_turn = match self.last_snapshot.as_ref() {
None => true,
Some(snapshot) => snapshot.turn_index != turn_index,
};
if new_turn {
self.state.replay_count_this_turn = 0;
self.state.replay_disabled_turn = None;
self.state.intervention_applied_turn = None;
}
}
pub fn mark_intervention_applied(&mut self, turn_index: u64, action: GuardrailAction) {
self.state.intervention_applied_turn = Some(turn_index);
match action {
GuardrailAction::TargetedContextRefresh => {
self.state.last_refresh_turn = Some(turn_index);
}
GuardrailAction::VerifyWithToolReplay => {
self.state.replay_count_this_turn =
self.state.replay_count_this_turn.saturating_add(1);
}
GuardrailAction::VerifyAndReplan => {
self.state.last_replan_turn = Some(turn_index);
}
GuardrailAction::NoIntervention => {}
}
}
pub fn mark_replay_failed(&mut self, turn_index: u64) {
self.state.replay_disabled_turn = Some(turn_index);
}
#[must_use]
pub fn last_snapshot(&self) -> Option<&CapacitySnapshot> {
self.last_snapshot.as_ref()
}
/// Fold one observation into the rolling windows and derive a fresh snapshot.
///
/// Returns `None` without touching any state when the controller is
/// disabled. Otherwise the snapshot is stored as the latest one and a copy
/// is returned.
fn observe(&mut self, input: CapacityObservationInput) -> Option<CapacitySnapshot> {
    if !self.config.enabled {
        return None;
    }

    // Load estimate: weighted blend of log-scaled activity counts and
    // context pressure (context ratio is clamped to [0, 2] first).
    let action_bits = log2_1p(input.action_count_this_turn);
    let tool_bits = log2_1p(input.tool_calls_recent_window);
    let ref_bits = log2_1p(input.unique_reference_ids_recent_window);
    let pressure_bits = 6.0 * input.context_used_ratio.clamp(0.0, 2.0);
    let h_hat = (0.35 * action_bits)
        + (0.30 * tool_bits)
        + (0.20 * ref_bits)
        + (0.15 * pressure_bits);

    // Slack is the model prior minus the load estimate.
    let c_hat = self.model_prior(&input.model);
    let slack = c_hat - h_hat;

    let window_len = self.config.profile_window;
    push_window(&mut self.slack_window, slack, window_len);
    push_window(
        &mut self.recent_tool_counts,
        input.tool_calls_recent_window,
        window_len,
    );
    push_window(
        &mut self.recent_ref_counts,
        input.unique_reference_ids_recent_window,
        window_len,
    );

    // Logistic score over the slack profile of the current window.
    let profile = compute_profile(&self.slack_window);
    let logit = (-1.65 * profile.final_slack)
        + (-0.85 * profile.min_slack)
        + (1.35 * profile.violation_ratio)
        + (0.70 * profile.slack_volatility)
        + (0.28 * profile.slack_drop)
        - 0.12;
    let p_fail = sigmoid(logit).clamp(0.0, 1.0);

    let risk_band = match p_fail {
        p if p <= self.config.low_risk_max => RiskBand::Low,
        p if p <= self.config.medium_risk_max => RiskBand::Medium,
        _ => RiskBand::High,
    };
    let severe = profile.min_slack <= self.config.severe_min_slack
        || profile.violation_ratio >= self.config.severe_violation_ratio;

    self.last_snapshot = Some(CapacitySnapshot {
        turn_index: input.turn_index,
        h_hat,
        c_hat,
        slack,
        profile,
        p_fail,
        risk_band,
        severe,
    });
    self.last_snapshot.clone()
}
/// Look up the capacity prior for `model`.
///
/// The model name is first mapped to a prior key; when the configured
/// priors have no entry for that key, `fallback_default` is used.
fn model_prior(&self, model: &str) -> f64 {
    let key = normalize_model_prior_key(model);
    match self.config.model_priors.get(key) {
        Some(prior) => *prior,
        None => self.config.fallback_default,
    }
}
}
/// Pure mapping from a snapshot to the guardrail action it calls for.
///
/// Low risk needs nothing, medium risk asks for a targeted context refresh,
/// and high risk asks for a replan when the snapshot is severe or a tool
/// replay otherwise. The configuration argument is currently unused.
#[must_use]
pub fn decide_policy(
    _config: &CapacityControllerConfig,
    snapshot: &CapacitySnapshot,
) -> GuardrailAction {
    match (&snapshot.risk_band, snapshot.severe) {
        (RiskBand::Low, _) => GuardrailAction::NoIntervention,
        (RiskBand::Medium, _) => GuardrailAction::TargetedContextRefresh,
        (RiskBand::High, true) => GuardrailAction::VerifyAndReplan,
        (RiskBand::High, false) => GuardrailAction::VerifyWithToolReplay,
    }
}
/// Map a model name to the key used for its capacity prior.
///
/// Matching is ASCII case-insensitive and substring based. Reasoner markers
/// (`reasoner`, `r1`) are checked before chat markers (`chat`, `v3`);
/// anything else maps to the fallback key.
fn normalize_model_prior_key(model: &str) -> &str {
    let folded = model.to_ascii_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| folded.contains(*needle));

    if mentions_any(&["reasoner", "r1"]) {
        return "deepseek_v3_2_reasoner";
    }
    if mentions_any(&["chat", "v3"]) {
        return "deepseek_v3_2_chat";
    }
    "fallback_default"
}
/// Base-2 logarithm of `1 + v`, so a zero count maps to zero bits.
fn log2_1p(v: usize) -> f64 {
    let shifted = 1.0 + (v as f64);
    f64::log2(shifted)
}
/// Append `value` to `window`, then evict from the front until at most
/// `max_len` entries remain (a `max_len` of zero leaves the window empty).
fn push_window<T>(window: &mut VecDeque<T>, value: T, max_len: usize) {
    window.push_back(value);
    let excess = window.len().saturating_sub(max_len);
    drop(window.drain(..excess));
}
/// Summarize the slack samples in `window` (oldest first).
///
/// An empty window yields the default profile. Otherwise the profile holds:
/// the newest sample, the smallest sample, the share of samples that are
/// `<= 0`, the population standard deviation of consecutive differences,
/// and the size of the most recent drop (zero when slack did not fall or
/// fewer than two samples exist).
fn compute_profile(window: &VecDeque<f64>) -> DynamicSlackProfile {
    let Some(&final_slack) = window.back() else {
        return DynamicSlackProfile::default();
    };

    let mut min_slack = f64::INFINITY;
    let mut violation_count = 0usize;
    for &sample in window {
        min_slack = f64::min(min_slack, sample);
        if sample <= 0.0 {
            violation_count += 1;
        }
    }
    let violation_ratio = (violation_count as f64) / (window.len() as f64);

    // Most recent drop: previous sample minus newest sample, floored at zero.
    let mut newest_first = window.iter().rev();
    let slack_drop = match (newest_first.next(), newest_first.next()) {
        (Some(&latest), Some(&previous)) => (previous - latest).max(0.0),
        _ => 0.0,
    };

    // Step-to-step changes, in chronological order.
    let steps: Vec<f64> = window
        .iter()
        .zip(window.iter().skip(1))
        .map(|(earlier, later)| later - earlier)
        .collect();
    let slack_volatility = if steps.is_empty() {
        0.0
    } else {
        let step_count = steps.len() as f64;
        let mean = steps.iter().sum::<f64>() / step_count;
        let variance = steps
            .iter()
            .map(|step| {
                let centered = *step - mean;
                centered * centered
            })
            .sum::<f64>()
            / step_count;
        variance.sqrt()
    };

    DynamicSlackProfile {
        final_slack,
        min_slack,
        violation_ratio,
        slack_volatility,
        slack_drop,
    }
}
/// Logistic function `1 / (1 + e^-z)`.
///
/// Only `e^-|z|` is ever exponentiated, so large magnitudes of either sign
/// cannot overflow.
fn sigmoid(z: f64) -> f64 {
    let decay = (-z.abs()).exp();
    let denominator = 1.0 + decay;
    if z >= 0.0 {
        1.0 / denominator
    } else {
        decay / denominator
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot fixture; `severe` selects a profile with negative minimum
    /// slack and a high violation ratio.
    fn make_snapshot(p_fail: f64, severe: bool, risk_band: RiskBand) -> CapacitySnapshot {
        let (min_slack, violation_ratio) = if severe { (-0.5, 0.6) } else { (0.2, 0.1) };
        let profile = DynamicSlackProfile {
            final_slack: 2.8,
            min_slack,
            violation_ratio,
            slack_volatility: 0.2,
            slack_drop: 0.1,
        };
        CapacitySnapshot {
            turn_index: 3,
            h_hat: 1.0,
            c_hat: 3.8,
            slack: 2.8,
            profile,
            p_fail,
            risk_band,
            severe,
        }
    }

    /// Policy chosen for a fixture under the default configuration.
    fn policy_for(p_fail: f64, severe: bool, risk_band: RiskBand) -> GuardrailAction {
        let cfg = CapacityControllerConfig::default();
        decide_policy(&cfg, &make_snapshot(p_fail, severe, risk_band))
    }

    #[test]
    fn low_risk_maps_to_no_intervention() {
        assert_eq!(
            policy_for(0.2, false, RiskBand::Low),
            GuardrailAction::NoIntervention
        );
    }

    #[test]
    fn medium_risk_maps_to_refresh() {
        assert_eq!(
            policy_for(0.5, false, RiskBand::Medium),
            GuardrailAction::TargetedContextRefresh
        );
    }

    #[test]
    fn high_non_severe_maps_to_replay() {
        assert_eq!(
            policy_for(0.8, false, RiskBand::High),
            GuardrailAction::VerifyWithToolReplay
        );
    }

    #[test]
    fn high_severe_maps_to_replan() {
        assert_eq!(
            policy_for(0.9, true, RiskBand::High),
            GuardrailAction::VerifyAndReplan
        );
    }

    #[test]
    fn cooldown_blocks_repeated_action() {
        let applied_turn = 5;
        let mut controller = CapacityController::new(CapacityControllerConfig::default());
        controller.mark_turn_start(applied_turn);
        controller.mark_intervention_applied(applied_turn, GuardrailAction::TargetedContextRefresh);

        // One turn after a refresh, a medium-risk snapshot must be held back.
        let medium = make_snapshot(0.5, false, RiskBand::Medium);
        let decision = controller.decide(applied_turn + 1, Some(&medium));

        assert_eq!(decision.action, GuardrailAction::NoIntervention);
        assert!(decision.cooldown_blocked);
    }
}
+337
View File
@@ -0,0 +1,337 @@
//! Persistent memory snapshots for capacity controller interventions.
use std::fs::{self, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use serde::{Deserialize, Serialize};
/// Canonical compact state persisted by interventions.
///
/// Stored as the `canonical_state` field of each `CapacityMemoryRecord`.
/// `Default` yields an empty goal and empty lists.
// NOTE(review): the per-field meanings below are inferred from the field
// names; confirm against the code that builds this state.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CanonicalState {
/// Goal text for the session (presumably the user's current objective).
pub goal: String,
/// Constraints to keep honoring (presumably carried over from the conversation).
pub constraints: Vec<String>,
/// Facts treated as confirmed (presumably verified earlier in the session).
pub confirmed_facts: Vec<String>,
/// Unresolved items (presumably questions or threads still open).
pub open_loops: Vec<String>,
/// Actions still to be taken (presumably queued next steps).
pub pending_actions: Vec<String>,
/// References considered critical (presumably ids or paths to keep available).
pub critical_refs: Vec<String>,
}
/// Replay verification metadata.
///
/// Optionally attached to a `CapacityMemoryRecord` via its `replay_info` field.
// NOTE(review): the per-field meanings below are inferred from the field
// names; confirm against the replay code that fills them in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayInfo {
/// Identifier of the replayed tool call.
pub tool_id: String,
/// Name of the replayed tool.
pub tool_name: String,
/// Whether the replay passed (presumably: the replayed output matched).
pub pass: bool,
/// Summary of differences found by the replay (presumably human-readable).
pub diff_summary: String,
}
/// JSONL record written for each intervention.
///
/// One record is serialized per line of a session's memory log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityMemoryRecord {
/// Record identifier; `new_record_id` produces values of the form `cap_xxxxxxxx`.
pub id: String,
/// Timestamp string; `now_rfc3339` produces the RFC 3339 form used by the tests.
pub ts: String,
/// Turn index the record belongs to.
pub turn_index: u64,
/// Name of the action that triggered the record (tests use e.g. `targeted_context_refresh`, `verify_and_replan`).
pub action_trigger: String,
/// NOTE(review): `h_hat`, `c_hat` and `slack` presumably mirror the controller snapshot fields of the same names; confirm at the call site.
pub h_hat: f64,
pub c_hat: f64,
pub slack: f64,
/// Risk band label as a string (tests use `medium` and `high`).
pub risk_band: String,
/// Compact state captured with the intervention.
pub canonical_state: CanonicalState,
/// Identifiers of source messages (presumably those the state was derived from; confirm).
pub source_message_ids: Vec<String>,
/// Replay metadata; the key is omitted from the JSON entirely when this is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub replay_info: Option<ReplayInfo>,
}
/// Resolve the preferred capacity-memory directory.
///
/// This is the first candidate from `capacity_memory_dirs`: the
/// `DEEPSEEK_CAPACITY_MEMORY_DIR` override when it is set and non-blank,
/// otherwise `~/.deepseek/memory` when a home directory is known, otherwise
/// `.deepseek/memory` under the current working directory.
#[must_use]
pub fn default_capacity_memory_dir() -> PathBuf {
    match capacity_memory_dirs().into_iter().next() {
        Some(preferred) => preferred,
        None => PathBuf::from(".").join(".deepseek").join("memory"),
    }
}
/// Candidate directories for capacity memory logs, in priority order.
///
/// A non-blank `DEEPSEEK_CAPACITY_MEMORY_DIR` (tilde-expanded) is the only
/// candidate when present. Otherwise the list is `~/.deepseek/memory` (when
/// a home directory is known) followed by `.deepseek/memory` under the
/// current working directory, with adjacent duplicates removed.
fn capacity_memory_dirs() -> Vec<PathBuf> {
    // Unset or non-Unicode values collapse to "" and fall through.
    let override_raw = std::env::var("DEEPSEEK_CAPACITY_MEMORY_DIR").unwrap_or_default();
    let override_dir = override_raw.trim();
    if !override_dir.is_empty() {
        return vec![PathBuf::from(shellexpand::tilde(override_dir).as_ref())];
    }

    let mut candidates: Vec<PathBuf> = dirs::home_dir()
        .map(|home| home.join(".deepseek").join("memory"))
        .into_iter()
        .collect();

    let cwd_candidate = std::env::current_dir()
        .unwrap_or_else(|_| PathBuf::from("."))
        .join(".deepseek")
        .join("memory");
    candidates.push(cwd_candidate);

    candidates.dedup();
    candidates
}
/// Path of the `<session_id>.jsonl` log inside the preferred memory directory.
#[must_use]
pub fn session_memory_path(session_id: &str) -> PathBuf {
    let file_name = format!("{session_id}.jsonl");
    default_capacity_memory_dir().join(file_name)
}
/// Append `record` to the session's memory log.
///
/// Candidate locations are tried in priority order; the path that accepted
/// the write is returned.
///
/// # Errors
/// Returns the last write error when every candidate location fails.
pub fn append_capacity_record(session_id: &str, record: &CapacityMemoryRecord) -> Result<PathBuf> {
    append_capacity_record_to_candidates(&candidate_session_memory_paths(session_id), record)
}
/// Append `record` as one JSON line to the log at `path`.
///
/// Missing parent directories are created. The record is serialized before
/// the file is opened, so a serialization failure does not leave behind a
/// newly created empty log. The line and its trailing newline are handed to
/// the append-mode file in a single `write_all`, so a record is not split
/// across separate writes.
///
/// # Errors
/// Fails when the parent directory cannot be created, the record cannot be
/// serialized, or the file cannot be opened or written.
pub fn append_capacity_record_to_path(path: &Path, record: &CapacityMemoryRecord) -> Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("Failed to create memory directory {}", parent.display()))?;
    }

    let mut line =
        serde_json::to_string(record).context("Failed to serialize capacity memory record")?;
    line.push('\n');

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .with_context(|| format!("Failed to open memory log {}", path.display()))?;
    // `writeln!` on an unbuffered file may issue the payload and the newline
    // as separate writes; one buffer keeps the JSONL line intact.
    file.write_all(line.as_bytes())
        .with_context(|| format!("Failed to write memory record {}", path.display()))?;
    Ok(())
}
/// Load up to the last `k` records stored for `session_id`.
///
/// All candidate log locations are considered; see
/// `load_last_k_capacity_records_from_candidates` for how one is chosen.
///
/// # Errors
/// Propagates a read error when no candidate yielded any records and at
/// least one of them failed to load.
pub fn load_last_k_capacity_records(
    session_id: &str,
    k: usize,
) -> Result<Vec<CapacityMemoryRecord>> {
    load_last_k_capacity_records_from_candidates(&candidate_session_memory_paths(session_id), k)
}
/// Load up to the last `k` parseable records from the JSONL log at `path`.
///
/// Returns an empty list when `k` is zero or the file does not exist. Blank
/// lines and lines that fail to parse are skipped (best-effort read).
/// Records are returned oldest first. Only the most recent `k` records are
/// held in memory while scanning, regardless of the log's length.
///
/// # Errors
/// Fails when the file cannot be opened or a line cannot be read.
pub fn load_last_k_capacity_records_from_path(
    path: &Path,
    k: usize,
) -> Result<Vec<CapacityMemoryRecord>> {
    if k == 0 || !path.exists() {
        return Ok(Vec::new());
    }
    let file = OpenOptions::new()
        .read(true)
        .open(path)
        .with_context(|| format!("Failed to open memory log {}", path.display()))?;

    // Sliding tail of at most `k` records; not pre-sized because `k` may be
    // far larger than the log.
    let mut tail: std::collections::VecDeque<CapacityMemoryRecord> =
        std::collections::VecDeque::new();
    for line in BufReader::new(file).lines() {
        let line = line.with_context(|| format!("Failed reading {}", path.display()))?;
        if line.trim().is_empty() {
            continue;
        }
        // Malformed lines are deliberately ignored rather than failing the load.
        let Ok(record) = serde_json::from_str::<CapacityMemoryRecord>(&line) else {
            continue;
        };
        if tail.len() == k {
            tail.pop_front();
        }
        tail.push_back(record);
    }
    Ok(Vec::from(tail))
}
/// `<session_id>.jsonl` inside every candidate memory directory, in the
/// same priority order as the directories.
fn candidate_session_memory_paths(session_id: &str) -> Vec<PathBuf> {
    let file_name = format!("{session_id}.jsonl");
    let mut paths = capacity_memory_dirs();
    for path in &mut paths {
        path.push(&file_name);
    }
    paths
}
fn append_capacity_record_to_candidates(
paths: &[PathBuf],
record: &CapacityMemoryRecord,
) -> Result<PathBuf> {
let mut last_err: Option<anyhow::Error> = None;
for path in paths {
match append_capacity_record_to_path(path, record) {
Ok(()) => return Ok(path.clone()),
Err(err) => last_err = Some(err),
}
}
Err(last_err.unwrap_or_else(|| anyhow!("No capacity memory path candidates available")))
}
fn load_last_k_capacity_records_from_candidates(
paths: &[PathBuf],
k: usize,
) -> Result<Vec<CapacityMemoryRecord>> {
if k == 0 {
return Ok(Vec::new());
}
let mut newest: Option<(SystemTime, Vec<CapacityMemoryRecord>)> = None;
let mut last_err: Option<anyhow::Error> = None;
for path in paths {
if !path.exists() {
continue;
}
match load_last_k_capacity_records_from_path(path, k) {
Ok(records) => {
if records.is_empty() {
continue;
}
let modified = fs::metadata(path)
.and_then(|meta| meta.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
let should_replace = newest
.as_ref()
.map(|(current, _)| modified >= *current)
.unwrap_or(true);
if should_replace {
newest = Some((modified, records));
}
}
Err(err) => last_err = Some(err),
}
}
if let Some((_, records)) = newest {
return Ok(records);
}
if let Some(err) = last_err {
return Err(err);
}
Ok(Vec::new())
}
/// Fresh record id: `cap_` followed by the first eight characters of a
/// random (v4) UUID.
#[must_use]
pub fn new_record_id() -> String {
    let uuid_text = uuid::Uuid::new_v4().to_string();
    let short = &uuid_text[..8];
    format!("cap_{short}")
}
/// Current UTC time formatted as an RFC 3339 string.
#[must_use]
pub fn now_rfc3339() -> String {
    let now = Utc::now();
    now.to_rfc3339()
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Baseline medium-risk refresh record; tests override individual fields
    /// with struct-update syntax.
    fn base_record(id: &str, goal: &str) -> CapacityMemoryRecord {
        CapacityMemoryRecord {
            id: id.to_string(),
            ts: now_rfc3339(),
            turn_index: 1,
            action_trigger: "targeted_context_refresh".to_string(),
            h_hat: 1.0,
            c_hat: 3.8,
            slack: 2.8,
            risk_band: "medium".to_string(),
            canonical_state: CanonicalState {
                goal: goal.to_string(),
                ..CanonicalState::default()
            },
            source_message_ids: vec!["m1".to_string()],
            replay_info: None,
        }
    }

    #[test]
    fn memory_jsonl_round_trip() {
        let tmp = tempdir().expect("tempdir");
        let path = tmp.path().join("session.jsonl");
        let record = CapacityMemoryRecord {
            turn_index: 2,
            h_hat: 1.2,
            slack: 2.6,
            ..base_record("cap_1", "Ship feature")
        };

        append_capacity_record_to_path(&path, &record).expect("append");
        let records = load_last_k_capacity_records_from_path(&path, 1).expect("load");

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].canonical_state.goal, "Ship feature");
    }

    #[test]
    fn append_falls_back_to_next_candidate_path() {
        let tmp = tempdir().expect("tempdir");
        // A regular file where a directory is expected makes the first
        // candidate unwritable.
        let blocked_root = tmp.path().join("blocked");
        fs::write(&blocked_root, "file").expect("create blocking file");
        let blocked_path = blocked_root.join("session.jsonl");
        let fallback_path = tmp.path().join("fallback").join("session.jsonl");
        let record = base_record("cap_fallback", "");

        let chosen = append_capacity_record_to_candidates(
            &[blocked_path.clone(), fallback_path.clone()],
            &record,
        )
        .expect("append with fallback");

        assert_eq!(chosen, fallback_path);
        assert!(chosen.exists());
    }

    #[test]
    fn load_prefers_newest_candidate_records() {
        let tmp = tempdir().expect("tempdir");
        let older = tmp.path().join("older.jsonl");
        let newer = tmp.path().join("newer.jsonl");
        let old_record = base_record("cap_old", "old");
        let new_record = CapacityMemoryRecord {
            turn_index: 2,
            action_trigger: "verify_and_replan".to_string(),
            h_hat: 1.4,
            slack: 2.4,
            risk_band: "high".to_string(),
            source_message_ids: vec!["m2".to_string()],
            ..base_record("cap_new", "new")
        };

        append_capacity_record_to_path(&older, &old_record).expect("write older");
        // Give the second file a later modification time.
        std::thread::sleep(std::time::Duration::from_millis(10));
        append_capacity_record_to_path(&newer, &new_record).expect("write newer");

        let records = load_last_k_capacity_records_from_candidates(&[older, newer], 1)
            .expect("load newest records");

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].canonical_state.goal, "new");
    }
}
+1030 -39
View File
File diff suppressed because it is too large Load Diff
+219
View File
@@ -1,8 +1,17 @@
use super::*;
use serde_json::json;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use tempfile::tempdir;
fn build_engine_with_capacity(capacity: CapacityControllerConfig) -> Engine {
let mut engine_config = EngineConfig::default();
engine_config.capacity = capacity;
let (engine, _handle) = Engine::new(engine_config, &Config::default());
engine
}
fn make_plan(
read_only: bool,
@@ -23,6 +32,19 @@ fn make_plan(
}
}
/// Cancelling through the handle must hit the token installed by the most
/// recent reset, not a token captured before it.
#[test]
fn engine_handle_cancel_tracks_latest_turn_token() {
    let (mut engine, handle) = Engine::new(EngineConfig::default(), &Config::default());
    let token_before_reset = engine.cancel_token.clone();

    engine.reset_cancel_token();
    handle.cancel();

    assert!(
        !token_before_reset.is_cancelled(),
    );
    assert!(handle.is_cancelled());
    assert!(engine.cancel_token.is_cancelled());
}
#[test]
fn parallel_batch_requires_read_only_parallel_tools() {
let plans = vec![make_plan(true, true, false, false)];
@@ -92,3 +114,200 @@ fn context_budget_reserves_output_and_headroom() {
let expected = 128_000usize - 4_096usize - 1_024usize;
assert_eq!(budget, expected);
}
/// With the medium band covering every positive failure probability, the
/// pre-request checkpoint must apply a refresh that shrinks the prompt.
#[tokio::test]
async fn pre_request_refresh_invoked_when_medium_risk() {
    let mut capacity = CapacityControllerConfig::default();
    capacity.enabled = true;
    capacity.low_risk_max = 0.0;
    capacity.medium_risk_max = 1.0;
    capacity.min_turns_before_guardrail = 0;

    let mut engine = build_engine_with_capacity(capacity.clone());
    engine.config.capacity = capacity.clone();
    engine.capacity_controller = CapacityController::new(capacity);
    engine.turn_counter = 5;
    let current_turn = engine.turn_counter;
    engine.capacity_controller.mark_turn_start(current_turn);

    // Inflate the history so there is something to compact.
    let filler = "x".repeat(5_000);
    engine.session.messages.extend(
        std::iter::repeat_with(|| Message {
            role: "user".to_string(),
            content: vec![ContentBlock::Text {
                text: filler.clone(),
                cache_control: None,
            }],
        })
        .take(200),
    );

    let tokens_before = engine.estimated_input_tokens();
    let turn = TurnContext::new(10);
    let refreshed = engine
        .run_capacity_pre_request_checkpoint(&turn, None, AppMode::Agent)
        .await;
    let tokens_after = engine.estimated_input_tokens();

    assert!(refreshed);
    assert!(tokens_after < tokens_before);
}
/// A recorded `read_file` call is handed to the post-tool checkpoint under a
/// configuration meant to select the high, non-severe path; the test expects
/// no turn restart and a `[verification replay]` note in a tool result.
#[tokio::test]
async fn post_tool_replay_invoked_when_high_non_severe_risk() {
// Workspace containing the file the recorded tool call read.
let tmp = tempdir().expect("tempdir");
fs::write(tmp.path().join("sample.txt"), "hello replay").expect("write");
let mut capacity = CapacityControllerConfig::default();
capacity.enabled = true;
// Zero band ceilings: any positive failure probability falls above both
// the low and medium bands.
capacity.low_risk_max = 0.0;
capacity.medium_risk_max = 0.0;
// Severe thresholds set out of practical reach (a violation ratio cannot
// reach 2.0), so the snapshot should not be flagged severe.
capacity.severe_min_slack = -10.0;
capacity.severe_violation_ratio = 2.0;
capacity.min_turns_before_guardrail = 0;
let mut engine = build_engine_with_capacity(capacity.clone());
engine.session.workspace = tmp.path().to_path_buf();
engine.config.workspace = tmp.path().to_path_buf();
engine.config.capacity = capacity.clone();
engine.capacity_controller = CapacityController::new(capacity);
engine.turn_counter = 4;
engine
.capacity_controller
.mark_turn_start(engine.turn_counter);
// Record a completed read whose stored result matches the file content.
let mut turn = TurnContext::new(10);
let mut tool_call = TurnToolCall::new(
"tool_read_1".to_string(),
"read_file".to_string(),
json!({ "path": "sample.txt" }),
);
tool_call.set_result(
"hello replay".to_string(),
std::time::Duration::from_millis(1),
);
turn.record_tool_call(tool_call);
// Registry limited to read-only file tools.
let registry = ToolRegistryBuilder::new()
.with_read_only_file_tools()
.build(engine.build_tool_context(AppMode::Agent));
let restarted = engine
.run_capacity_post_tool_checkpoint(
&turn,
AppMode::Agent,
Some(&registry),
Arc::new(RwLock::new(())),
None,
0,
0,
)
.await;
assert!(!restarted);
// The checkpoint is expected to leave a verification note in a tool result.
let has_verification_note = engine.session.messages.iter().any(|msg| {
msg.content.iter().any(|block| match block {
ContentBlock::ToolResult { content, .. } => content.contains("[verification replay]"),
_ => false,
})
});
assert!(has_verification_note);
}
/// The error-escalation checkpoint must restart the turn with a compacted
/// history and persist a memory record carrying a non-empty goal.
///
/// The memory directory is redirected to a temp dir through
/// `DEEPSEEK_CAPACITY_MEMORY_DIR`. A drop guard restores the variable's
/// previous state on every exit path, including assertion failures.
#[tokio::test]
async fn error_escalation_triggers_replan_when_severe_or_repeated_failures() {
    const MEMORY_DIR_VAR: &str = "DEEPSEEK_CAPACITY_MEMORY_DIR";

    /// Restores `MEMORY_DIR_VAR` to its prior value (or removes it) on drop.
    struct MemoryDirOverride {
        previous: Option<std::ffi::OsString>,
    }

    impl MemoryDirOverride {
        fn install(dir: String) -> Self {
            let previous = std::env::var_os(MEMORY_DIR_VAR);
            // SAFETY: mutating the process environment is only sound while no
            // other thread accesses it concurrently.
            // NOTE(review): the parallel test runner does not guarantee that;
            // consider serializing tests that touch this variable.
            unsafe {
                std::env::set_var(MEMORY_DIR_VAR, dir);
            }
            Self { previous }
        }
    }

    impl Drop for MemoryDirOverride {
        fn drop(&mut self) {
            // SAFETY: same requirement as in `install`.
            unsafe {
                match self.previous.take() {
                    Some(value) => std::env::set_var(MEMORY_DIR_VAR, value),
                    None => std::env::remove_var(MEMORY_DIR_VAR),
                }
            }
        }
    }

    let tmp = tempdir().expect("tempdir");
    // Declared after `tmp` so the override is lifted before the directory is removed.
    let _memory_dir_override =
        MemoryDirOverride::install(tmp.path().to_string_lossy().to_string());

    let mut capacity = CapacityControllerConfig::default();
    capacity.enabled = true;
    capacity.low_risk_max = 0.0;
    capacity.medium_risk_max = 0.0;
    capacity.min_turns_before_guardrail = 0;
    let mut engine = build_engine_with_capacity(capacity.clone());
    engine.config.capacity = capacity.clone();
    engine.capacity_controller = CapacityController::new(capacity);
    engine.turn_counter = 6;
    engine
        .capacity_controller
        .mark_turn_start(engine.turn_counter);

    // Alternating filler messages followed by the request that matters.
    for i in 0..10 {
        engine.session.messages.push(Message {
            role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
            content: vec![ContentBlock::Text {
                text: format!("noise message {i}"),
                cache_control: None,
            }],
        });
    }
    engine.session.messages.push(Message {
        role: "user".to_string(),
        content: vec![ContentBlock::Text {
            text: "Please finish task".to_string(),
            cache_control: None,
        }],
    });

    let before_len = engine.session.messages.len();
    let turn = TurnContext::new(10);
    let restarted = engine
        .run_capacity_error_escalation_checkpoint(&turn, AppMode::Agent, 2, 2)
        .await;

    assert!(restarted);
    assert!(engine.session.messages.len() < before_len);
    assert!(engine.session.messages.len() <= 2);
    let records = load_last_k_capacity_records(&engine.session.id, 1).expect("load memory");
    assert!(!records.is_empty());
    assert!(!records[0].canonical_state.goal.is_empty());
}
/// With the controller disabled, the pre-request checkpoint must report no
/// intervention and leave both the token estimate and the history untouched.
#[tokio::test]
async fn controller_disabled_keeps_behavior_unchanged() {
    let mut capacity = CapacityControllerConfig::default();
    capacity.enabled = false;

    let mut engine = build_engine_with_capacity(capacity.clone());
    engine.config.capacity = capacity.clone();
    engine.capacity_controller = CapacityController::new(capacity);
    engine.turn_counter = 3;
    let current_turn = engine.turn_counter;
    engine.capacity_controller.mark_turn_start(current_turn);

    // A large history that an enabled controller would have reason to compact.
    let filler = "y".repeat(5_000);
    engine.session.messages.extend(
        std::iter::repeat_with(|| Message {
            role: "user".to_string(),
            content: vec![ContentBlock::Text {
                text: filler.clone(),
                cache_control: None,
            }],
        })
        .take(120),
    );

    let tokens_before = engine.estimated_input_tokens();
    let messages_before = engine.session.messages.len();
    let turn = TurnContext::new(10);
    let applied = engine
        .run_capacity_pre_request_checkpoint(&turn, None, AppMode::Agent)
        .await;
    let tokens_after = engine.estimated_input_tokens();
    let messages_after = engine.session.messages.len();

    assert!(!applied);
    assert_eq!(tokens_before, tokens_after);
    assert_eq!(messages_before, messages_after);
}
+36
View File
@@ -90,6 +90,42 @@ pub enum Event {
message: String,
},
/// Capacity decision telemetry.
CapacityDecision {
session_id: String,
turn_id: String,
h_hat: f64,
c_hat: f64,
slack: f64,
min_slack: f64,
violation_ratio: f64,
p_fail: f64,
risk_band: String,
action: String,
cooldown_blocked: bool,
reason: String,
},
/// Capacity intervention telemetry.
CapacityIntervention {
session_id: String,
turn_id: String,
action: String,
before_prompt_tokens: usize,
after_prompt_tokens: usize,
compaction_size_reduction: usize,
replay_outcome: Option<String>,
replan_performed: bool,
},
/// Capacity memory persistence failure telemetry.
CapacityMemoryPersistFailed {
session_id: String,
turn_id: String,
action: String,
error: String,
},
// === Sub-Agent Events ===
/// A sub-agent has been spawned
AgentSpawned { id: String, prompt: String },
+2
View File
@@ -11,6 +11,8 @@
#![allow(dead_code)]
pub mod capacity;
pub mod capacity_memory;
pub mod engine;
pub mod events;
pub mod ops;
+4
View File
@@ -18,6 +18,8 @@ pub struct Session {
/// System prompt (optional)
pub system_prompt: Option<SystemPrompt>,
/// Persisted summary blocks generated by context compaction.
pub compaction_summary_prompt: Option<SystemPrompt>,
/// Conversation history (API format)
pub messages: Vec<Message>,
@@ -88,6 +90,7 @@ impl Session {
model,
workspace,
system_prompt: None,
compaction_summary_prompt: None,
messages: Vec::new(),
total_usage: SessionUsage::default(),
allow_shell,
@@ -125,6 +128,7 @@ impl Session {
/// Clear the conversation history
pub fn clear(&mut self) {
self.messages.clear();
self.compaction_summary_prompt = None;
}
/// Get the message count
+9 -1
View File
@@ -2108,10 +2108,17 @@ async fn run_exec_agent(
use crate::core::engine::{EngineConfig, spawn_engine};
use crate::core::events::Event;
use crate::core::ops::Op;
use crate::models::{compaction_message_threshold_for_model, compaction_threshold_for_model};
use crate::tools::plan::new_shared_plan_state;
use crate::tools::todo::new_shared_todo_list;
use crate::tui::app::AppMode;
let mut compaction = CompactionConfig::default();
compaction.enabled = true;
compaction.model = model.to_string();
compaction.token_threshold = compaction_threshold_for_model(model);
compaction.message_threshold = compaction_message_threshold_for_model(model);
let engine_config = EngineConfig {
model: model.to_string(),
workspace: workspace.clone(),
@@ -2122,7 +2129,8 @@ async fn run_exec_agent(
max_steps: 100,
max_subagents,
features: config.features(),
compaction: CompactionConfig::default(),
compaction,
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config),
todos: new_shared_todo_list(),
plan_state: new_shared_plan_state(),
};
+48 -14
View File
@@ -802,11 +802,13 @@ mod tests {
}
}
async fn spawn_test_server() -> Result<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)> {
async fn spawn_test_server() -> Result<
Option<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
let root = std::env::temp_dir().join(format!("deepseek-runtime-api-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
fs::create_dir_all(&sessions_dir)?;
@@ -824,8 +826,24 @@ mod tests {
Arc::new(MockExecutor),
)
.await?;
let mut config = Config::default();
config.capacity = Some(crate::config::CapacityConfig {
enabled: Some(false),
low_risk_max: None,
medium_risk_max: None,
severe_min_slack: None,
severe_violation_ratio: None,
refresh_cooldown_turns: None,
replan_cooldown_turns: None,
max_replay_per_turn: None,
min_turns_before_guardrail: None,
profile_window: None,
deepseek_v3_2_chat_prior: None,
deepseek_v3_2_reasoner_prior: None,
fallback_default_prior: None,
});
let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open(
Config::default(),
config,
PathBuf::from("."),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")),
)?);
@@ -838,12 +856,16 @@ mod tests {
sessions_dir,
};
let app = build_router(state);
let listener = TcpListener::bind("127.0.0.1:0").await?;
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(err) => return Err(err.into()),
};
let addr = listener.local_addr()?;
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, app).await;
});
Ok((addr, runtime_threads, handle))
Ok(Some((addr, runtime_threads, handle)))
}
async fn read_first_sse_frame(resp: reqwest::Response) -> Result<String> {
@@ -925,7 +947,9 @@ mod tests {
#[tokio::test]
async fn health_and_tasks_endpoints_work() -> Result<()> {
let (addr, _runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let health: serde_json::Value = client
@@ -983,7 +1007,9 @@ mod tests {
#[tokio::test]
async fn stream_requires_prompt() -> Result<()> {
let (addr, _runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
@@ -998,7 +1024,9 @@ mod tests {
#[tokio::test]
async fn thread_endpoints_expose_lifecycle_contract() -> Result<()> {
let (addr, _runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
@@ -1130,7 +1158,9 @@ mod tests {
#[tokio::test]
async fn events_endpoint_respects_since_seq_cursor() -> Result<()> {
let (addr, _runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
@@ -1207,7 +1237,9 @@ mod tests {
#[tokio::test]
async fn steer_and_interrupt_endpoints_work_on_active_turn() -> Result<()> {
let (addr, runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
@@ -1425,7 +1457,9 @@ mod tests {
#[tokio::test]
async fn stream_endpoint_remains_backward_compatible() -> Result<()> {
let (addr, _runtime_threads, handle) = spawn_test_server().await?;
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
+105 -2
View File
@@ -22,7 +22,10 @@ use crate::config::{Config, DEFAULT_TEXT_MODEL, MAX_SUBAGENTS};
use crate::core::engine::{EngineConfig, EngineHandle, spawn_engine};
use crate::core::events::{Event as EngineEvent, TurnOutcomeStatus};
use crate::core::ops::Op;
use crate::models::{ContentBlock, Message, Usage};
use crate::models::{
ContentBlock, Message, Usage, compaction_message_threshold_for_model,
compaction_threshold_for_model,
};
use crate::tools::plan::new_shared_plan_state;
use crate::tools::todo::new_shared_todo_list;
use crate::tui::app::AppMode;
@@ -1150,7 +1153,11 @@ impl RuntimeThreadManager {
}
}
let compaction = CompactionConfig::default();
let mut compaction = CompactionConfig::default();
compaction.enabled = true;
compaction.model = thread.model.clone();
compaction.token_threshold = compaction_threshold_for_model(&thread.model);
compaction.message_threshold = compaction_message_threshold_for_model(&thread.model);
let engine_cfg = EngineConfig {
model: thread.model.clone(),
workspace: thread.workspace.clone(),
@@ -1162,6 +1169,9 @@ impl RuntimeThreadManager {
max_subagents: self.config.max_subagents().clamp(1, MAX_SUBAGENTS),
features: self.config.features(),
compaction,
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(
&self.config,
),
todos: new_shared_todo_list(),
plan_state: new_shared_plan_state(),
};
@@ -1471,6 +1481,99 @@ impl RuntimeThreadManager {
.await?;
}
}
EngineEvent::CapacityDecision {
risk_band,
action,
reason,
..
} => {
let message = format!(
"Capacity decision: risk={risk_band} action={action} reason={reason}"
);
let item = TurnItemRecord {
schema_version: CURRENT_RUNTIME_SCHEMA_VERSION,
id: format!("item_{}", &Uuid::new_v4().to_string()[..8]),
turn_id: turn_id.clone(),
kind: TurnItemKind::Status,
status: TurnItemLifecycleStatus::Completed,
summary: summarize_text(&message, SUMMARY_LIMIT),
detail: Some(message),
artifact_refs: Vec::new(),
started_at: Some(Utc::now()),
ended_at: Some(Utc::now()),
};
self.store.save_item(&item)?;
self.attach_item_to_turn(&turn_id, &item.id)?;
self.emit_event(
&thread_id,
Some(&turn_id),
Some(&item.id),
"item.completed",
json!({ "item": item }),
)
.await?;
}
EngineEvent::CapacityIntervention {
action,
before_prompt_tokens,
after_prompt_tokens,
replay_outcome,
replan_performed,
..
} => {
let message = format!(
"Capacity intervention: {action} (~{before_prompt_tokens} -> ~{after_prompt_tokens}) replay={:?} replan={replan_performed}",
replay_outcome
);
let item = TurnItemRecord {
schema_version: CURRENT_RUNTIME_SCHEMA_VERSION,
id: format!("item_{}", &Uuid::new_v4().to_string()[..8]),
turn_id: turn_id.clone(),
kind: TurnItemKind::Status,
status: TurnItemLifecycleStatus::Completed,
summary: summarize_text(&message, SUMMARY_LIMIT),
detail: Some(message),
artifact_refs: Vec::new(),
started_at: Some(Utc::now()),
ended_at: Some(Utc::now()),
};
self.store.save_item(&item)?;
self.attach_item_to_turn(&turn_id, &item.id)?;
self.emit_event(
&thread_id,
Some(&turn_id),
Some(&item.id),
"item.completed",
json!({ "item": item }),
)
.await?;
}
EngineEvent::CapacityMemoryPersistFailed { action, error, .. } => {
let message =
format!("Capacity memory persist failed: action={action} error={error}");
let item = TurnItemRecord {
schema_version: CURRENT_RUNTIME_SCHEMA_VERSION,
id: format!("item_{}", &Uuid::new_v4().to_string()[..8]),
turn_id: turn_id.clone(),
kind: TurnItemKind::Status,
status: TurnItemLifecycleStatus::Failed,
summary: summarize_text(&message, SUMMARY_LIMIT),
detail: Some(message),
artifact_refs: Vec::new(),
started_at: Some(Utc::now()),
ended_at: Some(Utc::now()),
};
self.store.save_item(&item)?;
self.attach_item_to_turn(&turn_id, &item.id)?;
self.emit_event(
&thread_id,
Some(&turn_id),
Some(&item.id),
"item.failed",
json!({ "item": item }),
)
.await?;
}
EngineEvent::ApprovalRequired {
id,
tool_name,
+11 -26
View File
@@ -19,6 +19,8 @@ use crate::tui::markdown_render;
const TOOL_COMMAND_LINE_LIMIT: usize = 5;
const TOOL_OUTPUT_LINE_LIMIT: usize = 12;
const TOOL_TEXT_LIMIT: usize = 240;
const TOOL_RUNNING_SYMBOLS: [&str; 3] = ["", "", ""];
const TOOL_STATUS_SYMBOL_MS: u64 = 900;
// === History Cells ===
@@ -52,33 +54,14 @@ impl HistoryCell {
pub fn lines(&self, width: u16) -> Vec<Line<'static>> {
match self {
HistoryCell::User { content } => render_message("You", content, user_style(), width),
HistoryCell::Assistant { content, streaming } => {
let mut lines = render_message("Answer", content, assistant_style(), width);
if *streaming {
// Add blinking cursor to last line
if let Some(last) = lines.last_mut() {
last.spans.push(Span::styled(
"",
Style::default().fg(palette::DEEPSEEK_SKY),
));
}
}
lines
HistoryCell::Assistant { content, .. } => {
render_message("Answer", content, assistant_style(), width)
}
HistoryCell::System { content } => {
render_message("System", content, system_style(), width)
}
HistoryCell::Thinking { content, streaming } => {
let mut lines = render_thinking(content, width, *streaming);
if *streaming {
if let Some(last) = lines.last_mut() {
last.spans.push(Span::styled(
"",
Style::default().fg(palette::DEEPSEEK_SKY),
));
}
}
lines
render_thinking(content, width, *streaming)
}
HistoryCell::Tool(cell) => cell.lines(width),
}
@@ -1277,11 +1260,13 @@ fn status_symbol(started_at: Option<Instant>, status: ToolStatus) -> String {
match status {
ToolStatus::Running => {
let elapsed_ms = started_at.map_or(0, |t| t.elapsed().as_millis());
if (elapsed_ms / 900).is_multiple_of(2) {
"*".to_string()
let cycle = u128::from(TOOL_STATUS_SYMBOL_MS);
let idx = if cycle == 0 {
0
} else {
".".to_string()
}
(elapsed_ms / cycle) % (TOOL_RUNNING_SYMBOLS.len() as u128)
};
TOOL_RUNNING_SYMBOLS[usize::try_from(idx).unwrap_or_default()].to_string()
}
ToolStatus::Success => "o".to_string(),
ToolStatus::Failed => "x".to_string(),
+30
View File
@@ -130,6 +130,20 @@ impl PasteBurst {
}
}
/// Return the remaining delay before a pending char/paste buffer must flush.
///
/// Returns `None` when no plain character timestamp has been recorded.
/// Otherwise the applicable timeout is chosen (`PASTE_BURST_ACTIVE_IDLE_TIMEOUT`
/// while a burst is active, `PASTE_BURST_CHAR_INTERVAL` otherwise) and the
/// time already elapsed since the last plain character is subtracted from it.
/// The result is clamped to [`Duration::ZERO`] once the deadline has passed.
///
/// This lets the UI event loop avoid sleeping past the flush deadline.
#[must_use]
pub fn next_flush_delay(&self, now: Instant) -> Option<Duration> {
    let last = self.last_plain_char_time?;
    let timeout = if self.is_active_internal() {
        PASTE_BURST_ACTIVE_IDLE_TIMEOUT
    } else {
        PASTE_BURST_CHAR_INTERVAL
    };
    // `now` may be captured before `last` by a caller; saturate explicitly
    // rather than relying on `duration_since`, which is documented as
    // possibly panicking again in future Rust versions.
    let elapsed = now.saturating_duration_since(last);
    Some(timeout.saturating_sub(elapsed))
}
pub fn append_newline_if_active(&mut self, now: Instant) -> bool {
if self.is_active() {
self.buffer.push('\n');
@@ -295,4 +309,20 @@ mod tests {
assert_eq!(burst.flush_before_modified_input(), Some("a".to_string()));
assert!(!burst.is_active());
}
/// After a single plain char, the reported flush delay shrinks as time
/// advances and bottoms out at zero instead of underflowing.
#[test]
fn next_flush_delay_counts_down_to_zero() {
    let start = Instant::now();
    let mut paste_burst = PasteBurst::default();
    let _ = paste_burst.on_plain_char('a', start);

    // 7 ms after the char: at most 1 ms may still remain.
    let left = paste_burst
        .next_flush_delay(start + Duration::from_millis(7))
        .expect("delay should exist");
    assert!(left <= Duration::from_millis(1));

    // 20 ms after the char: the remaining delay must be exactly zero.
    let after_deadline = start + Duration::from_millis(20);
    assert_eq!(
        paste_burst.next_flush_delay(after_deadline),
        Some(Duration::ZERO)
    );
}
}
+96 -52
View File
@@ -27,7 +27,7 @@ use unicode_width::{UnicodeWidthChar, UnicodeWidthStr};
use crate::audit::log_sensitive_event;
use crate::commands;
use crate::compaction::estimate_tokens;
use crate::compaction::estimate_input_tokens_conservative;
use crate::config::Config;
use crate::core::engine::{EngineConfig, EngineHandle, spawn_engine};
use crate::core::events::Event as EngineEvent;
@@ -83,6 +83,11 @@ use super::widgets::{ChatWidget, ComposerWidget, HeaderData, HeaderWidget, Rende
// NOTE(review): presumably the number of queued messages previewed in the
// status area — use site not visible here; confirm.
const MAX_QUEUED_PREVIEW: usize = 3;
// NOTE(review): the warning and critical context thresholds are identical
// (85.0) — confirm this is intentional and not a copy/paste slip.
const CONTEXT_WARNING_THRESHOLD_PERCENT: f64 = 85.0;
const CONTEXT_CRITICAL_THRESHOLD_PERCENT: f64 = 85.0;
/// Terminal-event poll timeout (ms) used by the event loop when `app.is_loading` is false.
const UI_IDLE_POLL_MS: u64 = 33;
/// Terminal-event poll timeout (ms) used by the event loop while `app.is_loading` is true.
const UI_ACTIVE_POLL_MS: u64 = 16;
/// Milliseconds per frame of the `deepseek_squiggle` animation.
const UI_DEEPSEEK_SQUIGGLE_MS: u64 = 120;
/// Milliseconds per frame of the `typing_indicator` animation.
const UI_TYPING_INDICATOR_MS: u64 = 120;
/// Minimum interval (ms) between status-driven redraw requests while loading;
/// tied to the squiggle frame duration.
const UI_STATUS_ANIMATION_MS: u64 = UI_DEEPSEEK_SQUIGGLE_MS;
/// Run the interactive TUI event loop.
///
@@ -166,27 +171,6 @@ pub async fn run_tui(config: &Config, options: TuiOptions) -> Result<()> {
app.status_message = Some(format!("Failed to load session: {e}"));
}
}
} else if let Ok(manager) = SessionManager::default_location() {
// Crash recovery: restore in-flight checkpoint when no explicit session was requested.
match manager.load_checkpoint() {
Ok(Some(checkpoint)) => {
apply_loaded_session(&mut app, &checkpoint);
app.history.insert(
0,
HistoryCell::System {
content:
"Recovered from crash checkpoint; resume by sending your next message."
.to_string(),
},
);
app.mark_history_updated();
app.status_message = Some("Recovered checkpoint session".to_string());
}
Ok(None) => {}
Err(err) => {
app.status_message = Some(format!("Failed to restore checkpoint: {err}"));
}
}
}
if let Ok(manager) = SessionManager::default_location() {
@@ -298,6 +282,7 @@ fn build_engine_config(app: &App, config: &Config) -> EngineConfig {
max_subagents: app.max_subagents,
features: config.features(),
compaction: app.compaction_config(),
capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config),
todos: app.todos.clone(),
plan_state: app.plan_state.clone(),
}
@@ -318,19 +303,26 @@ async fn run_event_loop(
let mut last_task_refresh = Instant::now()
.checked_sub(Duration::from_secs(2))
.unwrap_or_else(Instant::now);
let mut last_status_frame = Instant::now()
.checked_sub(Duration::from_millis(UI_STATUS_ANIMATION_MS))
.unwrap_or_else(Instant::now);
loop {
if last_task_refresh.elapsed() >= Duration::from_millis(2500) {
let tasks = task_manager.list_tasks(Some(10)).await;
app.task_panel = tasks.into_iter().map(task_summary_to_panel_entry).collect();
last_task_refresh = Instant::now();
app.needs_redraw = true;
}
// First, poll for engine events (non-blocking)
let mut received_engine_event = false;
let mut transcript_batch_updated = false;
let mut queued_to_send: Option<QueuedMessage> = None;
{
let mut rx = engine_handle.rx_event.write().await;
while let Ok(event) = rx.try_recv() {
received_engine_event = true;
match event {
EngineEvent::MessageStarted { .. } => {
current_streaming_text.clear();
@@ -356,9 +348,9 @@ async fn run_event_loop(
if let Some(cell) = app.history.get_mut(index) {
if let HistoryCell::Assistant { content, .. } = cell {
content.clone_from(&current_streaming_text);
content.push_str(&sanitized);
}
app.mark_history_updated();
transcript_batch_updated = true;
}
}
EngineEvent::MessageComplete { .. } => {
@@ -367,7 +359,7 @@ async fn run_event_loop(
app.history.get_mut(index)
{
*streaming = false;
app.mark_history_updated();
transcript_batch_updated = true;
}
let mut blocks = Vec::new();
@@ -426,6 +418,7 @@ async fn run_event_loop(
app.history.get_mut(index)
{
c.push_str(&sanitized);
transcript_batch_updated = true;
}
}
}
@@ -435,6 +428,7 @@ async fn run_event_loop(
app.history.get_mut(index)
{
*streaming = false;
transcript_batch_updated = true;
}
}
@@ -478,6 +472,7 @@ async fn run_event_loop(
app.pending_tool_uses.clear();
app.plan_tool_used_in_turn = false;
persist_checkpoint(app);
last_status_frame = Instant::now();
}
EngineEvent::TurnComplete {
usage,
@@ -558,6 +553,31 @@ async fn run_event_loop(
EngineEvent::CompactionFailed { message, .. } => {
app.status_message = Some(message);
}
EngineEvent::CapacityDecision {
risk_band,
action,
reason,
..
} => {
app.status_message = Some(format!(
"Capacity decision: risk={risk_band} action={action} ({reason})"
));
}
EngineEvent::CapacityIntervention {
action,
before_prompt_tokens,
after_prompt_tokens,
..
} => {
app.status_message = Some(format!(
"Capacity intervention: {action} (~{before_prompt_tokens} -> ~{after_prompt_tokens} tokens)"
));
}
EngineEvent::CapacityMemoryPersistFailed { action, error, .. } => {
app.status_message = Some(format!(
"Capacity memory persist failed ({action}): {error}"
));
}
EngineEvent::PauseEvents => {
if !event_broker.is_paused() {
pause_terminal(terminal, app.use_alt_screen)?;
@@ -732,6 +752,12 @@ async fn run_event_loop(
}
}
}
if transcript_batch_updated {
app.mark_history_updated();
}
if received_engine_event {
app.needs_redraw = true;
}
if let Some(next) = queued_to_send {
if let Err(err) = dispatch_user_message(app, &engine_handle, next.clone()).await {
@@ -741,30 +767,56 @@ async fn run_event_loop(
app.queued_message_count()
));
}
app.needs_redraw = true;
}
let queue_state = (app.queued_messages.clone(), app.queued_draft.clone());
if queue_state != last_queue_state {
persist_offline_queue_state(app);
last_queue_state = queue_state;
app.needs_redraw = true;
}
if !app.view_stack.is_empty() {
let events = app.view_stack.tick();
if !events.is_empty() {
app.needs_redraw = true;
}
handle_view_events(app, &engine_handle, events).await;
}
if app.is_loading
&& last_status_frame.elapsed() >= Duration::from_millis(UI_STATUS_ANIMATION_MS)
{
app.needs_redraw = true;
last_status_frame = Instant::now();
}
if event_broker.is_paused() {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
continue;
}
app.flush_paste_burst_if_due(Instant::now());
let now = Instant::now();
app.flush_paste_burst_if_due(now);
terminal.draw(|f| render(f, app))?; // app is &mut
if app.needs_redraw {
terminal.draw(|f| render(f, app))?; // app is &mut
app.needs_redraw = false;
}
if event::poll(std::time::Duration::from_millis(120))? {
let mut poll_timeout = if app.is_loading {
Duration::from_millis(UI_ACTIVE_POLL_MS)
} else {
Duration::from_millis(UI_IDLE_POLL_MS)
};
if let Some(until_flush) = app.paste_burst.next_flush_delay(now) {
poll_timeout = poll_timeout.min(until_flush);
}
if event::poll(poll_timeout)? {
let evt = event::read()?;
app.needs_redraw = true;
// Handle bracketed paste events
if let Event::Paste(text) = &evt {
@@ -1075,6 +1127,8 @@ async fn run_event_loop(
model,
workspace,
} => {
let is_full_reset =
messages.is_empty() && system_prompt.is_none();
let _ = engine_handle
.send(Op::SyncSession {
messages,
@@ -1088,6 +1142,10 @@ async fn run_event_loop(
config: app.compaction_config(),
})
.await;
if is_full_reset {
persist_session_snapshot(app);
clear_checkpoint();
}
}
AppAction::SendMessage(content) => {
let queued = build_queued_message(app, content);
@@ -2446,7 +2504,7 @@ fn render_status_indicator(f: &mut Frame, area: Rect, app: &App, queued: &[Strin
None
};
let elapsed = app.turn_started_at.map(format_elapsed);
// Use typing indicator when streaming content, otherwise use whale spinner
// Use typing indicator when streaming content, otherwise use a subtle status glyph.
let has_streaming_content = app.streaming_message_index.is_some();
let spinner = if has_streaming_content {
typing_indicator(app.turn_started_at)
@@ -2743,23 +2801,11 @@ fn prompt_for_mode(mode: AppMode) -> &'static str {
}
fn estimated_context_tokens(app: &App) -> Option<i64> {
let mut total = estimate_tokens(&app.api_messages);
match &app.system_prompt {
Some(SystemPrompt::Text(text)) => total = total.saturating_add(estimate_text_tokens(text)),
Some(SystemPrompt::Blocks(blocks)) => {
for block in blocks {
total = total.saturating_add(estimate_text_tokens(&block.text));
}
}
None => {}
}
i64::try_from(total).ok()
}
fn estimate_text_tokens(text: &str) -> usize {
text.chars().count().div_ceil(4)
i64::try_from(estimate_input_tokens_conservative(
&app.api_messages,
app.system_prompt.as_ref(),
))
.ok()
}
fn context_usage_snapshot(app: &App) -> Option<(i64, u32, f64)> {
@@ -2826,21 +2872,19 @@ fn format_elapsed(start: Instant) -> String {
}
fn deepseek_squiggle(start: Option<Instant>) -> &'static str {
const FRAMES: [&str; 12] = [
"🐳", "🐳.", "🐳..", "🐳...", "🐳..", "🐳.", "🐋", "🐋.", "🐋..", "🐋...", "🐋..", "🐋.",
];
const FRAMES: [&str; 6] = ["", "", "", "", "", ""];
let elapsed_ms = start.map_or(0, |t| t.elapsed().as_millis());
let idx = ((elapsed_ms / 420) as usize) % FRAMES.len();
let idx = ((elapsed_ms / u128::from(UI_DEEPSEEK_SQUIGGLE_MS)) as usize) % FRAMES.len();
FRAMES[idx]
}
/// Braille pattern frames for typing/thinking indicator animation.
const TYPING_FRAMES: &[&str] = &["", "", "", "", "", "", "", "", "", ""];
const TYPING_FRAMES: &[&str] = &["", "", "", ""];
/// Returns the typing indicator frame based on elapsed time.
///
/// `start` is the instant the animation began; `None` is treated as zero
/// elapsed time, which selects the first frame. The frame advances every
/// `UI_TYPING_INDICATOR_MS` milliseconds and wraps around `TYPING_FRAMES`.
fn typing_indicator(start: Option<Instant>) -> &'static str {
    let elapsed_ms = start.map_or(0, |t| t.elapsed().as_millis());
    let frame = elapsed_ms / u128::from(UI_TYPING_INDICATOR_MS);
    // Wrap while still in u128 so the narrowing cast below can never truncate.
    let idx = (frame % TYPING_FRAMES.len() as u128) as usize;
    TYPING_FRAMES[idx]
}