diff --git a/README.md b/README.md index adcdc4e6..f49adeb8 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ Config lives at `~/.deepseek/config.toml`: ```toml api_key = "sk-..." default_text_model = "deepseek-reasoner" # optional (or "deepseek-chat") -allow_shell = false # optional +allow_shell = true # optional (sandboxed by default) max_subagents = 3 # optional (1-20) ``` diff --git a/config.example.toml b/config.example.toml index a629d96e..ee29e195 100644 --- a/config.example.toml +++ b/config.example.toml @@ -37,7 +37,7 @@ memory_path = "~/.deepseek/memory.md" # ───────────────────────────────────────────────────────────────────────────────── # Security # ───────────────────────────────────────────────────────────────────────────────── -allow_shell = false +allow_shell = true approval_policy = "on-request" # on-request | untrusted | never sandbox_mode = "workspace-write" # read-only | workspace-write | danger-full-access | external-sandbox max_subagents = 5 # optional (1-20) @@ -83,6 +83,24 @@ exponential_base = 2.0 # model = "deepseek-chat" # Model to use for summarization # cache_summary = true # Cache the summary block +# ───────────────────────────────────────────────────────────────────────────────── +# Capacity Controller (runtime pressure guardrails) +# ───────────────────────────────────────────────────────────────────────────────── +[capacity] +enabled = true +low_risk_max = 0.34 +medium_risk_max = 0.62 +severe_min_slack = -0.25 +severe_violation_ratio = 0.40 +refresh_cooldown_turns = 2 +replan_cooldown_turns = 5 +max_replay_per_turn = 1 +min_turns_before_guardrail = 2 +profile_window = 8 +deepseek_v3_2_chat_prior = 3.9 +deepseek_v3_2_reasoner_prior = 4.1 +fallback_default_prior = 3.8 + # ───────────────────────────────────────────────────────────────────────────────── # Profile Example (for multiple environments) # ───────────────────────────────────────────────────────────────────────────────── diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 7ef687f3..36dd33f5 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -154,7 +154,7 @@ Responses API (with automatic fallback if needed). ### Crash Recovery + Offline Queue 1. Before sending user input, the TUI writes a checkpoint snapshot to `~/.deepseek/sessions/checkpoints/latest.json` -2. If the process crashes mid-turn, startup restores that checkpoint automatically (unless explicit `--resume` is used) +2. Startup remains fresh by default; prior sessions are resumed explicitly via `--resume`/`--continue` (or `Ctrl+R` in TUI) 3. While degraded/offline, new prompts are queued in-memory and mirrored to `~/.deepseek/sessions/checkpoints/offline_queue.json` 4. Queue edits (`/queue ...`) are persisted continuously so drafts and queued prompts survive restarts 5. Successful turn completion clears the active checkpoint and writes a durable session snapshot diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 1bd5e781..512031fa 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -56,6 +56,19 @@ These override config values: - `DEEPSEEK_MAX_SUBAGENTS` (clamped to `1..=20`) - `DEEPSEEK_TASKS_DIR` (runtime task queue/artifact storage, default `~/.deepseek/tasks`) - `DEEPSEEK_ALLOW_INSECURE_HTTP` (`1`/`true` allows non-local `http://` base URLs; default is reject) +- `DEEPSEEK_CAPACITY_ENABLED` +- `DEEPSEEK_CAPACITY_LOW_RISK_MAX` +- `DEEPSEEK_CAPACITY_MEDIUM_RISK_MAX` +- `DEEPSEEK_CAPACITY_SEVERE_MIN_SLACK` +- `DEEPSEEK_CAPACITY_SEVERE_VIOLATION_RATIO` +- `DEEPSEEK_CAPACITY_REFRESH_COOLDOWN_TURNS` +- `DEEPSEEK_CAPACITY_REPLAN_COOLDOWN_TURNS` +- `DEEPSEEK_CAPACITY_MAX_REPLAY_PER_TURN` +- `DEEPSEEK_CAPACITY_MIN_TURNS_BEFORE_GUARDRAIL` +- `DEEPSEEK_CAPACITY_PROFILE_WINDOW` +- `DEEPSEEK_CAPACITY_PRIOR_CHAT` +- `DEEPSEEK_CAPACITY_PRIOR_REASONER` +- `DEEPSEEK_CAPACITY_PRIOR_FALLBACK` ## Settings File (Persistent UI Preferences) @@ -84,7 +97,7 @@ Common settings keys: - `api_key` (string, required): must be non-empty (or set `DEEPSEEK_API_KEY`). - `base_url` (string, optional): defaults to `https://api.deepseek.com` (OpenAI-compatible Responses API). - `default_text_model` (string, optional): defaults to `deepseek-reasoner`. Supported IDs are `deepseek-reasoner` and `deepseek-chat`. -- `allow_shell` (bool, optional): defaults to `false`. +- `allow_shell` (bool, optional): defaults to `true` (sandboxed). - `approval_policy` (string, optional): `on-request`, `untrusted`, or `never`. Runtime `/set approval_mode` also accepts `on-request` and `untrusted` aliases. - `sandbox_mode` (string, optional): `read-only`, `workspace-write`, `danger-full-access`, `external-sandbox`. - `managed_config_path` (string, optional): managed config file loaded after user/env config. @@ -100,6 +113,20 @@ Common settings keys: - `[retry].initial_delay` (float seconds, default `1.0`) - `[retry].max_delay` (float seconds, default `60.0`) - `[retry].exponential_base` (float, default `2.0`) +- `capacity.*` (optional): runtime context-capacity controller: + - `[capacity].enabled` (bool, default `true`) + - `[capacity].low_risk_max` (float, default `0.34`) + - `[capacity].medium_risk_max` (float, default `0.62`) + - `[capacity].severe_min_slack` (float, default `-0.25`) + - `[capacity].severe_violation_ratio` (float, default `0.40`) + - `[capacity].refresh_cooldown_turns` (int, default `2`) + - `[capacity].replan_cooldown_turns` (int, default `5`) + - `[capacity].max_replay_per_turn` (int, default `1`) + - `[capacity].min_turns_before_guardrail` (int, default `2`) + - `[capacity].profile_window` (int, default `8`) + - `[capacity].deepseek_v3_2_chat_prior` (float, default `3.9`) + - `[capacity].deepseek_v3_2_reasoner_prior` (float, default `4.1`) + - `[capacity].fallback_default_prior` (float, default `3.8`) - `tui.alternate_screen` (string, optional): `auto`, `always`, or `never`. `auto` disables the alternate screen in Zellij; `--no-alt-screen` forces inline mode. - `hooks` (optional): lifecycle hooks configuration (see `config.example.toml`). - `features.*` (optional): feature flag overrides (see below). @@ -154,6 +181,8 @@ allowed_sandbox_modes = ["read-only", "workspace-write"] If configured values violate requirements, startup fails with a descriptive error. +See `docs/capacity_controller.md` for formulas, intervention behavior, and telemetry. + ## Notes On `deepseek doctor` `deepseek doctor` now follows the same config resolution rules as the rest of the CLI. diff --git a/docs/OPERATIONS_RUNBOOK.md b/docs/OPERATIONS_RUNBOOK.md index 0fd5822c..8cdf8850 100644 --- a/docs/OPERATIONS_RUNBOOK.md +++ b/docs/OPERATIONS_RUNBOOK.md @@ -51,11 +51,11 @@ Actions: Expected behavior: - Checkpoint stored at `~/.deepseek/sessions/checkpoints/latest.json` -- Startup auto-restores checkpoint when no explicit `--resume` target is supplied +- Startup begins a fresh session unless `--resume`/`--continue` is supplied Actions: -1. Start TUI normally and verify "Recovered checkpoint session" status -2. If automatic recovery fails, inspect checkpoint JSON for schema mismatch +1. Resume prior work explicitly via `deepseek --resume ` or `Ctrl+R` in TUI +2. If checkpoint inspection is needed, inspect `latest.json` for schema mismatch/details 3. If schema is newer than binary supports, upgrade binary or remove stale checkpoint ## Incident: Persistent State Schema Errors diff --git a/docs/capacity_controller.md b/docs/capacity_controller.md new file mode 100644 index 00000000..5476e969 --- /dev/null +++ b/docs/capacity_controller.md @@ -0,0 +1,136 @@ +# Capacity Controller + +`deepseek-tui` includes a capacity-aware context controller that keeps active prompt context near coherent operating range while preserving full history on disk. + +## Policy Overview + +Each checkpoint computes: + +- `H_hat` (runtime pressure proxy) +- `C_hat` (model capacity prior) +- `slack = C_hat - H_hat` +- dynamic slack profile over last `N=8` observations + +### Runtime Pressure Proxy (`H_hat`) + +- `action_complexity_bits = log2(1 + action_count_this_turn)` +- `tool_complexity_bits = log2(1 + tool_calls_recent_window)` +- `ref_complexity_bits = log2(1 + unique_reference_ids_recent_window)` +- `context_pressure_bits = 6.0 * context_used_ratio` + +Formula: + +`H_hat = 0.35*action_complexity_bits + 0.30*tool_complexity_bits + 0.20*ref_complexity_bits + 0.15*context_pressure_bits` + +### Capacity Prior (`C_hat`) + +Per-model priors: + +- `deepseek_v3_2_chat = 3.9` +- `deepseek_v3_2_reasoner = 4.1` +- fallback `3.8` + +### Failure Probability + +Using rolling profile fields: + +- `final_slack` +- `min_slack` +- `violation_ratio` +- `slack_volatility` +- `slack_drop` + +Formula: + +`z = -1.65*final_slack -0.85*min_slack +1.35*violation_ratio +0.70*slack_volatility +0.28*slack_drop -0.12` + +`p_fail = sigmoid(z)` clamped to `[0,1]`. + +Risk bands: + +- low: `p_fail <= low_risk_max` +- medium: `p_fail <= medium_risk_max` +- high: otherwise + +Action mapping: + +- low -> `NoIntervention` +- medium -> `TargetedContextRefresh` +- high + severe dynamics (`min_slack <= severe_min_slack` or `violation_ratio >= severe_violation_ratio`) -> `VerifyAndReplan` +- otherwise high -> `VerifyWithToolReplay` + +## Checkpoints + +The engine evaluates controller policy at: + +1. Pre-request checkpoint (before `MessageRequest` assembly). +2. Post-tool checkpoint (after tool result append). +3. Error-escalation checkpoint (tool error streak path). + +## Interventions + +### `TargetedContextRefresh` + +- Runs compaction (`compact_messages_safe`) when possible. +- Falls back to local trim if compaction path fails. +- Persists canonical state. +- Replaces long-tail active context with compact canonical prompt + memory pointer. + +### `VerifyWithToolReplay` + +- Replays one read-only critical tool call from recent turn context. +- Appends verification note with pass/fail + diff summary. +- On replay conflict/error, marks escalation candidate and disables replay for current turn. + +### `VerifyAndReplan` + +- Persists canonical snapshot. +- Clears volatile prompt tail while preserving latest user ask and latest verification note. +- Injects canonical replan instruction into system prompt. +- Continues turn loop from compact canonical state. + +## Safety Controls + +- Max one intervention per turn. +- Cooldowns for refresh and replan. +- Replay budget per turn (`max_replay_per_turn`). +- Fail-open behavior when controller inputs are unavailable. +- Compaction/replay failures are logged; turn continues. + +## Memory Store + +Path: + +- `DEEPSEEK_CAPACITY_MEMORY_DIR` (if set) +- otherwise `~/.deepseek/memory/.jsonl` +- fallback: `/.deepseek/memory/.jsonl` when home path is unavailable/unwritable + +Record fields: + +- `id`, `ts`, `turn_index`, `action_trigger` +- `h_hat`, `c_hat`, `slack`, `risk_band` +- `canonical_state` +- `source_message_ids` +- optional `replay_info` + +Loader utility supports fetching last `K` snapshots for rehydration. + +## Configuration + +`[capacity]` keys: + +- `enabled` +- `low_risk_max` +- `medium_risk_max` +- `severe_min_slack` +- `severe_violation_ratio` +- `refresh_cooldown_turns` +- `replan_cooldown_turns` +- `max_replay_per_turn` +- `min_turns_before_guardrail` +- `profile_window` +- `deepseek_v3_2_chat_prior` +- `deepseek_v3_2_reasoner_prior` +- `fallback_default_prior` + +Equivalent environment overrides are available with `DEEPSEEK_CAPACITY_*`. diff --git a/src/commands/core.rs b/src/commands/core.rs index d1f83c60..2bc6d0ae 100644 --- a/src/commands/core.rs +++ b/src/commands/core.rs @@ -37,7 +37,10 @@ pub fn clear(app: &mut App) -> CommandResult { app.history.clear(); app.mark_history_updated(); app.api_messages.clear(); + app.system_prompt = None; app.transcript_selection.clear(); + app.queued_messages.clear(); + app.queued_draft = None; app.total_conversation_tokens = 0; let todos_cleared = app.clear_todos(); app.tool_log.clear(); @@ -49,11 +52,21 @@ pub fn clear(app: &mut App) -> CommandResult { app.last_exec_wait_command = None; app.last_prompt_tokens = None; app.last_completion_tokens = None; - if todos_cleared { - CommandResult::message("Conversation cleared") + app.current_session_id = None; + let message = if todos_cleared { + "Conversation cleared".to_string() } else { - CommandResult::message("Conversation cleared (plan state busy; run /clear again if needed)") - } + "Conversation cleared (plan state busy; run /clear again if needed)".to_string() + }; + CommandResult::with_message_and_action( + message, + AppAction::SyncSession { + messages: Vec::new(), + system_prompt: None, + model: app.model.clone(), + workspace: app.workspace.clone(), + }, + ) } /// Exit the application @@ -265,6 +278,7 @@ mod tests { }); app.total_conversation_tokens = 100; app.tool_log.push("test".to_string()); + app.current_session_id = Some("existing-session".to_string()); let result = clear(&mut app); assert!(result.message.is_some()); @@ -274,6 +288,8 @@ mod tests { assert!(app.tool_log.is_empty()); assert!(app.tool_cells.is_empty()); assert!(app.tool_details_by_cell.is_empty()); + assert!(app.current_session_id.is_none()); + assert!(matches!(result.action, Some(AppAction::SyncSession { .. }))); } #[test] diff --git a/src/commands/debug.rs b/src/commands/debug.rs index e8b84e9b..d12f8ae6 100644 --- a/src/commands/debug.rs +++ b/src/commands/debug.rs @@ -3,7 +3,7 @@ //! Debug commands: tokens, cost, system, context, undo, retry use super::CommandResult; -use crate::compaction::estimate_tokens; +use crate::compaction::estimate_input_tokens_conservative; use crate::models::{DEFAULT_CONTEXT_WINDOW_TOKENS, SystemPrompt, context_window_for_model}; use crate::tui::app::{App, AppAction}; use crate::tui::history::HistoryCell; @@ -78,16 +78,15 @@ pub fn system_prompt(app: &mut App) -> CommandResult { /// Show context window usage pub fn context(app: &mut App) -> CommandResult { let mut total_chars = estimate_message_chars(&app.api_messages); - let mut estimated_tokens = estimate_tokens(&app.api_messages); + let estimated_tokens = + estimate_input_tokens_conservative(&app.api_messages, app.system_prompt.as_ref()); // System prompt if let Some(SystemPrompt::Text(text)) = &app.system_prompt { total_chars += text.len(); - estimated_tokens = estimated_tokens.saturating_add(estimate_text_tokens(text)); } else if let Some(SystemPrompt::Blocks(blocks)) = &app.system_prompt { for block in blocks { total_chars += block.text.len(); - estimated_tokens = estimated_tokens.saturating_add(estimate_text_tokens(&block.text)); } } @@ -114,10 +113,6 @@ pub fn context(app: &mut App) -> CommandResult { )) } -fn estimate_text_tokens(text: &str) -> usize { - text.chars().count().div_ceil(4) -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/compaction.rs b/src/compaction.rs index 4e8cc1e5..01497f75 100644 --- a/src/compaction.rs +++ b/src/compaction.rs @@ -44,6 +44,11 @@ const KEEP_RECENT_MESSAGES: usize = 4; const RECENT_WORKING_SET_WINDOW: usize = 12; const MAX_WORKING_SET_PATHS: usize = 24; const MIN_SUMMARIZE_MESSAGES: usize = 6; +const SUMMARY_TEXT_SNIPPET_CHARS: usize = 800; +const SUMMARY_TOOL_RESULT_SNIPPET_CHARS: usize = 240; +const SUMMARY_INPUT_MAX_CHARS: usize = 24_000; +const SUMMARY_INPUT_HEAD_CHARS: usize = 14_000; +const SUMMARY_INPUT_TAIL_CHARS: usize = 6_000; #[derive(Debug, Clone, Default)] struct CompactionPlan { @@ -466,6 +471,35 @@ pub fn estimate_tokens(messages: &[Message]) -> usize { messages.iter().map(estimate_tokens_for_message).sum() } +fn estimate_text_tokens_conservative(text: &str) -> usize { + text.chars().count().div_ceil(3) +} + +fn estimate_system_tokens_conservative(system: Option<&SystemPrompt>) -> usize { + match system { + Some(SystemPrompt::Text(text)) => estimate_text_tokens_conservative(text), + Some(SystemPrompt::Blocks(blocks)) => blocks + .iter() + .map(|block| estimate_text_tokens_conservative(&block.text)) + .sum(), + None => 0, + } +} + +/// Conservative estimate for full request input tokens (messages + system + framing). +#[must_use] +pub fn estimate_input_tokens_conservative( + messages: &[Message], + system: Option<&SystemPrompt>, +) -> usize { + let message_tokens = estimate_tokens(messages).saturating_mul(3).div_ceil(2); + let system_tokens = estimate_system_tokens_conservative(system); + let framing_overhead = messages.len().saturating_mul(12).saturating_add(48); + message_tokens + .saturating_add(system_tokens) + .saturating_add(framing_overhead) +} + pub fn should_compact( messages: &[Message], config: &CompactionConfig, @@ -527,6 +561,22 @@ fn truncate_chars(text: &str, max_chars: usize) -> &str { } } +fn tail_chars(text: &str, max_chars: usize) -> String { + if max_chars == 0 { + return String::new(); + } + let total_chars = text.chars().count(); + if total_chars <= max_chars { + return text.to_string(); + } + let start_char = total_chars.saturating_sub(max_chars); + let start_idx = text + .char_indices() + .nth(start_char) + .map_or(0, |(idx, _)| idx); + text[start_idx..].to_string() +} + /// Result of a compaction operation with metadata. #[derive(Debug)] pub struct CompactionResult { @@ -707,13 +757,14 @@ async fn create_summary( for block in &msg.content { match block { ContentBlock::Text { text, .. } => { - let _ = write!(conversation_text, "{role}: {text}\n\n"); + let snippet = truncate_chars(text, SUMMARY_TEXT_SNIPPET_CHARS); + let _ = write!(conversation_text, "{role}: {snippet}\n\n"); } ContentBlock::ToolUse { name, .. } => { let _ = write!(conversation_text, "{role}: [Used tool: {name}]\n\n"); } ContentBlock::ToolResult { content, .. } => { - let snippet = truncate_chars(content, 500); + let snippet = truncate_chars(content, SUMMARY_TOOL_RESULT_SNIPPET_CHARS); let _ = write!(conversation_text, "Tool result: {}\n\n", snippet); } ContentBlock::Thinking { .. } => { @@ -723,6 +774,17 @@ async fn create_summary( } } + let conversation_chars = conversation_text.chars().count(); + if conversation_chars > SUMMARY_INPUT_MAX_CHARS { + let head = truncate_chars(&conversation_text, SUMMARY_INPUT_HEAD_CHARS).to_string(); + let tail = tail_chars(&conversation_text, SUMMARY_INPUT_TAIL_CHARS); + let omitted = conversation_chars + .saturating_sub(head.chars().count()) + .saturating_sub(tail.chars().count()); + conversation_text = + format!("{head}\n\n[... {omitted} characters omitted before summary ...]\n\n{tail}"); + } + let request = MessageRequest { model: model.to_string(), messages: vec![Message { @@ -731,6 +793,7 @@ async fn create_summary( text: format!( "Summarize the following conversation in a concise but comprehensive way. \ Preserve key information, decisions made, and any important context. \ + Tool outputs may be abbreviated. \ Keep it under 500 words.\n\n---\n\n{conversation_text}" ), cache_control: None, diff --git a/src/config.rs b/src/config.rs index c96aa0dc..326fb9f9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,24 @@ pub struct RetryPolicy { pub exponential_base: f64, } +/// Capacity-controller config loaded from config files/environment. +#[derive(Debug, Clone, Deserialize)] +pub struct CapacityConfig { + pub enabled: Option, + pub low_risk_max: Option, + pub medium_risk_max: Option, + pub severe_min_slack: Option, + pub severe_violation_ratio: Option, + pub refresh_cooldown_turns: Option, + pub replan_cooldown_turns: Option, + pub max_replay_per_turn: Option, + pub min_turns_before_guardrail: Option, + pub profile_window: Option, + pub deepseek_v3_2_chat_prior: Option, + pub deepseek_v3_2_reasoner_prior: Option, + pub fallback_default_prior: Option, +} + impl RetryPolicy { /// Compute the backoff delay for a retry attempt. #[must_use] @@ -89,6 +107,7 @@ pub struct Config { pub requirements_path: Option, pub max_subagents: Option, pub retry: Option, + pub capacity: Option, pub features: Option, /// TUI configuration (alternate screen, etc.) @@ -203,6 +222,36 @@ impl Config { ); } } + if let Some(capacity) = &self.capacity { + if let Some(v) = capacity.low_risk_max + && !(0.0..=1.0).contains(&v) + { + anyhow::bail!( + "Invalid capacity.low_risk_max '{v}': expected a value in [0.0, 1.0]." + ); + } + if let Some(v) = capacity.medium_risk_max + && !(0.0..=1.0).contains(&v) + { + anyhow::bail!( + "Invalid capacity.medium_risk_max '{v}': expected a value in [0.0, 1.0]." + ); + } + if let (Some(low), Some(medium)) = (capacity.low_risk_max, capacity.medium_risk_max) + && low > medium + { + anyhow::bail!( + "Invalid capacity thresholds: low_risk_max ({low}) must be <= medium_risk_max ({medium})." + ); + } + if let Some(v) = capacity.severe_violation_ratio + && !(0.0..=1.0).contains(&v) + { + anyhow::bail!( + "Invalid capacity.severe_violation_ratio '{v}': expected a value in [0.0, 1.0]." + ); + } + } Ok(()) } @@ -285,7 +334,7 @@ impl Config { /// Return whether shell execution is allowed. #[must_use] pub fn allow_shell(&self) -> bool { - self.allow_shell.unwrap_or(false) + self.allow_shell.unwrap_or(true) } /// Return the maximum number of concurrent sub-agents. @@ -479,6 +528,105 @@ fn apply_env_overrides(config: &mut Config) { { config.max_subagents = Some(parsed.clamp(1, MAX_SUBAGENTS)); } + + let capacity = config.capacity.get_or_insert(CapacityConfig { + enabled: None, + low_risk_max: None, + medium_risk_max: None, + severe_min_slack: None, + severe_violation_ratio: None, + refresh_cooldown_turns: None, + replan_cooldown_turns: None, + max_replay_per_turn: None, + min_turns_before_guardrail: None, + profile_window: None, + deepseek_v3_2_chat_prior: None, + deepseek_v3_2_reasoner_prior: None, + fallback_default_prior: None, + }); + + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_ENABLED") { + let val = value.trim().to_ascii_lowercase(); + capacity.enabled = Some(matches!(val.as_str(), "1" | "true" | "yes" | "on")); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_LOW_RISK_MAX") + && let Ok(parsed) = value.parse::() + { + capacity.low_risk_max = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MEDIUM_RISK_MAX") + && let Ok(parsed) = value.parse::() + { + capacity.medium_risk_max = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_SEVERE_MIN_SLACK") + && let Ok(parsed) = value.parse::() + { + capacity.severe_min_slack = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_SEVERE_VIOLATION_RATIO") + && let Ok(parsed) = value.parse::() + { + capacity.severe_violation_ratio = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_REFRESH_COOLDOWN_TURNS") + && let Ok(parsed) = value.parse::() + { + capacity.refresh_cooldown_turns = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_REPLAN_COOLDOWN_TURNS") + && let Ok(parsed) = value.parse::() + { + capacity.replan_cooldown_turns = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MAX_REPLAY_PER_TURN") + && let Ok(parsed) = value.parse::() + { + capacity.max_replay_per_turn = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_MIN_TURNS_BEFORE_GUARDRAIL") + && let Ok(parsed) = value.parse::() + { + capacity.min_turns_before_guardrail = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PROFILE_WINDOW") + && let Ok(parsed) = value.parse::() + { + capacity.profile_window = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_CHAT") + && let Ok(parsed) = value.parse::() + { + capacity.deepseek_v3_2_chat_prior = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_REASONER") + && let Ok(parsed) = value.parse::() + { + capacity.deepseek_v3_2_reasoner_prior = Some(parsed); + } + if let Ok(value) = std::env::var("DEEPSEEK_CAPACITY_PRIOR_FALLBACK") + && let Ok(parsed) = value.parse::() + { + capacity.fallback_default_prior = Some(parsed); + } + + if config.capacity.as_ref().is_some_and(|c| { + c.enabled.is_none() + && c.low_risk_max.is_none() + && c.medium_risk_max.is_none() + && c.severe_min_slack.is_none() + && c.severe_violation_ratio.is_none() + && c.refresh_cooldown_turns.is_none() + && c.replan_cooldown_turns.is_none() + && c.max_replay_per_turn.is_none() + && c.min_turns_before_guardrail.is_none() + && c.profile_window.is_none() + && c.deepseek_v3_2_chat_prior.is_none() + && c.deepseek_v3_2_reasoner_prior.is_none() + && c.fallback_default_prior.is_none() + }) { + config.capacity = None; + } } fn normalize_model_config(config: &mut Config) { @@ -549,6 +697,7 @@ fn merge_config(base: Config, override_cfg: Config) -> Config { requirements_path: override_cfg.requirements_path.or(base.requirements_path), max_subagents: override_cfg.max_subagents.or(base.max_subagents), retry: override_cfg.retry.or(base.retry), + capacity: override_cfg.capacity.or(base.capacity), tui: override_cfg.tui.or(base.tui), hooks: override_cfg.hooks.or(base.hooks), features: merge_features(base.features, override_cfg.features), diff --git a/src/core/capacity.rs b/src/core/capacity.rs new file mode 100644 index 00000000..e087a599 --- /dev/null +++ b/src/core/capacity.rs @@ -0,0 +1,610 @@ +//! Capacity-aware guardrail controller for context pressure management. + +use std::collections::{HashMap, VecDeque}; + +/// Controller settings. +#[derive(Debug, Clone, PartialEq)] +pub struct CapacityControllerConfig { + pub enabled: bool, + pub low_risk_max: f64, + pub medium_risk_max: f64, + pub severe_min_slack: f64, + pub severe_violation_ratio: f64, + pub refresh_cooldown_turns: u64, + pub replan_cooldown_turns: u64, + pub max_replay_per_turn: usize, + pub min_turns_before_guardrail: u64, + pub profile_window: usize, + pub model_priors: HashMap, + pub fallback_default: f64, +} + +impl Default for CapacityControllerConfig { + fn default() -> Self { + let mut model_priors = HashMap::new(); + model_priors.insert("deepseek_v3_2_chat".to_string(), 3.9); + model_priors.insert("deepseek_v3_2_reasoner".to_string(), 4.1); + + Self { + enabled: true, + low_risk_max: 0.34, + medium_risk_max: 0.62, + severe_min_slack: -0.25, + severe_violation_ratio: 0.40, + refresh_cooldown_turns: 2, + replan_cooldown_turns: 5, + max_replay_per_turn: 1, + min_turns_before_guardrail: 2, + profile_window: 8, + model_priors, + fallback_default: 3.8, + } + } +} + +impl CapacityControllerConfig { + /// Build effective capacity config from app config. + #[must_use] + pub fn from_app_config(config: &crate::config::Config) -> Self { + let mut out = Self::default(); + let Some(capacity) = config.capacity.as_ref() else { + return out; + }; + + if let Some(v) = capacity.enabled { + out.enabled = v; + } + if let Some(v) = capacity.low_risk_max { + out.low_risk_max = v; + } + if let Some(v) = capacity.medium_risk_max { + out.medium_risk_max = v; + } + if let Some(v) = capacity.severe_min_slack { + out.severe_min_slack = v; + } + if let Some(v) = capacity.severe_violation_ratio { + out.severe_violation_ratio = v; + } + if let Some(v) = capacity.refresh_cooldown_turns { + out.refresh_cooldown_turns = v; + } + if let Some(v) = capacity.replan_cooldown_turns { + out.replan_cooldown_turns = v; + } + if let Some(v) = capacity.max_replay_per_turn { + out.max_replay_per_turn = v; + } + if let Some(v) = capacity.min_turns_before_guardrail { + out.min_turns_before_guardrail = v; + } + if let Some(v) = capacity.profile_window { + out.profile_window = v.max(2); + } + + if let Some(v) = capacity.deepseek_v3_2_chat_prior { + out.model_priors.insert("deepseek_v3_2_chat".to_string(), v); + } + if let Some(v) = capacity.deepseek_v3_2_reasoner_prior { + out.model_priors + .insert("deepseek_v3_2_reasoner".to_string(), v); + } + if let Some(v) = capacity.fallback_default_prior { + out.fallback_default = v; + } + + out + } +} + +/// Guardrail decision output. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum GuardrailAction { + NoIntervention, + TargetedContextRefresh, + VerifyWithToolReplay, + VerifyAndReplan, +} + +impl GuardrailAction { + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + GuardrailAction::NoIntervention => "no_intervention", + GuardrailAction::TargetedContextRefresh => "targeted_context_refresh", + GuardrailAction::VerifyWithToolReplay => "verify_with_tool_replay", + GuardrailAction::VerifyAndReplan => "verify_and_replan", + } + } +} + +/// Coarse failure risk band. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RiskBand { + Low, + Medium, + High, +} + +impl RiskBand { + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + RiskBand::Low => "low", + RiskBand::Medium => "medium", + RiskBand::High => "high", + } + } +} + +/// Input used to observe current turn pressure. +#[derive(Debug, Clone)] +pub struct CapacityObservationInput { + pub turn_index: u64, + pub model: String, + pub action_count_this_turn: usize, + pub tool_calls_recent_window: usize, + pub unique_reference_ids_recent_window: usize, + pub context_used_ratio: f64, +} + +/// Rolling slack profile. +#[derive(Debug, Clone, Copy, Default)] +pub struct DynamicSlackProfile { + pub final_slack: f64, + pub min_slack: f64, + pub violation_ratio: f64, + pub slack_volatility: f64, + pub slack_drop: f64, +} + +/// Per-checkpoint capacity snapshot. +#[derive(Debug, Clone)] +pub struct CapacitySnapshot { + pub turn_index: u64, + pub h_hat: f64, + pub c_hat: f64, + pub slack: f64, + pub profile: DynamicSlackProfile, + pub p_fail: f64, + pub risk_band: RiskBand, + pub severe: bool, +} + +/// Full controller decision including reason and block flags. +#[derive(Debug, Clone)] +pub struct CapacityDecision { + pub action: GuardrailAction, + pub reason: String, + pub cooldown_blocked: bool, +} + +#[derive(Debug, Clone, Default)] +struct GuardrailRuntimeState { + last_refresh_turn: Option, + last_replan_turn: Option, + replay_count_this_turn: usize, + replay_disabled_turn: Option, + intervention_applied_turn: Option, +} + +/// Capacity controller. +#[derive(Debug, Clone)] +pub struct CapacityController { + config: CapacityControllerConfig, + slack_window: VecDeque, + recent_tool_counts: VecDeque, + recent_ref_counts: VecDeque, + state: GuardrailRuntimeState, + last_snapshot: Option, +} + +impl CapacityController { + #[must_use] + pub fn new(config: CapacityControllerConfig) -> Self { + Self { + config, + slack_window: VecDeque::new(), + recent_tool_counts: VecDeque::new(), + recent_ref_counts: VecDeque::new(), + state: GuardrailRuntimeState::default(), + last_snapshot: None, + } + } + + #[must_use] + pub fn config(&self) -> &CapacityControllerConfig { + &self.config + } + + pub fn observe_pre_turn( + &mut self, + input: CapacityObservationInput, + ) -> Option { + self.observe(input) + } + + pub fn observe_post_tool( + &mut self, + input: CapacityObservationInput, + ) -> Option { + self.observe(input) + } + + /// Decide intervention from the latest snapshot, with cooldown and safety gates. + #[must_use] + pub fn decide( + &mut self, + turn_index: u64, + snapshot: Option<&CapacitySnapshot>, + ) -> CapacityDecision { + if !self.config.enabled { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "capacity_controller_disabled".to_string(), + cooldown_blocked: false, + }; + } + + let Some(snapshot) = snapshot else { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "missing_capacity_data_fail_open".to_string(), + cooldown_blocked: false, + }; + }; + + if turn_index < self.config.min_turns_before_guardrail { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "min_turns_before_guardrail_not_reached".to_string(), + cooldown_blocked: false, + }; + } + + let proposed = decide_policy(&self.config, snapshot); + if proposed == GuardrailAction::NoIntervention { + return CapacityDecision { + action: proposed, + reason: "low_risk_no_intervention".to_string(), + cooldown_blocked: false, + }; + } + + if self + .state + .intervention_applied_turn + .is_some_and(|t| t == turn_index) + { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "intervention_already_applied_this_turn".to_string(), + cooldown_blocked: true, + }; + } + + match proposed { + GuardrailAction::TargetedContextRefresh => { + if self + .state + .last_refresh_turn + .is_some_and(|last| turn_index <= last + self.config.refresh_cooldown_turns) + { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "refresh_cooldown_active".to_string(), + cooldown_blocked: true, + }; + } + } + GuardrailAction::VerifyWithToolReplay => { + if self + .state + .replay_disabled_turn + .is_some_and(|t| t == turn_index) + { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "replay_disabled_for_turn".to_string(), + cooldown_blocked: true, + }; + } + if self.state.replay_count_this_turn >= self.config.max_replay_per_turn { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "max_replay_per_turn_reached".to_string(), + cooldown_blocked: true, + }; + } + } + GuardrailAction::VerifyAndReplan => { + if self + .state + .last_replan_turn + .is_some_and(|last| turn_index <= last + self.config.replan_cooldown_turns) + { + return CapacityDecision { + action: GuardrailAction::NoIntervention, + reason: "replan_cooldown_active".to_string(), + cooldown_blocked: true, + }; + } + } + GuardrailAction::NoIntervention => {} + } + + CapacityDecision { + action: proposed, + reason: "policy_selected_action".to_string(), + cooldown_blocked: false, + } + } + + pub fn mark_turn_start(&mut self, turn_index: u64) { + let new_turn = match self.last_snapshot.as_ref() { + None => true, + Some(snapshot) => snapshot.turn_index != turn_index, + }; + if new_turn { + self.state.replay_count_this_turn = 0; + self.state.replay_disabled_turn = None; + self.state.intervention_applied_turn = None; + } + } + + pub fn mark_intervention_applied(&mut self, turn_index: u64, action: GuardrailAction) { + self.state.intervention_applied_turn = Some(turn_index); + match action { + GuardrailAction::TargetedContextRefresh => { + self.state.last_refresh_turn = Some(turn_index); + } + GuardrailAction::VerifyWithToolReplay => { + self.state.replay_count_this_turn = + self.state.replay_count_this_turn.saturating_add(1); + } + GuardrailAction::VerifyAndReplan => { + self.state.last_replan_turn = Some(turn_index); + } + GuardrailAction::NoIntervention => {} + } + } + + pub fn mark_replay_failed(&mut self, turn_index: u64) { + self.state.replay_disabled_turn = Some(turn_index); + } + + #[must_use] + pub fn last_snapshot(&self) -> Option<&CapacitySnapshot> { + self.last_snapshot.as_ref() + } + + fn observe(&mut self, input: CapacityObservationInput) -> Option { + if !self.config.enabled { + return None; + } + + let context_used_ratio = input.context_used_ratio.clamp(0.0, 2.0); + let action_complexity_bits = log2_1p(input.action_count_this_turn); + let tool_complexity_bits = log2_1p(input.tool_calls_recent_window); + let ref_complexity_bits = log2_1p(input.unique_reference_ids_recent_window); + let context_pressure_bits = 6.0 * context_used_ratio; + + let h_hat = (0.35 * action_complexity_bits) + + (0.30 * tool_complexity_bits) + + (0.20 * ref_complexity_bits) + + (0.15 * context_pressure_bits); + let c_hat = self.model_prior(&input.model); + let slack = c_hat - h_hat; + + push_window(&mut self.slack_window, slack, self.config.profile_window); + push_window( + &mut self.recent_tool_counts, + input.tool_calls_recent_window, + self.config.profile_window, + ); + push_window( + &mut self.recent_ref_counts, + input.unique_reference_ids_recent_window, + self.config.profile_window, + ); + + let profile = compute_profile(&self.slack_window); + let z = (-1.65 * profile.final_slack) + + (-0.85 * profile.min_slack) + + (1.35 * profile.violation_ratio) + + (0.70 * profile.slack_volatility) + + (0.28 * profile.slack_drop) + - 0.12; + let p_fail = sigmoid(z).clamp(0.0, 1.0); + let risk_band = if p_fail <= self.config.low_risk_max { + RiskBand::Low + } else if p_fail <= self.config.medium_risk_max { + RiskBand::Medium + } else { + RiskBand::High + }; + let severe = profile.min_slack <= self.config.severe_min_slack + || profile.violation_ratio >= self.config.severe_violation_ratio; + + let snapshot = CapacitySnapshot { + turn_index: input.turn_index, + h_hat, + c_hat, + slack, + profile, + p_fail, + risk_band, + severe, + }; + self.last_snapshot = Some(snapshot.clone()); + Some(snapshot) + } + + fn model_prior(&self, model: &str) -> f64 { + let normalized = normalize_model_prior_key(model); + self.config + .model_priors + .get(normalized) + .copied() + .unwrap_or(self.config.fallback_default) + } +} + +/// Pure policy mapping for snapshot -> action. +#[must_use] +pub fn decide_policy( + _config: &CapacityControllerConfig, + snapshot: &CapacitySnapshot, +) -> GuardrailAction { + match snapshot.risk_band { + RiskBand::Low => GuardrailAction::NoIntervention, + RiskBand::Medium => GuardrailAction::TargetedContextRefresh, + RiskBand::High if snapshot.severe => GuardrailAction::VerifyAndReplan, + RiskBand::High => GuardrailAction::VerifyWithToolReplay, + } +} + +fn normalize_model_prior_key(model: &str) -> &str { + let lower = model.to_ascii_lowercase(); + if lower.contains("reasoner") || lower.contains("r1") { + "deepseek_v3_2_reasoner" + } else if lower.contains("chat") || lower.contains("v3") { + "deepseek_v3_2_chat" + } else { + "fallback_default" + } +} + +fn log2_1p(v: usize) -> f64 { + (1.0 + (v as f64)).log2() +} + +fn push_window(window: &mut VecDeque, value: T, max_len: usize) { + window.push_back(value); + while window.len() > max_len { + window.pop_front(); + } +} + +fn compute_profile(window: &VecDeque) -> DynamicSlackProfile { + if window.is_empty() { + return DynamicSlackProfile::default(); + } + + let values: Vec = window.iter().copied().collect(); + let final_slack = *values.last().unwrap_or(&0.0); + let min_slack = values.iter().copied().fold(f64::INFINITY, f64::min); + let violations = values.iter().filter(|v| **v <= 0.0).count() as f64; + let violation_ratio = violations / (values.len() as f64); + + let deltas: Vec = values.windows(2).map(|w| w[1] - w[0]).collect(); + let slack_drop = if values.len() >= 2 { + (values[values.len() - 2] - values[values.len() - 1]).max(0.0) + } else { + 0.0 + }; + + let slack_volatility = if deltas.is_empty() { + 0.0 + } else { + let mean = deltas.iter().sum::() / (deltas.len() as f64); + let var = deltas + .iter() + .map(|delta| { + let centered = *delta - mean; + centered * centered + }) + .sum::() + / (deltas.len() as f64); + var.sqrt() + }; + + DynamicSlackProfile { + final_slack, + min_slack, + violation_ratio, + slack_volatility, + slack_drop, + } +} + +fn sigmoid(z: f64) -> f64 { + if z >= 0.0 { + let ez = (-z).exp(); + 1.0 / (1.0 + ez) + } else { + let ez = z.exp(); + ez / (1.0 + ez) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_snapshot(p_fail: f64, severe: bool, risk_band: RiskBand) -> CapacitySnapshot { + CapacitySnapshot { + turn_index: 3, + h_hat: 1.0, + c_hat: 3.8, + slack: 2.8, + profile: DynamicSlackProfile { + final_slack: 2.8, + min_slack: if severe { -0.5 } else { 0.2 }, + violation_ratio: if severe { 0.6 } else { 0.1 }, + slack_volatility: 0.2, + slack_drop: 0.1, + }, + p_fail, + risk_band, + severe, + } + } + + #[test] + fn low_risk_maps_to_no_intervention() { + let cfg = CapacityControllerConfig::default(); + let snap = make_snapshot(0.2, false, RiskBand::Low); + assert_eq!(decide_policy(&cfg, &snap), GuardrailAction::NoIntervention); + } + + #[test] + fn medium_risk_maps_to_refresh() { + let cfg = CapacityControllerConfig::default(); + let snap = make_snapshot(0.5, false, RiskBand::Medium); + assert_eq!( + decide_policy(&cfg, &snap), + GuardrailAction::TargetedContextRefresh + ); + } + + #[test] + fn high_non_severe_maps_to_replay() { + let cfg = CapacityControllerConfig::default(); + let snap = make_snapshot(0.8, false, RiskBand::High); + assert_eq!( + decide_policy(&cfg, &snap), + GuardrailAction::VerifyWithToolReplay + ); + } + + #[test] + fn high_severe_maps_to_replan() { + let cfg = CapacityControllerConfig::default(); + let snap = make_snapshot(0.9, true, RiskBand::High); + assert_eq!(decide_policy(&cfg, &snap), GuardrailAction::VerifyAndReplan); + } + + #[test] + fn cooldown_blocks_repeated_action() { + let mut controller = CapacityController::new(CapacityControllerConfig::default()); + let turn_index = 5; + controller.mark_turn_start(turn_index); + controller.mark_intervention_applied(turn_index, GuardrailAction::TargetedContextRefresh); + + let snapshot = make_snapshot(0.5, false, RiskBand::Medium); + let decision = controller.decide(turn_index + 1, Some(&snapshot)); + assert_eq!(decision.action, GuardrailAction::NoIntervention); + assert!(decision.cooldown_blocked); + } +} diff --git a/src/core/capacity_memory.rs b/src/core/capacity_memory.rs new file mode 100644 index 00000000..a9900a3e --- /dev/null +++ b/src/core/capacity_memory.rs @@ -0,0 +1,337 @@ +//! Persistent memory snapshots for capacity controller interventions. + +use std::fs::{self, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +use anyhow::{Context, Result, anyhow}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +/// Canonical compact state persisted by interventions. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct CanonicalState { + pub goal: String, + pub constraints: Vec, + pub confirmed_facts: Vec, + pub open_loops: Vec, + pub pending_actions: Vec, + pub critical_refs: Vec, +} + +/// Replay verification metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplayInfo { + pub tool_id: String, + pub tool_name: String, + pub pass: bool, + pub diff_summary: String, +} + +/// JSONL record written for each intervention. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CapacityMemoryRecord { + pub id: String, + pub ts: String, + pub turn_index: u64, + pub action_trigger: String, + pub h_hat: f64, + pub c_hat: f64, + pub slack: f64, + pub risk_band: String, + pub canonical_state: CanonicalState, + pub source_message_ids: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub replay_info: Option, +} + +/// Resolve `~/.deepseek/memory`. +#[must_use] +pub fn default_capacity_memory_dir() -> PathBuf { + capacity_memory_dirs() + .into_iter() + .next() + .unwrap_or_else(|| PathBuf::from(".").join(".deepseek").join("memory")) +} + +fn capacity_memory_dirs() -> Vec { + if let Ok(raw) = std::env::var("DEEPSEEK_CAPACITY_MEMORY_DIR") { + let trimmed = raw.trim(); + if !trimmed.is_empty() { + return vec![PathBuf::from(shellexpand::tilde(trimmed).as_ref())]; + } + } + + let mut dirs = Vec::new(); + if let Some(home) = dirs::home_dir() { + dirs.push(home.join(".deepseek").join("memory")); + } + + let cwd = std::env::current_dir() + .unwrap_or_else(|_| PathBuf::from(".")) + .join(".deepseek") + .join("memory"); + dirs.push(cwd); + + dirs.dedup(); + dirs +} + +#[must_use] +pub fn session_memory_path(session_id: &str) -> PathBuf { + default_capacity_memory_dir().join(format!("{session_id}.jsonl")) +} + +pub fn append_capacity_record(session_id: &str, record: &CapacityMemoryRecord) -> Result { + let candidates = candidate_session_memory_paths(session_id); + append_capacity_record_to_candidates(&candidates, record) +} + +pub fn append_capacity_record_to_path(path: &Path, record: &CapacityMemoryRecord) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("Failed to create memory directory {}", parent.display()))?; + } + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .with_context(|| format!("Failed to open memory log {}", path.display()))?; + let line = + serde_json::to_string(record).context("Failed to serialize capacity memory record")?; + writeln!(file, "{line}") + .with_context(|| format!("Failed to write memory record {}", path.display()))?; + Ok(()) +} + +pub fn load_last_k_capacity_records( + session_id: &str, + k: usize, +) -> Result> { + let candidates = candidate_session_memory_paths(session_id); + load_last_k_capacity_records_from_candidates(&candidates, k) +} + +pub fn load_last_k_capacity_records_from_path( + path: &Path, + k: usize, +) -> Result> { + if k == 0 || !path.exists() { + return Ok(Vec::new()); + } + + let file = OpenOptions::new() + .read(true) + .open(path) + .with_context(|| format!("Failed to open memory log {}", path.display()))?; + let reader = BufReader::new(file); + let mut records = Vec::new(); + + for line in reader.lines() { + let line = line.with_context(|| format!("Failed reading {}", path.display()))?; + if line.trim().is_empty() { + continue; + } + if let Ok(record) = serde_json::from_str::(&line) { + records.push(record); + } + } + + if records.len() > k { + Ok(records.split_off(records.len() - k)) + } else { + Ok(records) + } +} + +fn candidate_session_memory_paths(session_id: &str) -> Vec { + capacity_memory_dirs() + .into_iter() + .map(|dir| dir.join(format!("{session_id}.jsonl"))) + .collect() +} + +fn append_capacity_record_to_candidates( + paths: &[PathBuf], + record: &CapacityMemoryRecord, +) -> Result { + let mut last_err: Option = None; + for path in paths { + match append_capacity_record_to_path(path, record) { + Ok(()) => return Ok(path.clone()), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| anyhow!("No capacity memory path candidates available"))) +} + +fn load_last_k_capacity_records_from_candidates( + paths: &[PathBuf], + k: usize, +) -> Result> { + if k == 0 { + return Ok(Vec::new()); + } + + let mut newest: Option<(SystemTime, Vec)> = None; + let mut last_err: Option = None; + + for path in paths { + if !path.exists() { + continue; + } + + match load_last_k_capacity_records_from_path(path, k) { + Ok(records) => { + if records.is_empty() { + continue; + } + let modified = fs::metadata(path) + .and_then(|meta| meta.modified()) + .unwrap_or(SystemTime::UNIX_EPOCH); + let should_replace = newest + .as_ref() + .map(|(current, _)| modified >= *current) + .unwrap_or(true); + if should_replace { + newest = Some((modified, records)); + } + } + Err(err) => last_err = Some(err), + } + } + + if let Some((_, records)) = newest { + return Ok(records); + } + if let Some(err) = last_err { + return Err(err); + } + Ok(Vec::new()) +} + +#[must_use] +pub fn new_record_id() -> String { + format!("cap_{}", &uuid::Uuid::new_v4().to_string()[..8]) +} + +#[must_use] +pub fn now_rfc3339() -> String { + Utc::now().to_rfc3339() +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn memory_jsonl_round_trip() { + let tmp = tempdir().expect("tempdir"); + let path = tmp.path().join("session.jsonl"); + + let record = CapacityMemoryRecord { + id: "cap_1".to_string(), + ts: now_rfc3339(), + turn_index: 2, + action_trigger: "targeted_context_refresh".to_string(), + h_hat: 1.2, + c_hat: 3.8, + slack: 2.6, + risk_band: "medium".to_string(), + canonical_state: CanonicalState { + goal: "Ship feature".to_string(), + ..CanonicalState::default() + }, + source_message_ids: vec!["m1".to_string()], + replay_info: None, + }; + + append_capacity_record_to_path(&path, &record).expect("append"); + let records = load_last_k_capacity_records_from_path(&path, 1).expect("load"); + assert_eq!(records.len(), 1); + assert_eq!(records[0].canonical_state.goal, "Ship feature"); + } + + #[test] + fn append_falls_back_to_next_candidate_path() { + let tmp = tempdir().expect("tempdir"); + let blocked_root = tmp.path().join("blocked"); + fs::write(&blocked_root, "file").expect("create blocking file"); + let blocked_path = blocked_root.join("session.jsonl"); + let fallback_path = tmp.path().join("fallback").join("session.jsonl"); + + let record = CapacityMemoryRecord { + id: "cap_fallback".to_string(), + ts: now_rfc3339(), + turn_index: 1, + action_trigger: "targeted_context_refresh".to_string(), + h_hat: 1.0, + c_hat: 3.8, + slack: 2.8, + risk_band: "medium".to_string(), + canonical_state: CanonicalState::default(), + source_message_ids: vec!["m1".to_string()], + replay_info: None, + }; + + let chosen = append_capacity_record_to_candidates( + &[blocked_path.clone(), fallback_path.clone()], + &record, + ) + .expect("append with fallback"); + assert_eq!(chosen, fallback_path); + assert!(chosen.exists()); + } + + #[test] + fn load_prefers_newest_candidate_records() { + let tmp = tempdir().expect("tempdir"); + let older = tmp.path().join("older.jsonl"); + let newer = tmp.path().join("newer.jsonl"); + + let old_record = CapacityMemoryRecord { + id: "cap_old".to_string(), + ts: now_rfc3339(), + turn_index: 1, + action_trigger: "targeted_context_refresh".to_string(), + h_hat: 1.0, + c_hat: 3.8, + slack: 2.8, + risk_band: "medium".to_string(), + canonical_state: CanonicalState { + goal: "old".to_string(), + ..CanonicalState::default() + }, + source_message_ids: vec!["m1".to_string()], + replay_info: None, + }; + let new_record = CapacityMemoryRecord { + id: "cap_new".to_string(), + ts: now_rfc3339(), + turn_index: 2, + action_trigger: "verify_and_replan".to_string(), + h_hat: 1.4, + c_hat: 3.8, + slack: 2.4, + risk_band: "high".to_string(), + canonical_state: CanonicalState { + goal: "new".to_string(), + ..CanonicalState::default() + }, + source_message_ids: vec!["m2".to_string()], + replay_info: None, + }; + + append_capacity_record_to_path(&older, &old_record).expect("write older"); + std::thread::sleep(std::time::Duration::from_millis(10)); + append_capacity_record_to_path(&newer, &new_record).expect("write newer"); + + let records = load_last_k_capacity_records_from_candidates(&[older, newer], 1) + .expect("load newest records"); + assert_eq!(records.len(), 1); + assert_eq!(records[0].canonical_state.goal, "new"); + } +} diff --git a/src/core/engine.rs b/src/core/engine.rs index 88133cb4..f41e4afd 100644 --- a/src/core/engine.rs +++ b/src/core/engine.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::pin::pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex as StdMutex}; use std::time::{Duration, Instant}; use std::{fs::OpenOptions, io::Write}; @@ -29,8 +29,8 @@ use crate::features::{Feature, Features}; use crate::llm_client::LlmClient; use crate::mcp::McpPool; use crate::models::{ - ContentBlock, ContentBlockStart, Delta, Message, MessageRequest, StreamEvent, SystemPrompt, - Tool, Usage, context_window_for_model, + ContentBlock, ContentBlockStart, DEFAULT_CONTEXT_WINDOW_TOKENS, Delta, Message, MessageRequest, + StreamEvent, SystemPrompt, Tool, Usage, context_window_for_model, }; use crate::prompts; use crate::tools::plan::{SharedPlanState, new_shared_plan_state}; @@ -44,6 +44,14 @@ use crate::tools::user_input::{UserInputRequest, UserInputResponse}; use crate::tools::{ToolContext, ToolRegistryBuilder}; use crate::tui::app::AppMode; +use super::capacity::{ + CapacityController, CapacityControllerConfig, CapacityDecision, CapacityObservationInput, + CapacitySnapshot, GuardrailAction, RiskBand, +}; +use super::capacity_memory::{ + CanonicalState, CapacityMemoryRecord, ReplayInfo, append_capacity_record, + load_last_k_capacity_records, new_record_id, now_rfc3339, +}; use super::events::{Event, TurnOutcomeStatus}; use super::ops::Op; use super::session::Session; @@ -75,6 +83,8 @@ pub struct EngineConfig { pub features: Features, /// Auto-compaction settings for long conversations. pub compaction: CompactionConfig, + /// Capacity-controller settings. + pub capacity: CapacityControllerConfig, /// Shared Todo list state. pub todos: SharedTodoList, /// Shared Plan state. @@ -86,7 +96,7 @@ impl Default for EngineConfig { Self { model: DEFAULT_TEXT_MODEL.to_string(), workspace: PathBuf::from("."), - allow_shell: false, + allow_shell: true, trust_mode: false, notes_path: PathBuf::from("notes.txt"), mcp_config_path: PathBuf::from("mcp.json"), @@ -94,6 +104,7 @@ impl Default for EngineConfig { max_subagents: DEFAULT_MAX_SUBAGENTS, features: Features::with_defaults(), compaction: CompactionConfig::default(), + capacity: CapacityControllerConfig::default(), todos: new_shared_todo_list(), plan_state: new_shared_plan_state(), } @@ -107,8 +118,8 @@ pub struct EngineHandle { pub tx_op: mpsc::Sender, /// Receive events from the engine pub rx_event: Arc>>, - /// Cancellation token for the current request - cancel_token: CancellationToken, + /// Shared pointer to the cancellation token for the current request. + cancel_token: Arc>, /// Send approval decisions to the engine tx_approval: mpsc::Sender, /// Send user input responses to the engine @@ -126,13 +137,19 @@ impl EngineHandle { /// Cancel the current request pub fn cancel(&self) { - self.cancel_token.cancel(); + match self.cancel_token.lock() { + Ok(token) => token.cancel(), + Err(poisoned) => poisoned.into_inner().cancel(), + } } /// Check if a request is currently cancelled #[must_use] pub fn is_cancelled(&self) -> bool { - self.cancel_token.is_cancelled() + match self.cancel_token.lock() { + Ok(token) => token.is_cancelled(), + Err(poisoned) => poisoned.into_inner().is_cancelled(), + } } /// Approve a pending tool call @@ -213,7 +230,10 @@ pub struct Engine { rx_steer: mpsc::Receiver, tx_event: mpsc::Sender, cancel_token: CancellationToken, + shared_cancel_token: Arc>, tool_exec_lock: Arc>, + capacity_controller: CapacityController, + turn_counter: u64, } #[derive(Debug, Clone)] @@ -326,6 +346,15 @@ const MIN_RECENT_MESSAGES_TO_KEEP: usize = 4; const MAX_CONTEXT_RECOVERY_ATTEMPTS: u8 = 2; /// Reserve additional headroom to avoid hitting provider hard limits. const CONTEXT_HEADROOM_TOKENS: usize = 1024; +/// Hard cap for any tool output inserted into model context. +const TOOL_RESULT_CONTEXT_HARD_LIMIT_CHARS: usize = 12_000; +/// Soft cap for known noisy tools inserted into model context. +const TOOL_RESULT_CONTEXT_SOFT_LIMIT_CHARS: usize = 2_000; +/// Snippet length kept when compacting tool output for model context. +const TOOL_RESULT_CONTEXT_SNIPPET_CHARS: usize = 900; +/// Max chars to keep from metadata-provided output summaries. +const TOOL_RESULT_METADATA_SUMMARY_CHARS: usize = 320; +const COMPACTION_SUMMARY_MARKER: &str = "Conversation Summary (Auto-Generated)"; const TOOL_CALL_START_MARKERS: [&str; 5] = [ "[TOOL_CALL]", @@ -563,6 +592,86 @@ fn summarize_text(text: &str, limit: usize) -> String { out } +fn tool_result_is_noisy(tool_name: &str) -> bool { + matches!( + tool_name, + "exec_shell" + | "exec_shell_wait" + | "exec_shell_interact" + | "multi_tool_use.parallel" + | "web_search" + | "weather" + | "finance" + | "sports" + | "time" + ) +} + +fn tool_result_metadata_summary(metadata: Option<&serde_json::Value>) -> Option { + let obj = metadata?.as_object()?; + for key in ["summary", "stdout_summary", "stderr_summary", "message"] { + if let Some(text) = obj.get(key).and_then(serde_json::Value::as_str) { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return Some(summarize_text(trimmed, TOOL_RESULT_METADATA_SUMMARY_CHARS)); + } + } + } + None +} + +fn compact_tool_result_for_context(tool_name: &str, output: &ToolResult) -> String { + let raw = output.content.trim(); + if raw.is_empty() { + return String::new(); + } + + let raw_chars = raw.chars().count(); + let should_compact = raw_chars > TOOL_RESULT_CONTEXT_HARD_LIMIT_CHARS + || (tool_result_is_noisy(tool_name) && raw_chars > TOOL_RESULT_CONTEXT_SOFT_LIMIT_CHARS); + if !should_compact { + return raw.to_string(); + } + + let snippet = summarize_text(raw, TOOL_RESULT_CONTEXT_SNIPPET_CHARS); + let omitted = raw_chars.saturating_sub(snippet.chars().count()); + let summary = tool_result_metadata_summary(output.metadata.as_ref()); + + if let Some(summary) = summary { + format!( + "[{tool_name} output compacted to protect context]\nSummary: {summary}\nSnippet: {snippet}\n(Original: {raw_chars} chars, omitted: {omitted} chars.)" + ) + } else { + format!( + "[{tool_name} output compacted to protect context]\nSnippet: {snippet}\n(Original: {raw_chars} chars, omitted: {omitted} chars.)" + ) + } +} + +fn extract_compaction_summary_prompt(prompt: Option) -> Option { + match prompt { + Some(SystemPrompt::Blocks(blocks)) => { + let summary_blocks: Vec<_> = blocks + .into_iter() + .filter(|block| block.text.contains(COMPACTION_SUMMARY_MARKER)) + .collect(); + if summary_blocks.is_empty() { + None + } else { + Some(SystemPrompt::Blocks(summary_blocks)) + } + } + Some(SystemPrompt::Text(text)) => { + if text.contains(COMPACTION_SUMMARY_MARKER) { + Some(SystemPrompt::Text(text)) + } else { + None + } + } + None => None, + } +} + fn estimate_text_tokens_conservative(text: &str) -> usize { text.chars().count().div_ceil(3) } @@ -626,6 +735,19 @@ fn emit_tool_audit(event: serde_json::Value) { } impl Engine { + fn reset_cancel_token(&mut self) { + let token = CancellationToken::new(); + self.cancel_token = token.clone(); + match self.shared_cancel_token.lock() { + Ok(mut shared) => { + *shared = token; + } + Err(poisoned) => { + *poisoned.into_inner() = token; + } + } + } + /// Create a new engine with the given configuration pub fn new(config: EngineConfig, api_config: &Config) -> (Self, EngineHandle) { let (tx_op, rx_op) = mpsc::channel(32); @@ -634,6 +756,7 @@ impl Engine { let (tx_user_input, rx_user_input) = mpsc::channel(32); let (tx_steer, rx_steer) = mpsc::channel(64); let cancel_token = CancellationToken::new(); + let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone())); let tool_exec_lock = Arc::new(RwLock::new(())); // Create clients for both providers @@ -663,8 +786,9 @@ impl Engine { let subagent_manager = new_shared_subagent_manager(config.workspace.clone(), config.max_subagents); let shell_manager = new_shared_shell_manager(config.workspace.clone()); + let capacity_controller = CapacityController::new(config.capacity.clone()); - let engine = Engine { + let mut engine = Engine { config, deepseek_client, deepseek_client_error, @@ -678,13 +802,17 @@ impl Engine { rx_steer, tx_event, cancel_token: cancel_token.clone(), + shared_cancel_token: shared_cancel_token.clone(), tool_exec_lock, + capacity_controller, + turn_counter: 0, }; + engine.rehydrate_latest_canonical_state(); let handle = EngineHandle { tx_op, rx_event: Arc::new(RwLock::new(rx_event)), - cancel_token, + cancel_token: shared_cancel_token, tx_approval, tx_user_input, tx_steer, @@ -710,8 +838,7 @@ impl Engine { } Op::CancelRequest => { self.cancel_token.cancel(); - // Create a new token for the next request - self.cancel_token = CancellationToken::new(); + self.reset_cancel_token(); } Op::ApproveToolCall { id } => { // Tool approval handling will be implemented in tools module @@ -822,6 +949,8 @@ impl Engine { workspace, } => { self.session.messages = messages; + self.session.compaction_summary_prompt = + extract_compaction_summary_prompt(system_prompt.clone()); self.session.system_prompt = system_prompt; self.session.model = model; self.session.workspace = workspace.clone(); @@ -834,6 +963,7 @@ impl Engine { None }; self.session.rebuild_working_set(); + self.rehydrate_latest_canonical_state(); let _ = self .tx_event .send(Event::status("Session context synced".to_string())) @@ -859,13 +989,15 @@ impl Engine { trust_mode: bool, ) { // Reset cancel token for fresh turn (in case previous was cancelled) - self.cancel_token = CancellationToken::new(); + self.reset_cancel_token(); // Drain stale steer messages from previous turns. while self.rx_steer.try_recv().is_ok() {} // Create turn context first so start event includes a stable turn id. let mut turn = TurnContext::new(self.config.max_steps); + self.turn_counter = self.turn_counter.saturating_add(1); + self.capacity_controller.mark_turn_start(self.turn_counter); // Emit turn started event let _ = self @@ -918,16 +1050,8 @@ impl Engine { self.session.trust_mode = trust_mode; self.config.trust_mode = trust_mode; - // Update system prompt to match the current mode - let working_set_summary = self - .session - .working_set - .summary_block(&self.config.workspace); - self.session.system_prompt = Some(prompts::system_prompt_for_mode_with_context( - mode, - &self.config.workspace, - working_set_summary.as_deref(), - )); + // Update system prompt to match current mode and include persisted compaction context. + self.refresh_system_prompt(mode); // Build tool registry and tool list for the current mode let todo_list = self.config.todos.clone(); @@ -1072,10 +1196,7 @@ impl Engine { Ok(result) => { if !result.messages.is_empty() || self.session.messages.is_empty() { self.session.messages = result.messages; - self.session.system_prompt = merge_system_prompts( - self.session.system_prompt.as_ref(), - result.summary_prompt, - ); + self.merge_compaction_summary(result.summary_prompt); let message = if result.retries_used > 0 { format!( "Manual context compaction completed (after {} retries)", @@ -1203,8 +1324,7 @@ impl Engine { if !compacted_messages.is_empty() || self.session.messages.is_empty() { self.session.messages = compacted_messages; } - self.session.system_prompt = - merge_system_prompts(self.session.system_prompt.as_ref(), summary_prompt); + self.merge_compaction_summary(summary_prompt); let trimmed = self.trim_oldest_messages_to_budget(target_budget); let after_tokens = self.estimated_input_tokens(); @@ -1649,10 +1769,7 @@ impl Engine { // Only update if we got valid messages (never corrupt state) if !result.messages.is_empty() || self.session.messages.is_empty() { self.session.messages = result.messages; - self.session.system_prompt = merge_system_prompts( - self.session.system_prompt.as_ref(), - result.summary_prompt, - ); + self.merge_compaction_summary(result.summary_prompt); let status = if result.retries_used > 0 { format!( "Auto-compaction complete (after {} retries)", @@ -1699,6 +1816,13 @@ impl Engine { } } + if self + .run_capacity_pre_request_checkpoint(turn, Some(&client), _mode) + .await + { + continue; + } + if let Some(input_budget) = context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) { @@ -2459,20 +2583,22 @@ impl Engine { "tool_name": outcome.name.clone(), "success": output.success, })); + let output_for_context = + compact_tool_result_for_context(&outcome.name, &output); let output_content = output.content; tool_call.set_result(output_content.clone(), duration); self.session.working_set.observe_tool_call( &tool_name_for_ws, &tool_input, - Some(&output_content), + Some(&output_for_context), &self.session.workspace, ); self.session.add_message(Message { role: "user".to_string(), content: vec![ContentBlock::ToolResult { tool_use_id: outcome.id, - content: output_content, + content: output_for_context, }], }); } @@ -2506,6 +2632,22 @@ impl Engine { turn.record_tool_call(tool_call); } + if self + .run_capacity_post_tool_checkpoint( + turn, + _mode, + tool_registry, + tool_exec_lock.clone(), + mcp_pool.clone(), + step_error_count, + consecutive_tool_error_steps, + ) + .await + { + turn.next_step(); + continue; + } + if !pending_steers.is_empty() { for steer in pending_steers.drain(..) { self.session @@ -2527,6 +2669,19 @@ impl Engine { consecutive_tool_error_steps = 0; } + if self + .run_capacity_error_escalation_checkpoint( + turn, + _mode, + step_error_count, + consecutive_tool_error_steps, + ) + .await + { + turn.next_step(); + continue; + } + if consecutive_tool_error_steps >= 3 { let _ = self .tx_event @@ -2549,6 +2704,828 @@ impl Engine { (TurnOutcomeStatus::Completed, None) } + async fn run_capacity_pre_request_checkpoint( + &mut self, + turn: &TurnContext, + client: Option<&DeepSeekClient>, + mode: AppMode, + ) -> bool { + let snapshot = self + .capacity_controller + .observe_pre_turn(self.capacity_observation(turn)); + let decision = self + .capacity_controller + .decide(self.turn_counter, snapshot.as_ref()); + self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) + .await; + + if decision.action != GuardrailAction::TargetedContextRefresh { + return false; + } + + self.apply_targeted_context_refresh(turn, client, mode, snapshot.as_ref()) + .await + } + + #[allow(clippy::too_many_arguments)] + async fn run_capacity_post_tool_checkpoint( + &mut self, + turn: &TurnContext, + mode: AppMode, + tool_registry: Option<&crate::tools::ToolRegistry>, + tool_exec_lock: Arc>, + mcp_pool: Option>>, + _step_error_count: usize, + _consecutive_tool_error_steps: u32, + ) -> bool { + let snapshot = self + .capacity_controller + .observe_post_tool(self.capacity_observation(turn)); + let decision = self + .capacity_controller + .decide(self.turn_counter, snapshot.as_ref()); + self.emit_capacity_decision(turn, snapshot.as_ref(), &decision) + .await; + + match decision.action { + GuardrailAction::VerifyWithToolReplay => { + let _ = self + .apply_verify_with_tool_replay( + turn, + mode, + snapshot.as_ref(), + tool_registry, + tool_exec_lock, + mcp_pool, + ) + .await; + false + } + GuardrailAction::VerifyAndReplan => { + self.apply_verify_and_replan(turn, mode, snapshot.as_ref(), "high_risk_post_tool") + .await + } + GuardrailAction::NoIntervention | GuardrailAction::TargetedContextRefresh => false, + } + } + + async fn run_capacity_error_escalation_checkpoint( + &mut self, + turn: &TurnContext, + mode: AppMode, + step_error_count: usize, + consecutive_tool_error_steps: u32, + ) -> bool { + if step_error_count == 0 && consecutive_tool_error_steps < 2 { + return false; + } + + let snapshot = self + .capacity_controller + .last_snapshot() + .cloned() + .or_else(|| { + self.capacity_controller + .observe_pre_turn(self.capacity_observation(turn)) + }); + let Some(snapshot) = snapshot else { + return false; + }; + + let repeated_failures = step_error_count >= 2 || consecutive_tool_error_steps >= 2; + let mut forced = snapshot.clone(); + if repeated_failures && !(snapshot.risk_band == RiskBand::High && snapshot.severe) { + forced.risk_band = RiskBand::High; + forced.severe = true; + } + + let decision = self + .capacity_controller + .decide(self.turn_counter, Some(&forced)); + self.emit_capacity_decision(turn, Some(&forced), &decision) + .await; + + if decision.action != GuardrailAction::VerifyAndReplan { + return false; + } + + self.apply_verify_and_replan( + turn, + mode, + Some(&forced), + &format!( + "error_escalation: step_errors={step_error_count}, consecutive_steps={consecutive_tool_error_steps}" + ), + ) + .await + } + + fn capacity_observation(&self, turn: &TurnContext) -> CapacityObservationInput { + let message_window = self.config.capacity.profile_window.max(8) * 3; + let action_count_this_turn = usize::try_from(turn.step) + .unwrap_or(usize::MAX) + .saturating_add(turn.tool_calls.len()) + .saturating_add(1); + let tool_calls_recent_window = self.recent_tool_call_count(message_window); + let unique_reference_ids_recent_window = + self.recent_unique_reference_count(message_window, turn); + let context_window = usize::try_from( + context_window_for_model(&self.session.model).unwrap_or(DEFAULT_CONTEXT_WINDOW_TOKENS), + ) + .unwrap_or(usize::try_from(DEFAULT_CONTEXT_WINDOW_TOKENS).unwrap_or(128_000)) + .max(1); + let context_used_ratio = (self.estimated_input_tokens() as f64) / (context_window as f64); + + CapacityObservationInput { + turn_index: self.turn_counter, + model: self.session.model.clone(), + action_count_this_turn, + tool_calls_recent_window, + unique_reference_ids_recent_window, + context_used_ratio, + } + } + + fn recent_tool_call_count(&self, message_window: usize) -> usize { + self.session + .messages + .iter() + .rev() + .take(message_window) + .map(|msg| { + msg.content + .iter() + .filter(|block| { + matches!( + block, + ContentBlock::ToolUse { .. } | ContentBlock::ToolResult { .. } + ) + }) + .count() + }) + .sum() + } + + fn recent_unique_reference_count(&self, message_window: usize, turn: &TurnContext) -> usize { + let mut refs = std::collections::HashSet::new(); + for msg in self.session.messages.iter().rev().take(message_window) { + for block in &msg.content { + match block { + ContentBlock::ToolUse { id, .. } => { + refs.insert(id.clone()); + } + ContentBlock::ToolResult { tool_use_id, .. } => { + refs.insert(tool_use_id.clone()); + } + ContentBlock::Text { text, .. } => { + for token in text.split_whitespace() { + if token.contains('/') || token.contains('.') { + refs.insert( + token + .trim_matches(|c: char| ",.;:()[]{}".contains(c)) + .to_string(), + ); + } + } + } + ContentBlock::Thinking { .. } => {} + } + } + } + for tool_call in turn.tool_calls.iter().rev().take(8) { + refs.insert(tool_call.id.clone()); + } + for path in self.session.working_set.top_paths(8) { + refs.insert(path); + } + refs.retain(|item| !item.is_empty()); + refs.len() + } + + async fn emit_capacity_decision( + &self, + turn: &TurnContext, + snapshot: Option<&CapacitySnapshot>, + decision: &CapacityDecision, + ) { + let Some(snapshot) = snapshot else { + return; + }; + let _ = self + .tx_event + .send(Event::CapacityDecision { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + h_hat: snapshot.h_hat, + c_hat: snapshot.c_hat, + slack: snapshot.slack, + min_slack: snapshot.profile.min_slack, + violation_ratio: snapshot.profile.violation_ratio, + p_fail: snapshot.p_fail, + risk_band: snapshot.risk_band.as_str().to_string(), + action: decision.action.as_str().to_string(), + cooldown_blocked: decision.cooldown_blocked, + reason: decision.reason.clone(), + }) + .await; + } + + async fn emit_capacity_intervention( + &self, + turn: &TurnContext, + action: GuardrailAction, + before_prompt_tokens: usize, + after_prompt_tokens: usize, + replay_outcome: Option, + replan_performed: bool, + ) { + let _ = self + .tx_event + .send(Event::CapacityIntervention { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + action: action.as_str().to_string(), + before_prompt_tokens, + after_prompt_tokens, + compaction_size_reduction: before_prompt_tokens.saturating_sub(after_prompt_tokens), + replay_outcome, + replan_performed, + }) + .await; + } + + async fn apply_targeted_context_refresh( + &mut self, + turn: &TurnContext, + client: Option<&DeepSeekClient>, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let compaction_pins = self + .session + .working_set + .pinned_message_indices(&self.session.messages, &self.session.workspace); + let compaction_paths = self.session.working_set.top_paths(24); + + let mut refreshed = false; + if let Some(client) = client { + match compact_messages_safe( + client, + &self.session.messages, + &self.config.compaction, + Some(&self.session.workspace), + Some(&compaction_pins), + Some(&compaction_paths), + ) + .await + { + Ok(result) => { + if !result.messages.is_empty() || self.session.messages.is_empty() { + self.session.messages = result.messages; + self.merge_compaction_summary(result.summary_prompt); + refreshed = true; + } + } + Err(err) => { + let _ = self + .tx_event + .send(Event::status(format!( + "Capacity refresh compaction failed: {err}. Falling back to local trim." + ))) + .await; + } + } + } + + if !refreshed { + let target_budget = context_input_budget(&self.session.model, TURN_MAX_OUTPUT_TOKENS) + .unwrap_or(self.config.compaction.token_threshold.max(1)); + let trimmed = self.trim_oldest_messages_to_budget(target_budget); + refreshed = trimmed > 0; + } + + if !refreshed { + return false; + } + + let canonical = self.build_canonical_state(turn, None); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::TargetedContextRefresh, + snapshot, + canonical.clone(), + source_message_ids, + None, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::TargetedContextRefresh, &record) + .await; + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::TargetedContextRefresh, + None, + ))); + self.refresh_system_prompt(mode); + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::TargetedContextRefresh, + before_tokens, + after_tokens, + None, + false, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::TargetedContextRefresh); + true + } + + #[allow(clippy::too_many_arguments)] + async fn apply_verify_with_tool_replay( + &mut self, + turn: &TurnContext, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + tool_registry: Option<&crate::tools::ToolRegistry>, + tool_exec_lock: Arc>, + mut mcp_pool: Option>>, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let Some(candidate) = self.select_replay_candidate(turn, tool_registry) else { + return false; + }; + + if McpPool::is_mcp_tool(&candidate.name) && mcp_pool.is_none() { + mcp_pool = self.ensure_mcp_pool().await.ok(); + } + + let supports_parallel = if McpPool::is_mcp_tool(&candidate.name) { + mcp_tool_is_parallel_safe(&candidate.name) + } else { + tool_registry + .and_then(|registry| registry.get(&candidate.name)) + .is_some_and(|spec| spec.supports_parallel()) + }; + let interactive = (candidate.name == "exec_shell" + && candidate + .input + .get("interactive") + .and_then(serde_json::Value::as_bool) + == Some(true)) + || candidate.name == REQUEST_USER_INPUT_NAME; + + let replay_result = Self::execute_tool_with_lock( + tool_exec_lock, + supports_parallel, + interactive, + self.tx_event.clone(), + candidate.name.clone(), + candidate.input.clone(), + tool_registry, + mcp_pool.clone(), + None, + ) + .await; + + let (pass, replay_outcome, diff_summary) = match replay_result { + Ok(output) => { + let original = candidate.result.as_deref().unwrap_or_default(); + let replay = output.content.as_str(); + let equal = original.trim() == replay.trim(); + let diff = if equal { + "output_match".to_string() + } else { + format!( + "output_mismatch: original='{}' replay='{}'", + summarize_text(original, 140), + summarize_text(replay, 140) + ) + }; + ( + equal, + if equal { + "pass".to_string() + } else { + "conflict".to_string() + }, + diff, + ) + } + Err(err) => { + self.capacity_controller + .mark_replay_failed(self.turn_counter); + ( + false, + "error".to_string(), + format!("replay_error: {}", summarize_text(&err.to_string(), 180)), + ) + } + }; + + let verification_note = format!( + "[verification replay] tool={} pass={} details={}", + candidate.name, pass, diff_summary + ); + self.session.add_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::ToolResult { + tool_use_id: candidate.id.clone(), + content: verification_note.clone(), + }], + }); + + if !pass { + self.capacity_controller + .mark_replay_failed(self.turn_counter); + } + + let canonical = self.build_canonical_state( + turn, + Some(if pass { + "replay verification passed" + } else { + "replay verification failed or conflicted" + }), + ); + let replay_info = Some(ReplayInfo { + tool_id: candidate.id.clone(), + tool_name: candidate.name.clone(), + pass, + diff_summary: diff_summary.clone(), + }); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::VerifyWithToolReplay, + snapshot, + canonical.clone(), + source_message_ids, + replay_info, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::VerifyWithToolReplay, &record) + .await; + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::VerifyWithToolReplay, + Some(&verification_note), + ))); + self.refresh_system_prompt(mode); + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::VerifyWithToolReplay, + before_tokens, + after_tokens, + Some(replay_outcome), + false, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyWithToolReplay); + true + } + + async fn apply_verify_and_replan( + &mut self, + turn: &TurnContext, + mode: AppMode, + snapshot: Option<&CapacitySnapshot>, + reason: &str, + ) -> bool { + let before_tokens = self.estimated_input_tokens(); + let canonical = self.build_canonical_state(turn, Some(reason)); + let source_message_ids = self.capacity_source_message_ids(turn); + let record = self.build_capacity_record( + turn, + GuardrailAction::VerifyAndReplan, + snapshot, + canonical.clone(), + source_message_ids, + None, + ); + let pointer = self + .persist_capacity_record(turn, GuardrailAction::VerifyAndReplan, &record) + .await; + + let latest_user = self + .session + .messages + .iter() + .rev() + .find(|msg| { + msg.role == "user" + && msg + .content + .iter() + .any(|block| matches!(block, ContentBlock::Text { .. })) + }) + .cloned(); + let latest_verified = self + .session + .messages + .iter() + .rev() + .find(|msg| { + msg.role == "user" + && msg.content.iter().any(|block| match block { + ContentBlock::ToolResult { content, .. } => { + content.contains("[verification replay]") + } + _ => false, + }) + }) + .cloned(); + + self.session.messages.clear(); + if let Some(msg) = latest_user { + self.session.messages.push(msg); + } + if let Some(msg) = latest_verified { + self.session.messages.push(msg); + } + + self.merge_compaction_summary(Some(self.canonical_prompt( + &canonical, + &pointer, + GuardrailAction::VerifyAndReplan, + Some("Replan now from canonical state. Keep steps minimal and verifiable."), + ))); + self.refresh_system_prompt(mode); + + let _ = self + .tx_event + .send(Event::status( + "Capacity guardrail: context reset to canonical state; replanning step." + .to_string(), + )) + .await; + + let after_tokens = self.estimated_input_tokens(); + self.emit_capacity_intervention( + turn, + GuardrailAction::VerifyAndReplan, + before_tokens, + after_tokens, + None, + true, + ) + .await; + self.capacity_controller + .mark_intervention_applied(self.turn_counter, GuardrailAction::VerifyAndReplan); + true + } + + fn select_replay_candidate( + &self, + turn: &TurnContext, + tool_registry: Option<&crate::tools::ToolRegistry>, + ) -> Option { + turn.tool_calls + .iter() + .rev() + .find(|call| { + call.error.is_none() + && call.result.is_some() + && self.tool_is_replayable_read_only(&call.name, tool_registry) + }) + .cloned() + } + + fn tool_is_replayable_read_only( + &self, + tool_name: &str, + tool_registry: Option<&crate::tools::ToolRegistry>, + ) -> bool { + if tool_name == MULTI_TOOL_PARALLEL_NAME || tool_name == REQUEST_USER_INPUT_NAME { + return false; + } + if McpPool::is_mcp_tool(tool_name) { + return mcp_tool_is_read_only(tool_name); + } + tool_registry + .and_then(|registry| registry.get(tool_name)) + .is_some_and(|spec| spec.is_read_only()) + } + + fn build_canonical_state(&self, turn: &TurnContext, note: Option<&str>) -> CanonicalState { + let goal = self + .session + .messages + .iter() + .rev() + .find_map(|msg| { + if msg.role != "user" { + return None; + } + msg.content.iter().find_map(|block| match block { + ContentBlock::Text { text, .. } => Some(summarize_text(text, 220)), + _ => None, + }) + }) + .unwrap_or_else(|| "Continue current task from compact state".to_string()); + + let mut constraints = vec![ + format!("model={}", self.session.model), + format!("workspace={}", self.session.workspace.display()), + ]; + if let Some(note) = note { + constraints.push(summarize_text(note, 180)); + } + + let mut confirmed_facts = Vec::new(); + for msg in self.session.messages.iter().rev() { + for block in &msg.content { + if let ContentBlock::ToolResult { content, .. } = block { + if content.starts_with("Error:") { + continue; + } + confirmed_facts.push(summarize_text(content, 180)); + if confirmed_facts.len() >= 4 { + break; + } + } + } + if confirmed_facts.len() >= 4 { + break; + } + } + + let open_loops: Vec = turn + .tool_calls + .iter() + .rev() + .filter_map(|call| { + call.error + .as_ref() + .map(|error| format!("{}: {}", call.name, summarize_text(error, 180))) + }) + .take(4) + .collect(); + + let pending_actions: Vec = if open_loops.is_empty() { + vec!["Continue with next smallest verifiable step".to_string()] + } else { + vec![ + "Re-evaluate failed tool steps with narrower scope".to_string(), + "Re-derive plan from canonical facts before further edits".to_string(), + ] + }; + + let mut critical_refs = self.session.working_set.top_paths(8); + for tool_call in turn.tool_calls.iter().rev().take(4) { + critical_refs.push(format!("tool:{}", tool_call.id)); + } + critical_refs.dedup(); + + CanonicalState { + goal, + constraints, + confirmed_facts, + open_loops, + pending_actions, + critical_refs, + } + } + + fn canonical_prompt( + &self, + canonical: &CanonicalState, + pointer: &str, + action: GuardrailAction, + extra: Option<&str>, + ) -> SystemPrompt { + let mut lines = vec![ + COMPACTION_SUMMARY_MARKER.to_string(), + format!("Capacity Canonical State [{}]", action.as_str()), + format!("Goal: {}", canonical.goal), + "Constraints:".to_string(), + ]; + for item in &canonical.constraints { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Confirmed Facts:".to_string()); + for item in &canonical.confirmed_facts { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Open Loops:".to_string()); + if canonical.open_loops.is_empty() { + lines.push("- none".to_string()); + } else { + for item in &canonical.open_loops { + lines.push(format!("- {}", summarize_text(item, 200))); + } + } + lines.push("Pending Actions:".to_string()); + for item in &canonical.pending_actions { + lines.push(format!("- {}", summarize_text(item, 200))); + } + lines.push("Critical Refs:".to_string()); + for item in &canonical.critical_refs { + lines.push(format!("- {}", summarize_text(item, 200))); + } + if let Some(extra) = extra { + lines.push(format!("Instruction: {}", summarize_text(extra, 240))); + } + lines.push(format!("Memory Pointer: {pointer}")); + + SystemPrompt::Blocks(vec![crate::models::SystemBlock { + block_type: "text".to_string(), + text: lines.join("\n"), + cache_control: None, + }]) + } + + fn capacity_source_message_ids(&self, turn: &TurnContext) -> Vec { + let mut ids: Vec = turn + .tool_calls + .iter() + .rev() + .take(8) + .map(|call| call.id.clone()) + .collect(); + ids.reverse(); + ids + } + + fn build_capacity_record( + &self, + turn: &TurnContext, + action: GuardrailAction, + snapshot: Option<&CapacitySnapshot>, + canonical: CanonicalState, + source_message_ids: Vec, + replay_info: Option, + ) -> CapacityMemoryRecord { + let (h_hat, c_hat, slack, risk_band) = snapshot + .map(|s| (s.h_hat, s.c_hat, s.slack, s.risk_band.as_str().to_string())) + .unwrap_or_else(|| (0.0, 0.0, 0.0, "unknown".to_string())); + + CapacityMemoryRecord { + id: new_record_id(), + ts: now_rfc3339(), + turn_index: self.turn_counter, + action_trigger: action.as_str().to_string(), + h_hat, + c_hat, + slack, + risk_band, + canonical_state: canonical, + source_message_ids: if source_message_ids.is_empty() { + vec![turn.id.clone()] + } else { + source_message_ids + }, + replay_info, + } + } + + async fn persist_capacity_record( + &mut self, + turn: &TurnContext, + action: GuardrailAction, + record: &CapacityMemoryRecord, + ) -> String { + let pointer = format!("memory://{}/{}", self.session.id, record.id); + if let Err(err) = append_capacity_record(&self.session.id, record) { + let _ = self + .tx_event + .send(Event::CapacityMemoryPersistFailed { + session_id: self.session.id.clone(), + turn_id: turn.id.clone(), + action: action.as_str().to_string(), + error: summarize_text(&err.to_string(), 280), + }) + .await; + return format!("{pointer}?persist=failed"); + } + pointer + } + + fn rehydrate_latest_canonical_state(&mut self) { + let Ok(records) = load_last_k_capacity_records(&self.session.id, 1) else { + return; + }; + let Some(last) = records.last() else { + return; + }; + let pointer = format!("memory://{}/{}", self.session.id, last.id); + let prompt = self.canonical_prompt( + &last.canonical_state, + &pointer, + GuardrailAction::NoIntervention, + Some("Rehydrated canonical state from memory."), + ); + self.merge_compaction_summary(Some(prompt)); + } + /// Get a reference to the session pub fn session(&self) -> &Session { &self.session @@ -2565,12 +3542,25 @@ impl Engine { .session .working_set .summary_block(&self.config.workspace); - - self.session.system_prompt = Some(prompts::system_prompt_for_mode_with_context( + let base = prompts::system_prompt_for_mode_with_context( mode, &self.config.workspace, working_set_summary.as_deref(), - )); + ); + self.session.system_prompt = + merge_system_prompts(Some(&base), self.session.compaction_summary_prompt.clone()); + } + + fn merge_compaction_summary(&mut self, summary_prompt: Option) { + if summary_prompt.is_none() { + return; + } + self.session.compaction_summary_prompt = merge_system_prompts( + self.session.compaction_summary_prompt.as_ref(), + summary_prompt.clone(), + ); + self.session.system_prompt = + merge_system_prompts(self.session.system_prompt.as_ref(), summary_prompt); } } @@ -2602,10 +3592,11 @@ pub(crate) fn mock_engine_handle() -> MockEngineHandle { let (tx_user_input, _rx_user_input) = mpsc::channel(32); let (tx_steer, rx_steer) = mpsc::channel(64); let cancel_token = CancellationToken::new(); + let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone())); let handle = EngineHandle { tx_op, rx_event: Arc::new(RwLock::new(rx_event)), - cancel_token: cancel_token.clone(), + cancel_token: shared_cancel_token, tx_approval, tx_user_input, tx_steer, diff --git a/src/core/engine/tests.rs b/src/core/engine/tests.rs index ad210ad1..668138c8 100644 --- a/src/core/engine/tests.rs +++ b/src/core/engine/tests.rs @@ -1,8 +1,17 @@ use super::*; use serde_json::json; +use std::fs; use std::path::PathBuf; use std::time::Instant; +use tempfile::tempdir; + +fn build_engine_with_capacity(capacity: CapacityControllerConfig) -> Engine { + let mut engine_config = EngineConfig::default(); + engine_config.capacity = capacity; + let (engine, _handle) = Engine::new(engine_config, &Config::default()); + engine +} fn make_plan( read_only: bool, @@ -23,6 +32,19 @@ fn make_plan( } } +#[test] +fn engine_handle_cancel_tracks_latest_turn_token() { + let (mut engine, handle) = Engine::new(EngineConfig::default(), &Config::default()); + let stale_token = engine.cancel_token.clone(); + + engine.reset_cancel_token(); + handle.cancel(); + + assert!(engine.cancel_token.is_cancelled()); + assert!(handle.is_cancelled()); + assert!(!stale_token.is_cancelled()); +} + #[test] fn parallel_batch_requires_read_only_parallel_tools() { let plans = vec![make_plan(true, true, false, false)]; @@ -92,3 +114,200 @@ fn context_budget_reserves_output_and_headroom() { let expected = 128_000usize - 4_096usize - 1_024usize; assert_eq!(budget, expected); } + +#[tokio::test] +async fn pre_request_refresh_invoked_when_medium_risk() { + let mut capacity = CapacityControllerConfig::default(); + capacity.enabled = true; + capacity.low_risk_max = 0.0; + capacity.medium_risk_max = 1.0; + capacity.min_turns_before_guardrail = 0; + + let mut engine = build_engine_with_capacity(capacity.clone()); + engine.config.capacity = capacity.clone(); + engine.capacity_controller = CapacityController::new(capacity); + engine.turn_counter = 5; + engine + .capacity_controller + .mark_turn_start(engine.turn_counter); + + let long = "x".repeat(5_000); + for _ in 0..200 { + engine.session.messages.push(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: long.clone(), + cache_control: None, + }], + }); + } + + let before = engine.estimated_input_tokens(); + let turn = TurnContext::new(10); + let applied = engine + .run_capacity_pre_request_checkpoint(&turn, None, AppMode::Agent) + .await; + let after = engine.estimated_input_tokens(); + + assert!(applied); + assert!(after < before); +} + +#[tokio::test] +async fn post_tool_replay_invoked_when_high_non_severe_risk() { + let tmp = tempdir().expect("tempdir"); + fs::write(tmp.path().join("sample.txt"), "hello replay").expect("write"); + + let mut capacity = CapacityControllerConfig::default(); + capacity.enabled = true; + capacity.low_risk_max = 0.0; + capacity.medium_risk_max = 0.0; + capacity.severe_min_slack = -10.0; + capacity.severe_violation_ratio = 2.0; + capacity.min_turns_before_guardrail = 0; + + let mut engine = build_engine_with_capacity(capacity.clone()); + engine.session.workspace = tmp.path().to_path_buf(); + engine.config.workspace = tmp.path().to_path_buf(); + engine.config.capacity = capacity.clone(); + engine.capacity_controller = CapacityController::new(capacity); + engine.turn_counter = 4; + engine + .capacity_controller + .mark_turn_start(engine.turn_counter); + + let mut turn = TurnContext::new(10); + let mut tool_call = TurnToolCall::new( + "tool_read_1".to_string(), + "read_file".to_string(), + json!({ "path": "sample.txt" }), + ); + tool_call.set_result( + "hello replay".to_string(), + std::time::Duration::from_millis(1), + ); + turn.record_tool_call(tool_call); + + let registry = ToolRegistryBuilder::new() + .with_read_only_file_tools() + .build(engine.build_tool_context(AppMode::Agent)); + + let restarted = engine + .run_capacity_post_tool_checkpoint( + &turn, + AppMode::Agent, + Some(®istry), + Arc::new(RwLock::new(())), + None, + 0, + 0, + ) + .await; + + assert!(!restarted); + let has_verification_note = engine.session.messages.iter().any(|msg| { + msg.content.iter().any(|block| match block { + ContentBlock::ToolResult { content, .. } => content.contains("[verification replay]"), + _ => false, + }) + }); + assert!(has_verification_note); +} + +#[tokio::test] +async fn error_escalation_triggers_replan_when_severe_or_repeated_failures() { + let tmp = tempdir().expect("tempdir"); + // Safety: scoped to test process; reset at end. + unsafe { + std::env::set_var( + "DEEPSEEK_CAPACITY_MEMORY_DIR", + tmp.path().to_string_lossy().to_string(), + ); + } + + let mut capacity = CapacityControllerConfig::default(); + capacity.enabled = true; + capacity.low_risk_max = 0.0; + capacity.medium_risk_max = 0.0; + capacity.min_turns_before_guardrail = 0; + + let mut engine = build_engine_with_capacity(capacity.clone()); + engine.config.capacity = capacity.clone(); + engine.capacity_controller = CapacityController::new(capacity); + engine.turn_counter = 6; + engine + .capacity_controller + .mark_turn_start(engine.turn_counter); + + for i in 0..10 { + engine.session.messages.push(Message { + role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(), + content: vec![ContentBlock::Text { + text: format!("noise message {i}"), + cache_control: None, + }], + }); + } + engine.session.messages.push(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: "Please finish task".to_string(), + cache_control: None, + }], + }); + + let before_len = engine.session.messages.len(); + let turn = TurnContext::new(10); + let restarted = engine + .run_capacity_error_escalation_checkpoint(&turn, AppMode::Agent, 2, 2) + .await; + + assert!(restarted); + assert!(engine.session.messages.len() < before_len); + assert!(engine.session.messages.len() <= 2); + + let records = load_last_k_capacity_records(&engine.session.id, 1).expect("load memory"); + assert!(!records.is_empty()); + assert!(!records[0].canonical_state.goal.is_empty()); + unsafe { + std::env::remove_var("DEEPSEEK_CAPACITY_MEMORY_DIR"); + } +} + +#[tokio::test] +async fn controller_disabled_keeps_behavior_unchanged() { + let mut capacity = CapacityControllerConfig::default(); + capacity.enabled = false; + + let mut engine = build_engine_with_capacity(capacity.clone()); + engine.config.capacity = capacity.clone(); + engine.capacity_controller = CapacityController::new(capacity); + engine.turn_counter = 3; + engine + .capacity_controller + .mark_turn_start(engine.turn_counter); + + let long = "y".repeat(5_000); + for _ in 0..120 { + engine.session.messages.push(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: long.clone(), + cache_control: None, + }], + }); + } + + let before = engine.estimated_input_tokens(); + let before_len = engine.session.messages.len(); + let turn = TurnContext::new(10); + let applied = engine + .run_capacity_pre_request_checkpoint(&turn, None, AppMode::Agent) + .await; + let after = engine.estimated_input_tokens(); + let after_len = engine.session.messages.len(); + + assert!(!applied); + assert_eq!(before, after); + assert_eq!(before_len, after_len); +} diff --git a/src/core/events.rs b/src/core/events.rs index e3e6eb0f..b3edb9a9 100644 --- a/src/core/events.rs +++ b/src/core/events.rs @@ -90,6 +90,42 @@ pub enum Event { message: String, }, + /// Capacity decision telemetry. + CapacityDecision { + session_id: String, + turn_id: String, + h_hat: f64, + c_hat: f64, + slack: f64, + min_slack: f64, + violation_ratio: f64, + p_fail: f64, + risk_band: String, + action: String, + cooldown_blocked: bool, + reason: String, + }, + + /// Capacity intervention telemetry. + CapacityIntervention { + session_id: String, + turn_id: String, + action: String, + before_prompt_tokens: usize, + after_prompt_tokens: usize, + compaction_size_reduction: usize, + replay_outcome: Option, + replan_performed: bool, + }, + + /// Capacity memory persistence failure telemetry. + CapacityMemoryPersistFailed { + session_id: String, + turn_id: String, + action: String, + error: String, + }, + // === Sub-Agent Events === /// A sub-agent has been spawned AgentSpawned { id: String, prompt: String }, diff --git a/src/core/mod.rs b/src/core/mod.rs index 6ef5c3ee..5eecbf3a 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -11,6 +11,8 @@ #![allow(dead_code)] +pub mod capacity; +pub mod capacity_memory; pub mod engine; pub mod events; pub mod ops; diff --git a/src/core/session.rs b/src/core/session.rs index 622613b0..a3b5c62a 100644 --- a/src/core/session.rs +++ b/src/core/session.rs @@ -18,6 +18,8 @@ pub struct Session { /// System prompt (optional) pub system_prompt: Option, + /// Persisted summary blocks generated by context compaction. + pub compaction_summary_prompt: Option, /// Conversation history (API format) pub messages: Vec, @@ -88,6 +90,7 @@ impl Session { model, workspace, system_prompt: None, + compaction_summary_prompt: None, messages: Vec::new(), total_usage: SessionUsage::default(), allow_shell, @@ -125,6 +128,7 @@ impl Session { /// Clear the conversation history pub fn clear(&mut self) { self.messages.clear(); + self.compaction_summary_prompt = None; } /// Get the message count diff --git a/src/main.rs b/src/main.rs index 50ee224f..e44c4ba8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2108,10 +2108,17 @@ async fn run_exec_agent( use crate::core::engine::{EngineConfig, spawn_engine}; use crate::core::events::Event; use crate::core::ops::Op; + use crate::models::{compaction_message_threshold_for_model, compaction_threshold_for_model}; use crate::tools::plan::new_shared_plan_state; use crate::tools::todo::new_shared_todo_list; use crate::tui::app::AppMode; + let mut compaction = CompactionConfig::default(); + compaction.enabled = true; + compaction.model = model.to_string(); + compaction.token_threshold = compaction_threshold_for_model(model); + compaction.message_threshold = compaction_message_threshold_for_model(model); + let engine_config = EngineConfig { model: model.to_string(), workspace: workspace.clone(), @@ -2122,7 +2129,8 @@ async fn run_exec_agent( max_steps: 100, max_subagents, features: config.features(), - compaction: CompactionConfig::default(), + compaction, + capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config), todos: new_shared_todo_list(), plan_state: new_shared_plan_state(), }; diff --git a/src/runtime_api.rs b/src/runtime_api.rs index fe7f1117..e9aee111 100644 --- a/src/runtime_api.rs +++ b/src/runtime_api.rs @@ -802,11 +802,13 @@ mod tests { } } - async fn spawn_test_server() -> Result<( - SocketAddr, - SharedRuntimeThreadManager, - tokio::task::JoinHandle<()>, - )> { + async fn spawn_test_server() -> Result< + Option<( + SocketAddr, + SharedRuntimeThreadManager, + tokio::task::JoinHandle<()>, + )>, + > { let root = std::env::temp_dir().join(format!("deepseek-runtime-api-{}", Uuid::new_v4())); let sessions_dir = root.join("sessions"); fs::create_dir_all(&sessions_dir)?; @@ -824,8 +826,24 @@ mod tests { Arc::new(MockExecutor), ) .await?; + let mut config = Config::default(); + config.capacity = Some(crate::config::CapacityConfig { + enabled: Some(false), + low_risk_max: None, + medium_risk_max: None, + severe_min_slack: None, + severe_violation_ratio: None, + refresh_cooldown_turns: None, + replan_cooldown_turns: None, + max_replay_per_turn: None, + min_turns_before_guardrail: None, + profile_window: None, + deepseek_v3_2_chat_prior: None, + deepseek_v3_2_reasoner_prior: None, + fallback_default_prior: None, + }); let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open( - Config::default(), + config, PathBuf::from("."), RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")), )?); @@ -838,12 +856,16 @@ mod tests { sessions_dir, }; let app = build_router(state); - let listener = TcpListener::bind("127.0.0.1:0").await?; + let listener = match TcpListener::bind("127.0.0.1:0").await { + Ok(listener) => listener, + Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None), + Err(err) => return Err(err.into()), + }; let addr = listener.local_addr()?; let handle = tokio::spawn(async move { let _ = axum::serve(listener, app).await; }); - Ok((addr, runtime_threads, handle)) + Ok(Some((addr, runtime_threads, handle))) } async fn read_first_sse_frame(resp: reqwest::Response) -> Result { @@ -925,7 +947,9 @@ mod tests { #[tokio::test] async fn health_and_tasks_endpoints_work() -> Result<()> { - let (addr, _runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let health: serde_json::Value = client @@ -983,7 +1007,9 @@ mod tests { #[tokio::test] async fn stream_requires_prompt() -> Result<()> { - let (addr, _runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let resp = client @@ -998,7 +1024,9 @@ mod tests { #[tokio::test] async fn thread_endpoints_expose_lifecycle_contract() -> Result<()> { - let (addr, _runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let created: serde_json::Value = client @@ -1130,7 +1158,9 @@ mod tests { #[tokio::test] async fn events_endpoint_respects_since_seq_cursor() -> Result<()> { - let (addr, _runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let created: serde_json::Value = client @@ -1207,7 +1237,9 @@ mod tests { #[tokio::test] async fn steer_and_interrupt_endpoints_work_on_active_turn() -> Result<()> { - let (addr, runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let created: serde_json::Value = client @@ -1425,7 +1457,9 @@ mod tests { #[tokio::test] async fn stream_endpoint_remains_backward_compatible() -> Result<()> { - let (addr, _runtime_threads, handle) = spawn_test_server().await?; + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; let client = reqwest::Client::new(); let resp = client diff --git a/src/runtime_threads.rs b/src/runtime_threads.rs index 7d3c768a..e26bf3ed 100644 --- a/src/runtime_threads.rs +++ b/src/runtime_threads.rs @@ -22,7 +22,10 @@ use crate::config::{Config, DEFAULT_TEXT_MODEL, MAX_SUBAGENTS}; use crate::core::engine::{EngineConfig, EngineHandle, spawn_engine}; use crate::core::events::{Event as EngineEvent, TurnOutcomeStatus}; use crate::core::ops::Op; -use crate::models::{ContentBlock, Message, Usage}; +use crate::models::{ + ContentBlock, Message, Usage, compaction_message_threshold_for_model, + compaction_threshold_for_model, +}; use crate::tools::plan::new_shared_plan_state; use crate::tools::todo::new_shared_todo_list; use crate::tui::app::AppMode; @@ -1150,7 +1153,11 @@ impl RuntimeThreadManager { } } - let compaction = CompactionConfig::default(); + let mut compaction = CompactionConfig::default(); + compaction.enabled = true; + compaction.model = thread.model.clone(); + compaction.token_threshold = compaction_threshold_for_model(&thread.model); + compaction.message_threshold = compaction_message_threshold_for_model(&thread.model); let engine_cfg = EngineConfig { model: thread.model.clone(), workspace: thread.workspace.clone(), @@ -1162,6 +1169,9 @@ impl RuntimeThreadManager { max_subagents: self.config.max_subagents().clamp(1, MAX_SUBAGENTS), features: self.config.features(), compaction, + capacity: crate::core::capacity::CapacityControllerConfig::from_app_config( + &self.config, + ), todos: new_shared_todo_list(), plan_state: new_shared_plan_state(), }; @@ -1471,6 +1481,99 @@ impl RuntimeThreadManager { .await?; } } + EngineEvent::CapacityDecision { + risk_band, + action, + reason, + .. + } => { + let message = format!( + "Capacity decision: risk={risk_band} action={action} reason={reason}" + ); + let item = TurnItemRecord { + schema_version: CURRENT_RUNTIME_SCHEMA_VERSION, + id: format!("item_{}", &Uuid::new_v4().to_string()[..8]), + turn_id: turn_id.clone(), + kind: TurnItemKind::Status, + status: TurnItemLifecycleStatus::Completed, + summary: summarize_text(&message, SUMMARY_LIMIT), + detail: Some(message), + artifact_refs: Vec::new(), + started_at: Some(Utc::now()), + ended_at: Some(Utc::now()), + }; + self.store.save_item(&item)?; + self.attach_item_to_turn(&turn_id, &item.id)?; + self.emit_event( + &thread_id, + Some(&turn_id), + Some(&item.id), + "item.completed", + json!({ "item": item }), + ) + .await?; + } + EngineEvent::CapacityIntervention { + action, + before_prompt_tokens, + after_prompt_tokens, + replay_outcome, + replan_performed, + .. + } => { + let message = format!( + "Capacity intervention: {action} (~{before_prompt_tokens} -> ~{after_prompt_tokens}) replay={:?} replan={replan_performed}", + replay_outcome + ); + let item = TurnItemRecord { + schema_version: CURRENT_RUNTIME_SCHEMA_VERSION, + id: format!("item_{}", &Uuid::new_v4().to_string()[..8]), + turn_id: turn_id.clone(), + kind: TurnItemKind::Status, + status: TurnItemLifecycleStatus::Completed, + summary: summarize_text(&message, SUMMARY_LIMIT), + detail: Some(message), + artifact_refs: Vec::new(), + started_at: Some(Utc::now()), + ended_at: Some(Utc::now()), + }; + self.store.save_item(&item)?; + self.attach_item_to_turn(&turn_id, &item.id)?; + self.emit_event( + &thread_id, + Some(&turn_id), + Some(&item.id), + "item.completed", + json!({ "item": item }), + ) + .await?; + } + EngineEvent::CapacityMemoryPersistFailed { action, error, .. } => { + let message = + format!("Capacity memory persist failed: action={action} error={error}"); + let item = TurnItemRecord { + schema_version: CURRENT_RUNTIME_SCHEMA_VERSION, + id: format!("item_{}", &Uuid::new_v4().to_string()[..8]), + turn_id: turn_id.clone(), + kind: TurnItemKind::Status, + status: TurnItemLifecycleStatus::Failed, + summary: summarize_text(&message, SUMMARY_LIMIT), + detail: Some(message), + artifact_refs: Vec::new(), + started_at: Some(Utc::now()), + ended_at: Some(Utc::now()), + }; + self.store.save_item(&item)?; + self.attach_item_to_turn(&turn_id, &item.id)?; + self.emit_event( + &thread_id, + Some(&turn_id), + Some(&item.id), + "item.failed", + json!({ "item": item }), + ) + .await?; + } EngineEvent::ApprovalRequired { id, tool_name, diff --git a/src/tui/history.rs b/src/tui/history.rs index e2698c63..20ad76ae 100644 --- a/src/tui/history.rs +++ b/src/tui/history.rs @@ -19,6 +19,8 @@ use crate::tui::markdown_render; const TOOL_COMMAND_LINE_LIMIT: usize = 5; const TOOL_OUTPUT_LINE_LIMIT: usize = 12; const TOOL_TEXT_LIMIT: usize = 240; +const TOOL_RUNNING_SYMBOLS: [&str; 3] = ["◌", "◍", "◉"]; +const TOOL_STATUS_SYMBOL_MS: u64 = 900; // === History Cells === @@ -52,33 +54,14 @@ impl HistoryCell { pub fn lines(&self, width: u16) -> Vec> { match self { HistoryCell::User { content } => render_message("You", content, user_style(), width), - HistoryCell::Assistant { content, streaming } => { - let mut lines = render_message("Answer", content, assistant_style(), width); - if *streaming { - // Add blinking cursor to last line - if let Some(last) = lines.last_mut() { - last.spans.push(Span::styled( - "▋", - Style::default().fg(palette::DEEPSEEK_SKY), - )); - } - } - lines + HistoryCell::Assistant { content, .. } => { + render_message("Answer", content, assistant_style(), width) } HistoryCell::System { content } => { render_message("System", content, system_style(), width) } HistoryCell::Thinking { content, streaming } => { - let mut lines = render_thinking(content, width, *streaming); - if *streaming { - if let Some(last) = lines.last_mut() { - last.spans.push(Span::styled( - "▋", - Style::default().fg(palette::DEEPSEEK_SKY), - )); - } - } - lines + render_thinking(content, width, *streaming) } HistoryCell::Tool(cell) => cell.lines(width), } @@ -1277,11 +1260,13 @@ fn status_symbol(started_at: Option, status: ToolStatus) -> String { match status { ToolStatus::Running => { let elapsed_ms = started_at.map_or(0, |t| t.elapsed().as_millis()); - if (elapsed_ms / 900).is_multiple_of(2) { - "*".to_string() + let cycle = u128::from(TOOL_STATUS_SYMBOL_MS); + let idx = if cycle == 0 { + 0 } else { - ".".to_string() - } + (elapsed_ms / cycle) % (TOOL_RUNNING_SYMBOLS.len() as u128) + }; + TOOL_RUNNING_SYMBOLS[usize::try_from(idx).unwrap_or_default()].to_string() } ToolStatus::Success => "o".to_string(), ToolStatus::Failed => "x".to_string(), diff --git a/src/tui/paste_burst.rs b/src/tui/paste_burst.rs index c0ee03ca..35ee9ad3 100644 --- a/src/tui/paste_burst.rs +++ b/src/tui/paste_burst.rs @@ -130,6 +130,20 @@ impl PasteBurst { } } + /// Return the remaining delay before a pending char/paste buffer must flush. + /// + /// This lets the UI event loop avoid sleeping past the flush deadline. + #[must_use] + pub fn next_flush_delay(&self, now: Instant) -> Option { + let last = self.last_plain_char_time?; + let timeout = if self.is_active_internal() { + PASTE_BURST_ACTIVE_IDLE_TIMEOUT + } else { + PASTE_BURST_CHAR_INTERVAL + }; + Some(timeout.saturating_sub(now.duration_since(last))) + } + pub fn append_newline_if_active(&mut self, now: Instant) -> bool { if self.is_active() { self.buffer.push('\n'); @@ -295,4 +309,20 @@ mod tests { assert_eq!(burst.flush_before_modified_input(), Some("a".to_string())); assert!(!burst.is_active()); } + + #[test] + fn next_flush_delay_counts_down_to_zero() { + let mut burst = PasteBurst::default(); + let t0 = Instant::now(); + let _ = burst.on_plain_char('a', t0); + + let almost_due = t0 + Duration::from_millis(7); + let remaining = burst + .next_flush_delay(almost_due) + .expect("delay should exist"); + assert!(remaining <= Duration::from_millis(1)); + + let due = t0 + Duration::from_millis(20); + assert_eq!(burst.next_flush_delay(due), Some(Duration::ZERO)); + } } diff --git a/src/tui/ui.rs b/src/tui/ui.rs index 6b7d5341..d59f7100 100644 --- a/src/tui/ui.rs +++ b/src/tui/ui.rs @@ -27,7 +27,7 @@ use unicode_width::{UnicodeWidthChar, UnicodeWidthStr}; use crate::audit::log_sensitive_event; use crate::commands; -use crate::compaction::estimate_tokens; +use crate::compaction::estimate_input_tokens_conservative; use crate::config::Config; use crate::core::engine::{EngineConfig, EngineHandle, spawn_engine}; use crate::core::events::Event as EngineEvent; @@ -83,6 +83,11 @@ use super::widgets::{ChatWidget, ComposerWidget, HeaderData, HeaderWidget, Rende const MAX_QUEUED_PREVIEW: usize = 3; const CONTEXT_WARNING_THRESHOLD_PERCENT: f64 = 85.0; const CONTEXT_CRITICAL_THRESHOLD_PERCENT: f64 = 85.0; +const UI_IDLE_POLL_MS: u64 = 33; +const UI_ACTIVE_POLL_MS: u64 = 16; +const UI_DEEPSEEK_SQUIGGLE_MS: u64 = 120; +const UI_TYPING_INDICATOR_MS: u64 = 120; +const UI_STATUS_ANIMATION_MS: u64 = UI_DEEPSEEK_SQUIGGLE_MS; /// Run the interactive TUI event loop. /// @@ -166,27 +171,6 @@ pub async fn run_tui(config: &Config, options: TuiOptions) -> Result<()> { app.status_message = Some(format!("Failed to load session: {e}")); } } - } else if let Ok(manager) = SessionManager::default_location() { - // Crash recovery: restore in-flight checkpoint when no explicit session was requested. - match manager.load_checkpoint() { - Ok(Some(checkpoint)) => { - apply_loaded_session(&mut app, &checkpoint); - app.history.insert( - 0, - HistoryCell::System { - content: - "Recovered from crash checkpoint; resume by sending your next message." - .to_string(), - }, - ); - app.mark_history_updated(); - app.status_message = Some("Recovered checkpoint session".to_string()); - } - Ok(None) => {} - Err(err) => { - app.status_message = Some(format!("Failed to restore checkpoint: {err}")); - } - } } if let Ok(manager) = SessionManager::default_location() { @@ -298,6 +282,7 @@ fn build_engine_config(app: &App, config: &Config) -> EngineConfig { max_subagents: app.max_subagents, features: config.features(), compaction: app.compaction_config(), + capacity: crate::core::capacity::CapacityControllerConfig::from_app_config(config), todos: app.todos.clone(), plan_state: app.plan_state.clone(), } @@ -318,19 +303,26 @@ async fn run_event_loop( let mut last_task_refresh = Instant::now() .checked_sub(Duration::from_secs(2)) .unwrap_or_else(Instant::now); + let mut last_status_frame = Instant::now() + .checked_sub(Duration::from_millis(UI_STATUS_ANIMATION_MS)) + .unwrap_or_else(Instant::now); loop { if last_task_refresh.elapsed() >= Duration::from_millis(2500) { let tasks = task_manager.list_tasks(Some(10)).await; app.task_panel = tasks.into_iter().map(task_summary_to_panel_entry).collect(); last_task_refresh = Instant::now(); + app.needs_redraw = true; } // First, poll for engine events (non-blocking) + let mut received_engine_event = false; + let mut transcript_batch_updated = false; let mut queued_to_send: Option = None; { let mut rx = engine_handle.rx_event.write().await; while let Ok(event) = rx.try_recv() { + received_engine_event = true; match event { EngineEvent::MessageStarted { .. } => { current_streaming_text.clear(); @@ -356,9 +348,9 @@ async fn run_event_loop( if let Some(cell) = app.history.get_mut(index) { if let HistoryCell::Assistant { content, .. } = cell { - content.clone_from(¤t_streaming_text); + content.push_str(&sanitized); } - app.mark_history_updated(); + transcript_batch_updated = true; } } EngineEvent::MessageComplete { .. } => { @@ -367,7 +359,7 @@ async fn run_event_loop( app.history.get_mut(index) { *streaming = false; - app.mark_history_updated(); + transcript_batch_updated = true; } let mut blocks = Vec::new(); @@ -426,6 +418,7 @@ async fn run_event_loop( app.history.get_mut(index) { c.push_str(&sanitized); + transcript_batch_updated = true; } } } @@ -435,6 +428,7 @@ async fn run_event_loop( app.history.get_mut(index) { *streaming = false; + transcript_batch_updated = true; } } @@ -478,6 +472,7 @@ async fn run_event_loop( app.pending_tool_uses.clear(); app.plan_tool_used_in_turn = false; persist_checkpoint(app); + last_status_frame = Instant::now(); } EngineEvent::TurnComplete { usage, @@ -558,6 +553,31 @@ async fn run_event_loop( EngineEvent::CompactionFailed { message, .. } => { app.status_message = Some(message); } + EngineEvent::CapacityDecision { + risk_band, + action, + reason, + .. + } => { + app.status_message = Some(format!( + "Capacity decision: risk={risk_band} action={action} ({reason})" + )); + } + EngineEvent::CapacityIntervention { + action, + before_prompt_tokens, + after_prompt_tokens, + .. + } => { + app.status_message = Some(format!( + "Capacity intervention: {action} (~{before_prompt_tokens} -> ~{after_prompt_tokens} tokens)" + )); + } + EngineEvent::CapacityMemoryPersistFailed { action, error, .. } => { + app.status_message = Some(format!( + "Capacity memory persist failed ({action}): {error}" + )); + } EngineEvent::PauseEvents => { if !event_broker.is_paused() { pause_terminal(terminal, app.use_alt_screen)?; @@ -732,6 +752,12 @@ async fn run_event_loop( } } } + if transcript_batch_updated { + app.mark_history_updated(); + } + if received_engine_event { + app.needs_redraw = true; + } if let Some(next) = queued_to_send { if let Err(err) = dispatch_user_message(app, &engine_handle, next.clone()).await { @@ -741,30 +767,56 @@ async fn run_event_loop( app.queued_message_count() )); } + + app.needs_redraw = true; } let queue_state = (app.queued_messages.clone(), app.queued_draft.clone()); if queue_state != last_queue_state { persist_offline_queue_state(app); last_queue_state = queue_state; + app.needs_redraw = true; } if !app.view_stack.is_empty() { let events = app.view_stack.tick(); + if !events.is_empty() { + app.needs_redraw = true; + } handle_view_events(app, &engine_handle, events).await; } + if app.is_loading + && last_status_frame.elapsed() >= Duration::from_millis(UI_STATUS_ANIMATION_MS) + { + app.needs_redraw = true; + last_status_frame = Instant::now(); + } + if event_broker.is_paused() { tokio::time::sleep(std::time::Duration::from_millis(50)).await; continue; } - app.flush_paste_burst_if_due(Instant::now()); + let now = Instant::now(); + app.flush_paste_burst_if_due(now); - terminal.draw(|f| render(f, app))?; // app is &mut + if app.needs_redraw { + terminal.draw(|f| render(f, app))?; // app is &mut + app.needs_redraw = false; + } - if event::poll(std::time::Duration::from_millis(120))? { + let mut poll_timeout = if app.is_loading { + Duration::from_millis(UI_ACTIVE_POLL_MS) + } else { + Duration::from_millis(UI_IDLE_POLL_MS) + }; + if let Some(until_flush) = app.paste_burst.next_flush_delay(now) { + poll_timeout = poll_timeout.min(until_flush); + } + if event::poll(poll_timeout)? { let evt = event::read()?; + app.needs_redraw = true; // Handle bracketed paste events if let Event::Paste(text) = &evt { @@ -1075,6 +1127,8 @@ async fn run_event_loop( model, workspace, } => { + let is_full_reset = + messages.is_empty() && system_prompt.is_none(); let _ = engine_handle .send(Op::SyncSession { messages, @@ -1088,6 +1142,10 @@ async fn run_event_loop( config: app.compaction_config(), }) .await; + if is_full_reset { + persist_session_snapshot(app); + clear_checkpoint(); + } } AppAction::SendMessage(content) => { let queued = build_queued_message(app, content); @@ -2446,7 +2504,7 @@ fn render_status_indicator(f: &mut Frame, area: Rect, app: &App, queued: &[Strin None }; let elapsed = app.turn_started_at.map(format_elapsed); - // Use typing indicator when streaming content, otherwise use whale spinner + // Use typing indicator when streaming content, otherwise use a subtle status glyph. let has_streaming_content = app.streaming_message_index.is_some(); let spinner = if has_streaming_content { typing_indicator(app.turn_started_at) @@ -2743,23 +2801,11 @@ fn prompt_for_mode(mode: AppMode) -> &'static str { } fn estimated_context_tokens(app: &App) -> Option { - let mut total = estimate_tokens(&app.api_messages); - - match &app.system_prompt { - Some(SystemPrompt::Text(text)) => total = total.saturating_add(estimate_text_tokens(text)), - Some(SystemPrompt::Blocks(blocks)) => { - for block in blocks { - total = total.saturating_add(estimate_text_tokens(&block.text)); - } - } - None => {} - } - - i64::try_from(total).ok() -} - -fn estimate_text_tokens(text: &str) -> usize { - text.chars().count().div_ceil(4) + i64::try_from(estimate_input_tokens_conservative( + &app.api_messages, + app.system_prompt.as_ref(), + )) + .ok() } fn context_usage_snapshot(app: &App) -> Option<(i64, u32, f64)> { @@ -2826,21 +2872,19 @@ fn format_elapsed(start: Instant) -> String { } fn deepseek_squiggle(start: Option) -> &'static str { - const FRAMES: [&str; 12] = [ - "🐳", "🐳.", "🐳..", "🐳...", "🐳..", "🐳.", "🐋", "🐋.", "🐋..", "🐋...", "🐋..", "🐋.", - ]; + const FRAMES: [&str; 6] = ["◍", "◉", "◌", "◌", "◉", "◍"]; let elapsed_ms = start.map_or(0, |t| t.elapsed().as_millis()); - let idx = ((elapsed_ms / 420) as usize) % FRAMES.len(); + let idx = ((elapsed_ms / u128::from(UI_DEEPSEEK_SQUIGGLE_MS)) as usize) % FRAMES.len(); FRAMES[idx] } /// Braille pattern frames for typing/thinking indicator animation. -const TYPING_FRAMES: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; +const TYPING_FRAMES: &[&str] = &["⠁", "⠂", "⠄", "⠂"]; /// Returns the typing indicator frame based on elapsed time. fn typing_indicator(start: Option) -> &'static str { let elapsed_ms = start.map_or(0, |t| t.elapsed().as_millis()); - let idx = ((elapsed_ms / 220) as usize) % TYPING_FRAMES.len(); + let idx = ((elapsed_ms / u128::from(UI_TYPING_INDICATOR_MS)) as usize) % TYPING_FRAMES.len(); TYPING_FRAMES[idx] }