Merge pull request #684 from Hmbown/feat/v0.8.11-compaction-v4-overhaul

feat(v0.8.11): cache-maxing overhaul — per-turn rebuild gate + working_set extraction + tool anchor
Hunter Bown
2026-05-04 22:51:29 -05:00
committed by GitHub
17 changed files with 1330 additions and 344 deletions
+36
@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.8.11] - Unreleased
### Changed
- **Cache-maxing prompt path for DeepSeek V4** — the engine now skips
system-prompt reassignment when the assembled stable prompt is unchanged,
keeps the volatile repo working-set summary out of the system prompt, and
injects it as per-turn metadata on the latest user message instead.
- **Tool catalog cache anchor** — the model-visible tool array now marks
the final native tool with `cache_control: ephemeral` so DeepSeek can
anchor the stable tool prefix explicitly.
- **V4-scale automatic compaction defaults** — automatic compaction keeps a
500K-token hard floor and the fallback compaction threshold now reflects
the V4-scale late-trigger policy instead of the old 50K-era default.
- **Token-only compaction trigger** — the message-count compaction trigger
was a 128K-era heuristic that fired on long sessions of small messages
— exactly the case where rewriting V4's prefix cache is most wasteful.
Removed `CompactionConfig::message_threshold` and the message-count
branch in `should_compact`; token budget is now the sole automatic
trigger (gated by the 500K floor). Manual `/compact` is unchanged.
### Fixed
- **Legacy 128K context naming** — the 128K fallback is now named and
documented as legacy DeepSeek-only behavior, reducing ambiguity with the
1M-token DeepSeek V4 defaults.
- **`npm install` resilience for slow / firewalled networks** — the
postinstall binary fetch from GitHub Releases now retries on transient
errors (5 attempts, 1-16 s exponential backoff with jitter), enforces a
per-attempt timeout (default 5 min, configurable via
`DEEPSEEK_TUI_DOWNLOAD_TIMEOUT_MS`) plus a 30 s stall detector, honors
`HTTPS_PROXY` / `HTTP_PROXY` / `NO_PROXY` env vars (pure-Node CONNECT
tunneling, no new dependencies), and prints a download-progress line
to stderr so users know it isn't hung. Suppressible with
`DEEPSEEK_TUI_QUIET_INSTALL=1`. Reported by a community user from China
whose install through a CN npm mirror took 18 minutes — the bottleneck
was the GitHub fetch, which CN npm mirrors do not proxy.
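The retry schedule described in the `npm install` entry above can be sketched as follows. This is a minimal illustration only: the real installer is a Node postinstall script, and Rust is used here solely to match the rest of this diff. The function names, the "equal jitter" formula, and the decision to cap at 16 s are assumptions; the changelog states only "5 attempts, 1-16 s exponential backoff with jitter".

```rust
/// Base delay before retry number `retry` (0-based), in milliseconds.
/// Doubles from 1 s and is capped at 16 s (assumed cap, per "1-16 s").
fn base_delay_ms(retry: u32) -> u64 {
    const BASE_MS: u64 = 1_000;
    const CAP_MS: u64 = 16_000;
    // checked_shl returns None once the shift reaches the width of u64;
    // treat that as "huge" and let the cap clamp it.
    let factor = 1u64.checked_shl(retry).unwrap_or(u64::MAX);
    BASE_MS.saturating_mul(factor).min(CAP_MS)
}

/// "Equal jitter" (an assumption, the source does not name the formula):
/// keep half of the delay fixed and randomize the other half.
/// `unit` is a caller-supplied random number in [0, 1].
fn jittered_delay_ms(retry: u32, unit: f64) -> u64 {
    let half = base_delay_ms(retry) / 2;
    half + ((half as f64) * unit.clamp(0.0, 1.0)) as u64
}
```

Passing the random value in as a parameter keeps the sketch deterministic and dependency-free; a caller would supply it from whatever random source is available.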
## [0.8.10] - 2026-05-04
A patch release: hotfixes, small UX polish, and four whalescale-unblocking
+147 -84
@@ -18,32 +18,68 @@ use crate::models::{
};
/// Configuration for conversation compaction behavior.
///
/// v0.8.11 simplified this from the prior token-OR-message-count trigger
/// to a token-only trigger gated by an absolute floor. The
/// `message_threshold` field was removed: its only purpose was to fire
/// compaction on long sessions of small messages, which is exactly the
/// case where rewriting the V4 prefix cache is least valuable. Token
/// budget is the right signal; message count was a 128K-era heuristic.
#[derive(Debug, Clone, PartialEq)]
pub struct CompactionConfig {
pub enabled: bool,
pub token_threshold: usize,
pub message_threshold: usize,
pub model: String,
pub cache_summary: bool,
/// Hard floor — `should_compact` returns `false` when total session
/// tokens fall below this number, regardless of `enabled` or
/// `token_threshold`. Defaults to [`MINIMUM_AUTO_COMPACTION_TOKENS`]
/// (500K) for v0.8.11+. Tests that want to exercise the threshold
/// logic at small fixture sizes can set this to `0` to disable the
/// floor.
pub auto_floor_tokens: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
// ON BY DEFAULT since v0.8.6 (#402 P0 survivability).
// Long-running sessions need automatic compaction to stay
// within the model's context budget. Users who prefer the
// previous behaviour can opt out via `auto_compact = false`
// in settings or `compaction.enabled = false` in config.
// ON BY DEFAULT since v0.8.6 (#402 P0 survivability) — but the
// engine-level `auto_compact` setting was flipped OFF in v0.8.11
// (#665) so this default is mostly a fallback for code paths
// that build a `CompactionConfig` without going through
// `compaction_threshold_for_model_and_effort`. Real per-model
// values are still derived through that helper.
enabled: true,
token_threshold: 50000,
message_threshold: 50,
// v0.8.11: 50K was a 128K-era leftover that biased every
// unconfigured caller toward "compact almost immediately on V4."
// Bumped to 800K (80% of V4's 1M window) so the dead-code
// default no longer lies. Real call sites override this via
// `compaction_threshold_for_model_and_effort`.
token_threshold: 800_000,
model: DEFAULT_TEXT_MODEL.to_string(),
cache_summary: true,
auto_floor_tokens: MINIMUM_AUTO_COMPACTION_TOKENS,
}
}
}
/// Hard floor for automatic compaction in v0.8.11+.
///
/// Below this token count, `should_compact` returns `false` regardless of
/// `enabled` or `token_threshold`. The point of the floor is V4 prefix-cache
/// economics: compaction rewrites the stable prefix, which destroys the KV
/// cache. At low token counts the prefix cache is healthy and compaction's
/// cost (full re-prefill at miss prices) dwarfs its benefit (a tiny budget
/// reclaim). Above the floor compaction can still be net-positive — cache
/// is already pressured, the prefix has drifted, and freeing budget matters.
///
/// Manual `/compact` slash command bypasses this floor with explicit user
/// agency.
///
/// Constant rather than configurable for v0.8.11. If anyone needs to dial
/// it (smaller models, opinionated workflows), we can add a setting later.
pub const MINIMUM_AUTO_COMPACTION_TOKENS: usize = 500_000;
pub const KEEP_RECENT_MESSAGES: usize = 4;
const RECENT_WORKING_SET_WINDOW: usize = 12;
const MAX_WORKING_SET_PATHS: usize = 24;
@@ -585,6 +621,21 @@ pub fn should_compact(
return false;
}
// v0.8.11: hard floor enforcement. Below the floor (default 500K tokens
// — see `MINIMUM_AUTO_COMPACTION_TOKENS`), automatic compaction is
// refused because rewriting the prefix kills V4's prefix cache for
// little budget recovery. Manual `/compact` and the `compact_now` tool
// bypass this floor by going through different code paths.
if config.auto_floor_tokens > 0 {
let total_session_tokens: usize = messages
.iter()
.map(|m| estimate_tokens_for_message(m, false))
.sum();
if total_session_tokens < config.auto_floor_tokens {
return false;
}
}
let plan = plan_compaction(
messages,
workspace,
@@ -597,7 +648,6 @@ pub fn should_compact(
.iter()
.map(|&idx| estimate_tokens_for_message(&messages[idx], false))
.sum();
let pinned_count = plan.pinned_indices.len();
let token_estimate: usize = plan
.summarize_indices
@@ -608,21 +658,19 @@ pub fn should_compact(
// Pinned messages consume part of the budget, so compact earlier when needed.
let effective_token_threshold = config.token_threshold.saturating_sub(pinned_tokens);
let effective_message_threshold = config.message_threshold.saturating_sub(pinned_count);
// Always compact if we exceed the token threshold, even with few unpinned messages.
if token_estimate > effective_token_threshold && effective_token_threshold > 0 {
return true;
// Token-only trigger (v0.8.11): the prior message-count branch was a
// 128K-era heuristic that fired compaction on long chats of small
// messages — exactly the case where rewriting the V4 prefix cache is
// most wasteful. Token budget is the only signal that maps to actual
// model context pressure.
if effective_token_threshold == 0 {
return message_count >= MIN_SUMMARIZE_MESSAGES;
}
let enough_unpinned = message_count >= MIN_SUMMARIZE_MESSAGES
|| effective_token_threshold == 0
|| effective_message_threshold == 0;
if !enough_unpinned {
if message_count < MIN_SUMMARIZE_MESSAGES {
return false;
}
token_estimate > effective_token_threshold || message_count > effective_message_threshold
token_estimate > effective_token_threshold
}
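The decision order in the v0.8.11 `should_compact` (floor first, then the pin-adjusted token threshold) can be sketched in isolation. This is a simplified model under stated assumptions, not the function itself: `Inputs`, `Cfg`, and `should_compact_sketch` are hypothetical names, the real code derives these numbers from the message list and a compaction plan, and the value of `MIN_SUMMARIZE_MESSAGES` below is a placeholder because the diff does not show it.

```rust
/// Placeholder value; the real constant is not visible in this diff.
const MIN_SUMMARIZE_MESSAGES: usize = 4;

/// Pre-computed numbers the real function derives from messages and the plan.
#[derive(Debug, Clone, Copy)]
struct Inputs {
    total_session_tokens: usize, // every message, pinned or not
    pinned_tokens: usize,        // tokens held by pinned messages
    summarize_tokens: usize,     // tokens in messages eligible for summarizing
    summarize_count: usize,      // number of such messages
}

#[derive(Debug, Clone, Copy)]
struct Cfg {
    enabled: bool,
    token_threshold: usize,
    auto_floor_tokens: usize, // 0 disables the floor
}

fn should_compact_sketch(cfg: Cfg, i: Inputs) -> bool {
    if !cfg.enabled {
        return false;
    }
    // Hard floor: below it, rewriting the prefix costs more than it frees.
    if cfg.auto_floor_tokens > 0 && i.total_session_tokens < cfg.auto_floor_tokens {
        return false;
    }
    // Pinned messages consume part of the budget.
    let effective = cfg.token_threshold.saturating_sub(i.pinned_tokens);
    if effective == 0 {
        return i.summarize_count >= MIN_SUMMARIZE_MESSAGES;
    }
    if i.summarize_count < MIN_SUMMARIZE_MESSAGES {
        return false;
    }
    // Token budget is the only remaining trigger; message count is not.
    i.summarize_tokens > effective
}
```

With a floor of 500_000, a session holding a few hundred tokens never compacts automatically, whatever the threshold; setting the floor to `0` (as the small-fixture tests do) exposes the threshold logic directly.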
fn truncate_chars(text: &str, max_chars: usize) -> &str {
@@ -1439,17 +1487,22 @@ mod tests {
assert!(!should_compact(&messages, &config, None, None, None));
}
/// v0.8.11: message-count is no longer a compaction trigger. Long
/// chats of small messages stay uncompacted because rewriting the V4
/// prefix cache for a tiny budget reclaim is net-negative. Only token
/// pressure (and the explicit `/compact` slash command) trigger
/// compaction.
#[test]
fn should_compact_respects_message_threshold() {
fn message_count_no_longer_triggers_compaction() {
let config = CompactionConfig {
enabled: true,
token_threshold: 1_000_000, // Very high
message_threshold: 5,
token_threshold: 1_000_000,
auto_floor_tokens: 0,
..Default::default()
};
// Under threshold
let few_messages: Vec<Message> = (0..4)
// 200 tiny messages, well above the prior message threshold.
let many_messages: Vec<Message> = (0..200)
.map(|_| Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
@@ -1458,19 +1511,9 @@ mod tests {
}],
})
.collect();
assert!(!should_compact(&few_messages, &config, None, None, None));
// Over threshold
let many_messages: Vec<Message> = (0..10)
.map(|_| Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: "x".to_string(),
cache_control: None,
}],
})
.collect();
assert!(should_compact(&many_messages, &config, None, None, None));
// Token total stays minuscule so the token threshold is not hit;
// without the prior message-count trigger, no compaction.
assert!(!should_compact(&many_messages, &config, None, None, None));
}
#[test]
@@ -1568,7 +1611,6 @@ mod tests {
let config = CompactionConfig {
enabled: true,
token_threshold: 10,
message_threshold: 2,
..Default::default()
};
@@ -1579,44 +1621,12 @@ mod tests {
assert!(!should_compact(&messages, &config, None, None, None));
}
#[test]
fn should_compact_counts_only_unpinned_messages() {
let config = CompactionConfig {
enabled: true,
token_threshold: 1_000_000,
message_threshold: 5,
..Default::default()
};
let mut messages: Vec<Message> = (0..7)
.map(|i| msg("user", &format!("noise message {i}")))
.collect();
messages.push(msg("user", "Focus on src/core/engine.rs"));
messages.extend((0..4).map(|i| msg("assistant", &format!("recent {i}"))));
assert!(should_compact(&messages, &config, None, None, None));
}
#[test]
fn should_compact_when_pins_consume_budget() {
let config = CompactionConfig {
enabled: true,
token_threshold: 50,
message_threshold: 50,
..Default::default()
};
let mut messages = vec![msg("user", "noise 0"), msg("assistant", "noise 1")];
messages.extend((0..4).map(|_| {
msg(
"assistant",
&format!("{} src/core/engine.rs", "x".repeat(400)),
)
}));
// Pinned recent messages exceed the token budget, so unpinned noise should trigger compaction.
assert!(should_compact(&messages, &config, None, None, None));
}
// v0.8.11: removed `should_compact_counts_only_unpinned_messages` and
// `should_compact_when_pins_consume_budget` — both tested the
// message-count compaction trigger that v0.8.11 deleted. The
// pinned-tokens accounting they exercised is still tested by
// `should_compact_ignores_fully_pinned_context` below; the rest of
// their setup has no contemporary contract to pin.
#[test]
fn enforce_tool_call_pairs_removes_orphaned_tool_call() {
@@ -1872,8 +1882,8 @@ mod tests {
fn test_should_compact_token_threshold_triggers() {
let config = CompactionConfig {
enabled: true,
token_threshold: 100, // Low threshold for testing
message_threshold: 1000, // High message threshold
token_threshold: 100, // Low threshold for testing
auto_floor_tokens: 0,
..Default::default()
};
@@ -1891,7 +1901,6 @@ mod tests {
let config = CompactionConfig {
enabled: true,
token_threshold: 1000,
message_threshold: 1000,
..Default::default()
};
@@ -1901,6 +1910,63 @@ mod tests {
assert!(!should_compact(&messages, &config, None, None, None));
}
/// v0.8.11: the 500K hard floor blocks auto-compaction even when the
/// token-percentage threshold would otherwise fire. This is the V4
/// prefix-cache protection — below 500K total tokens, rewriting the
/// prefix loses cache for tiny budget gains.
#[test]
fn auto_compaction_floor_blocks_below_500k_even_when_threshold_says_yes() {
let config = CompactionConfig {
enabled: true,
token_threshold: 100, // would normally fire instantly
// Use the production default explicitly so this test pins the
// floor's contract rather than relying on `Default`.
auto_floor_tokens: MINIMUM_AUTO_COMPACTION_TOKENS,
..Default::default()
};
let messages: Vec<Message> = (0..10).map(|_| msg("user", &"x".repeat(50))).collect();
// Total tokens way under 500K, so floor blocks compaction.
assert!(!should_compact(&messages, &config, None, None, None));
}
/// v0.8.11: when total tokens cross the 500K floor, the token-threshold
/// logic takes over again.
#[test]
fn auto_compaction_floor_yields_to_threshold_logic_above_500k() {
let config = CompactionConfig {
enabled: true,
token_threshold: 2_000_000,
auto_floor_tokens: MINIMUM_AUTO_COMPACTION_TOKENS,
..Default::default()
};
// Each message ~500 tokens; 1100 messages → ~550K total tokens.
// That's above the floor (500K) AND below the deliberately high
// token_threshold, so auto-compaction stays off — by threshold,
// not floor.
let messages: Vec<Message> = (0..1100).map(|_| msg("user", &"x".repeat(2000))).collect();
assert!(!should_compact(&messages, &config, None, None, None));
// Crank threshold below total → compaction fires now that we're
// past the floor.
let config_lower = CompactionConfig {
token_threshold: 100_000,
..config
};
assert!(should_compact(&messages, &config_lower, None, None, None));
}
/// `CompactionConfig::default()` ships with the 500K floor on by
/// default — production callers via `..Default::default()` get the
/// safety guarantee automatically.
#[test]
fn compaction_config_default_carries_500k_floor() {
let config = CompactionConfig::default();
assert_eq!(config.auto_floor_tokens, MINIMUM_AUTO_COMPACTION_TOKENS);
assert_eq!(config.auto_floor_tokens, 500_000);
}
#[test]
fn test_plan_compaction_pins_error_messages() {
let messages = vec![
@@ -2194,7 +2260,6 @@ mod tests {
let _config = CompactionConfig {
enabled: true,
token_threshold: 1000,
message_threshold: 5,
..Default::default()
};
@@ -2210,9 +2275,7 @@ mod tests {
msg("assistant", "recent 2"),
];
// Should compact because:
// - More than message_threshold (5) unpinned messages
// - src/main.rs mention pins message 0
// src/main.rs mention should pin message 0 in the plan.
let plan = plan_compaction(
&messages,
Some(&workspace),
+2 -2
@@ -225,7 +225,7 @@ pub fn provider_capability(provider: ApiProvider, resolved_model: &str) -> Provi
crate::models::DEEPSEEK_V4_CONTEXT_WINDOW_TOKENS
} else {
crate::models::context_window_for_model(resolved_model)
.unwrap_or(crate::models::DEFAULT_CONTEXT_WINDOW_TOKENS)
.unwrap_or(crate::models::LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS)
};
// Max output tokens: DeepSeek V4 models allow 262K; others get 4096.
@@ -4070,7 +4070,7 @@ model = "deepseek-v4-pro"
let cap = provider_capability(ApiProvider::Deepseek, "deepseek-coder");
assert_eq!(
cap.context_window,
crate::models::DEFAULT_CONTEXT_WINDOW_TOKENS
crate::models::LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS
);
assert_eq!(cap.max_output, 4096);
assert!(!cap.thinking_supported);
+51 -28
@@ -8,6 +8,8 @@
//! - Tool execution orchestration
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
@@ -35,8 +37,8 @@ use crate::mcp::McpPool;
#[cfg(test)]
use crate::models::ToolCaller;
use crate::models::{
ContentBlock, ContentBlockStart, DEFAULT_CONTEXT_WINDOW_TOKENS, Delta, Message, MessageRequest,
StreamEvent, SystemPrompt, Tool, Usage,
ContentBlock, ContentBlockStart, Delta, LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS, Message,
MessageRequest, StreamEvent, SystemPrompt, Tool, Usage,
};
use crate::prompts;
use crate::seam_manager::{SeamConfig, SeamManager};
@@ -353,8 +355,9 @@ impl Engine {
config.mcp_config_path.clone(),
);
// Set up system prompt with project context (default to agent mode)
let working_set_summary = session.working_set.summary_block(&config.workspace);
// Set up stable system prompt with project context (default to agent mode).
// Per-turn working-set metadata is injected into the latest user
// message at request time so file churn does not rewrite this prefix.
let user_memory_block =
crate::memory::compose_block(config.memory_enabled, &config.memory_path);
let system_prompt = prompts::system_prompt_for_mode_with_context_skills_and_session(
@@ -368,8 +371,9 @@ impl Engine {
goal_objective: config.goal_objective.as_deref(),
},
);
session.system_prompt =
append_working_set_summary(Some(system_prompt), working_set_summary.as_deref());
let stable_prompt = Some(system_prompt);
session.last_system_prompt_hash = Some(system_prompt_hash(stable_prompt.as_ref()));
session.system_prompt = stable_prompt;
let subagent_manager =
new_shared_subagent_manager(config.workspace.clone(), config.max_subagents);
@@ -883,9 +887,9 @@ impl Engine {
} else {
Vec::new()
};
let tools = tool_registry
.as_ref()
.map(|registry| build_model_tool_catalog(registry.to_api_tools(), mcp_tools, mode));
let tools = tool_registry.as_ref().map(|registry| {
build_model_tool_catalog(registry.to_api_tools_with_cache(true), mcp_tools, mode)
});
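The idea behind `to_api_tools_with_cache(true)`, as the changelog describes it, is to mark only the final native tool with an ephemeral cache-control hint. A minimal sketch under assumptions: `ToolSpec` and `mark_cache_anchor` are hypothetical stand-ins (the real `Tool` type and registry method are not shown in this diff), and clearing markers on earlier tools is an extra assumption made here so the function is idempotent.

```rust
/// Hypothetical, trimmed-down tool description for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct ToolSpec {
    name: String,
    cache_control: Option<&'static str>,
}

/// Mark the last tool as the cache anchor so the whole tool prefix before it
/// can be treated as one stable, cacheable span.
fn mark_cache_anchor(mut tools: Vec<ToolSpec>) -> Vec<ToolSpec> {
    // Assumption: at most one anchor, so drop any stale markers first.
    for tool in tools.iter_mut() {
        tool.cache_control = None;
    }
    if let Some(last) = tools.last_mut() {
        last.cache_control = Some("ephemeral");
    }
    tools
}
```

The anchor only helps if the tool order is stable between requests; reordering the catalog would change the prefix that the marker is meant to pin.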
// Main turn loop
let (status, error) = self
@@ -1179,7 +1183,10 @@ impl Engine {
.token_threshold
.min(target_budget.saturating_sub(1))
.max(1);
forced_config.message_threshold = forced_config.message_threshold.max(1);
// v0.8.11: forced compaction (capacity guardrail) bypasses the floor
// because we're at a hard ceiling and have to free budget regardless
// of cache cost.
forced_config.auto_floor_tokens = 0;
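The forced-compaction override above amounts to two adjustments: clamp the threshold strictly below the target budget (but never to zero), and switch the floor off. A small sketch of that arithmetic, assuming a hypothetical two-field `CompactionCfg` and a hypothetical `forced_config` helper; the real code mutates a cloned `CompactionConfig` in place.

```rust
/// Hypothetical two-field stand-in for the real `CompactionConfig`.
#[derive(Debug, Clone, PartialEq)]
struct CompactionCfg {
    token_threshold: usize,
    auto_floor_tokens: usize,
}

/// Build the config used by the capacity guardrail.
fn forced_config(base: &CompactionCfg, target_budget: usize) -> CompactionCfg {
    CompactionCfg {
        // Strictly below the target budget, and at least 1 so the threshold
        // is never treated as "zero budget".
        token_threshold: base
            .token_threshold
            .min(target_budget.saturating_sub(1))
            .max(1),
        // The guardrail must free budget regardless of cache cost.
        auto_floor_tokens: 0,
    }
}
```

`saturating_sub(1)` keeps a `target_budget` of `0` from underflowing, and the trailing `.max(1)` then lifts the result to `1`.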
match compact_messages_safe(
client,
@@ -1645,10 +1652,6 @@ impl Engine {
/// Refresh the system prompt based on current mode and context.
fn refresh_system_prompt(&mut self, mode: AppMode) {
let working_set_summary = self
.session
.working_set
.summary_block(&self.config.workspace);
let user_memory_block =
crate::memory::compose_block(self.config.memory_enabled, &self.config.memory_path);
let base = prompts::system_prompt_for_mode_with_context_skills_and_session(
@@ -1664,8 +1667,11 @@ impl Engine {
);
let stable_prompt =
merge_system_prompts(Some(&base), self.session.compaction_summary_prompt.clone());
self.session.system_prompt =
append_working_set_summary(stable_prompt, working_set_summary.as_deref());
let stable_hash = system_prompt_hash(stable_prompt.as_ref());
if self.session.last_system_prompt_hash != Some(stable_hash) {
self.session.system_prompt = stable_prompt;
self.session.last_system_prompt_hash = Some(stable_hash);
}
}
fn merge_compaction_summary(&mut self, summary_prompt: Option<SystemPrompt>) {
@@ -1676,18 +1682,36 @@ impl Engine {
self.session.compaction_summary_prompt.as_ref(),
summary_prompt.clone(),
);
let current_without_working_set =
remove_working_set_summary(self.session.system_prompt.as_ref());
let merged = merge_system_prompts(current_without_working_set.as_ref(), summary_prompt);
let working_set_summary = self
.session
.working_set
.summary_block(&self.config.workspace);
self.session.system_prompt =
append_working_set_summary(merged, working_set_summary.as_deref());
let merged = merge_system_prompts(self.session.system_prompt.as_ref(), summary_prompt);
self.session.last_system_prompt_hash = Some(system_prompt_hash(merged.as_ref()));
self.session.system_prompt = merged;
}
}
fn system_prompt_hash(prompt: Option<&SystemPrompt>) -> u64 {
let mut hasher = DefaultHasher::new();
match prompt {
Some(SystemPrompt::Text(text)) => {
0u8.hash(&mut hasher);
text.hash(&mut hasher);
}
Some(SystemPrompt::Blocks(blocks)) => {
1u8.hash(&mut hasher);
for block in blocks {
block.block_type.hash(&mut hasher);
block.text.hash(&mut hasher);
if let Some(cache_control) = &block.cache_control {
cache_control.cache_type.hash(&mut hasher);
}
}
}
None => {
2u8.hash(&mut hasher);
}
}
hasher.finish()
}
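The per-turn rebuild gate pairs this hash with a "write only on change" check. The sketch below shows the pattern on a plain string prompt; `PromptSlot`, `prompt_hash`, and the `writes` counter are hypothetical names introduced for illustration, and the real code hashes `SystemPrompt` variants rather than `Option<&str>`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash a prompt, tagging each case so `None` and `Some("")` differ.
fn prompt_hash(prompt: Option<&str>) -> u64 {
    let mut hasher = DefaultHasher::new();
    match prompt {
        Some(text) => {
            0u8.hash(&mut hasher);
            text.hash(&mut hasher);
        }
        None => 1u8.hash(&mut hasher),
    }
    hasher.finish()
}

/// Hypothetical holder for the stored prompt and its last-seen hash.
#[derive(Default)]
struct PromptSlot {
    prompt: Option<String>,
    last_hash: Option<u64>,
    writes: usize, // counts real reassignments, for illustration
}

impl PromptSlot {
    /// Returns true only when the stored prompt was actually replaced.
    fn refresh(&mut self, assembled: Option<String>) -> bool {
        let hash = prompt_hash(assembled.as_deref());
        if self.last_hash == Some(hash) {
            return false; // unchanged: leave the cached prefix alone
        }
        self.prompt = assembled;
        self.last_hash = Some(hash);
        self.writes += 1;
        true
    }
}
```

`DefaultHasher::new()` yields the same value for the same input within one process, which is all an in-memory gate needs; its algorithm is not guaranteed to be stable across Rust releases, so a hash like this is unsuitable for persisting to disk.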
/// Spawn the engine in a background task
pub fn spawn_engine(config: EngineConfig, api_config: &Config) -> EngineHandle {
let (engine, handle) = Engine::new(config, api_config);
@@ -1775,9 +1799,8 @@ mod context;
pub(crate) use context::compact_tool_result_for_context;
use context::{
COMPACTION_SUMMARY_MARKER, MAX_CONTEXT_RECOVERY_ATTEMPTS, MIN_RECENT_MESSAGES_TO_KEEP,
TURN_MAX_OUTPUT_TOKENS, append_working_set_summary, context_input_budget,
estimate_input_tokens_conservative, extract_compaction_summary_prompt,
is_context_length_error_message, remove_working_set_summary, summarize_text,
TURN_MAX_OUTPUT_TOKENS, context_input_budget, estimate_input_tokens_conservative,
extract_compaction_summary_prompt, is_context_length_error_message, summarize_text,
turn_response_headroom_tokens,
};
mod dispatch;
+3 -2
@@ -160,9 +160,10 @@ impl Engine {
let unique_reference_ids_recent_window =
self.recent_unique_reference_count(message_window, turn);
let context_window = usize::try_from(
context_window_for_model(&self.session.model).unwrap_or(DEFAULT_CONTEXT_WINDOW_TOKENS),
context_window_for_model(&self.session.model)
.unwrap_or(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS),
)
.unwrap_or(usize::try_from(DEFAULT_CONTEXT_WINDOW_TOKENS).unwrap_or(128_000))
.unwrap_or(usize::try_from(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS).unwrap_or(128_000))
.max(1);
let context_used_ratio = (self.estimated_input_tokens() as f64) / (context_window as f64);
+1 -52
@@ -6,7 +6,7 @@
use crate::compaction::estimate_tokens;
use crate::error_taxonomy::ErrorCategory;
use crate::models::{Message, SystemBlock, SystemPrompt, context_window_for_model};
use crate::models::{Message, SystemPrompt, context_window_for_model};
use crate::tools::spec::ToolResult;
/// Max output tokens requested for normal agent turns. Generous on purpose:
@@ -40,7 +40,6 @@ const LARGE_CONTEXT_WINDOW_TOKENS: u32 = 500_000;
const TOOL_RESULT_METADATA_SUMMARY_CHARS: usize = 320;
pub(super) const COMPACTION_SUMMARY_MARKER: &str = "Conversation Summary (Auto-Generated)";
pub(super) const WORKING_SET_SUMMARY_MARKER: &str = "## Repo Working Set";
#[derive(Debug, Clone, Copy)]
struct ToolResultContextLimits {
@@ -288,56 +287,6 @@ pub(super) fn extract_compaction_summary_prompt(
}
}
pub(super) fn remove_working_set_summary(prompt: Option<&SystemPrompt>) -> Option<SystemPrompt> {
match prompt {
Some(SystemPrompt::Blocks(blocks)) => {
let filtered: Vec<SystemBlock> = blocks
.iter()
.filter(|block| !block.text.contains(WORKING_SET_SUMMARY_MARKER))
.cloned()
.collect();
if filtered.is_empty() {
None
} else {
Some(SystemPrompt::Blocks(filtered))
}
}
Some(SystemPrompt::Text(text)) => Some(SystemPrompt::Text(text.clone())),
None => None,
}
}
pub(super) fn append_working_set_summary(
prompt: Option<SystemPrompt>,
working_set_summary: Option<&str>,
) -> Option<SystemPrompt> {
let Some(summary) = working_set_summary.map(str::trim).filter(|s| !s.is_empty()) else {
return prompt;
};
let working_set_block = SystemBlock {
block_type: "text".to_string(),
text: summary.to_string(),
cache_control: None,
};
match prompt {
Some(SystemPrompt::Text(text)) => Some(SystemPrompt::Blocks(vec![
SystemBlock {
block_type: "text".to_string(),
text,
cache_control: None,
},
working_set_block,
])),
Some(SystemPrompt::Blocks(mut blocks)) => {
blocks.retain(|block| !block.text.contains(WORKING_SET_SUMMARY_MARKER));
blocks.push(working_set_block);
Some(SystemPrompt::Blocks(blocks))
}
None => Some(SystemPrompt::Blocks(vec![working_set_block])),
}
}
fn estimate_text_tokens_conservative(text: &str) -> usize {
text.chars().count().div_ceil(3)
}
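A short worked example of the estimator above: it counts Unicode scalar values (not bytes) and rounds up at three characters per token. The function name below is a hypothetical copy for illustration. The estimate is presumably "conservative" in the sense of over-counting relative to a typical ratio of around four characters per token for English text; that ratio is an assumption, not something the source states.

```rust
/// Same arithmetic as the helper above: ceil(char_count / 3).
fn estimate_tokens_sketch(text: &str) -> usize {
    text.chars().count().div_ceil(3)
}

/// Byte-based variant, shown only for contrast with multi-byte text.
fn estimate_tokens_by_bytes(text: &str) -> usize {
    text.len().div_ceil(3)
}
```

For ASCII input the two agree (a 2,000-character string estimates to 667 tokens either way), but for accented or CJK text the byte-based variant would inflate the count, which is why counting `chars()` is the gentler choice.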
+201 -25
@@ -1,6 +1,5 @@
use super::*;
use super::context::WORKING_SET_SUMMARY_MARKER;
use crate::models::SystemBlock;
use serde_json::json;
use std::collections::HashSet;
@@ -9,6 +8,8 @@ use std::path::PathBuf;
use std::time::Instant;
use tempfile::tempdir;
const WORKING_SET_SUMMARY_MARKER: &str = "## Repo Working Set";
fn build_engine_with_capacity(capacity: CapacityControllerConfig) -> Engine {
let engine_config = EngineConfig {
capacity,
@@ -501,7 +502,7 @@ fn subagent_results_are_summarized_before_parent_context_insertion() {
}
#[test]
fn refresh_system_prompt_places_working_set_after_stable_prefix() {
fn refresh_system_prompt_leaves_working_set_out_of_system_prompt() {
let tmp = tempdir().expect("tempdir");
fs::create_dir_all(tmp.path().join("src")).expect("mkdir");
fs::write(tmp.path().join("src/lib.rs"), "pub fn sample() {}").expect("write");
@@ -518,20 +519,197 @@ fn refresh_system_prompt_places_working_set_after_stable_prefix() {
engine.refresh_system_prompt(AppMode::Agent);
let Some(SystemPrompt::Blocks(blocks)) = &engine.session.system_prompt else {
panic!("expected structured prompt blocks");
};
let last = blocks.last().expect("working-set block");
assert!(last.text.contains(WORKING_SET_SUMMARY_MARKER));
assert!(
blocks[..blocks.len() - 1]
let prompt = match &engine.session.system_prompt {
Some(SystemPrompt::Text(text)) => text.clone(),
Some(SystemPrompt::Blocks(blocks)) => blocks
.iter()
.all(|block| !block.text.contains(WORKING_SET_SUMMARY_MARKER))
);
.map(|block| block.text.as_str())
.collect::<Vec<_>>()
.join("\n"),
None => panic!("expected system prompt"),
};
assert!(!prompt.contains(WORKING_SET_SUMMARY_MARKER));
}
#[test]
fn compaction_summary_stays_before_volatile_working_set() {
fn working_set_reaches_model_as_turn_metadata() {
let tmp = tempdir().expect("tempdir");
fs::create_dir_all(tmp.path().join("src")).expect("mkdir");
fs::write(tmp.path().join("src/lib.rs"), "pub fn sample() {}").expect("write");
let config = EngineConfig {
workspace: tmp.path().to_path_buf(),
..Default::default()
};
let (mut engine, _handle) = Engine::new(config, &Config::default());
engine
.session
.working_set
.observe_user_message("please inspect src/lib.rs", tmp.path());
engine.session.add_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: "please inspect src/lib.rs".to_string(),
cache_control: None,
}],
});
let messages = engine.messages_with_turn_metadata();
let first_block = messages
.last()
.and_then(|message| message.content.first())
.expect("turn metadata block");
let ContentBlock::Text { text, .. } = first_block else {
panic!("expected text metadata block");
};
assert!(text.starts_with("<turn_meta>\n"));
assert!(text.contains(WORKING_SET_SUMMARY_MARKER));
assert!(text.contains("src/lib.rs"));
}
/// v0.8.11 regression: tool-result messages serialize to role="tool" on
/// the wire but are stored as role="user" internally. Prepending
/// `<turn_meta>` text onto a tool-result message broke the
/// assistant→tool_result invariant and caused HTTP 400 from DeepSeek's
/// API ("insufficient tool messages following tool_calls"). The fix:
/// inject only into messages that have a Text content block and no
/// ToolResult blocks; mid-turn (tool-result is the trailing user
/// message) the injection skips that message.
#[test]
fn turn_metadata_skips_tool_result_messages() {
let tmp = tempdir().expect("tempdir");
fs::create_dir_all(tmp.path().join("src")).expect("mkdir");
fs::write(tmp.path().join("src/lib.rs"), "pub fn sample() {}").expect("write");
let config = EngineConfig {
workspace: tmp.path().to_path_buf(),
..Default::default()
};
let (mut engine, _handle) = Engine::new(config, &Config::default());
engine
.session
.working_set
.observe_user_message("inspect src/lib.rs", tmp.path());
// Real user message — should be eligible for injection.
engine.session.add_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: "inspect src/lib.rs".to_string(),
cache_control: None,
}],
});
// Assistant tool-call.
engine.session.add_message(Message {
role: "assistant".to_string(),
content: vec![ContentBlock::ToolUse {
id: "call_42".to_string(),
name: "read_file".to_string(),
input: serde_json::json!({"path": "src/lib.rs"}),
caller: None,
}],
});
// Tool result, stored as role="user" internally.
engine.session.add_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::ToolResult {
tool_use_id: "call_42".to_string(),
content: "pub fn sample() {}".to_string(),
is_error: None,
content_blocks: None,
}],
});
let messages = engine.messages_with_turn_metadata();
// The trailing message is the tool result and MUST be untouched —
// no Text block sneaking in front of the ToolResult block.
let trailing = messages.last().expect("trailing message");
assert_eq!(trailing.role, "user");
assert_eq!(trailing.content.len(), 1);
assert!(matches!(
trailing.content.first(),
Some(ContentBlock::ToolResult { .. })
));
// The earlier real user message receives the turn_meta prefix.
let real_user = messages.first().expect("first user message");
assert_eq!(real_user.role, "user");
let ContentBlock::Text { text, .. } = real_user.content.first().expect("user text content")
else {
panic!("expected Text block on real user message");
};
assert!(text.starts_with("<turn_meta>\n"));
assert!(text.contains("src/lib.rs"));
}
/// When no eligible user-typed message exists (for example, the only
/// user-role message in history is a tool result), no turn_meta is
/// injected at all, so the API's tool-call continuity check is never
/// disturbed. The working_set surfaces again on the next genuine user
/// prompt.
#[test]
fn turn_metadata_skips_when_only_tool_results_trail() {
let tmp = tempdir().expect("tempdir");
fs::create_dir_all(tmp.path().join("src")).expect("mkdir");
fs::write(tmp.path().join("src/lib.rs"), "pub fn sample() {}").expect("write");
let config = EngineConfig {
workspace: tmp.path().to_path_buf(),
..Default::default()
};
let (mut engine, _handle) = Engine::new(config, &Config::default());
engine
.session
.working_set
.observe_user_message("inspect src/lib.rs", tmp.path());
// Only a tool-result message in history — simulates the corner case
// where the prior real user message has already been compacted away
// but a tool-result is still pending. We must not retroactively
// inject.
engine.session.add_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::ToolResult {
tool_use_id: "call_42".to_string(),
content: "pub fn sample() {}".to_string(),
is_error: None,
content_blocks: None,
}],
});
let messages = engine.messages_with_turn_metadata();
// Returned unchanged: the single tool-result message, no Text
// prefix, content length == 1.
let only = messages.last().expect("trailing message");
assert_eq!(only.content.len(), 1);
assert!(matches!(
only.content.first(),
Some(ContentBlock::ToolResult { .. })
));
}
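The injection rule these three tests pin down can be sketched on a reduced message model. Everything here is an assumption made for illustration: `Block`, `Msg`, `is_injectable`, and `with_turn_metadata` are hypothetical names, the closing `</turn_meta>` tag is a guess (the tests only check the opening `<turn_meta>\n`), and choosing the last eligible message is inferred from the tests because the body of `messages_with_turn_metadata` is not fully visible in this diff.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Block {
    Text(String),
    ToolUse { id: String },
    ToolResult { tool_use_id: String },
}

#[derive(Debug, Clone, PartialEq)]
struct Msg {
    role: String,
    content: Vec<Block>,
}

/// A user-typed message: role "user", has text, carries no tool results.
fn is_injectable(m: &Msg) -> bool {
    m.role == "user"
        && m.content.iter().any(|b| matches!(b, Block::Text(_)))
        && !m.content.iter().any(|b| matches!(b, Block::ToolResult { .. }))
}

/// Return a copy of `messages` with the working-set summary prepended to the
/// last eligible user message; return them unchanged if none qualifies.
fn with_turn_metadata(messages: &[Msg], summary: Option<&str>) -> Vec<Msg> {
    let mut out = messages.to_vec();
    let Some(summary) = summary.map(str::trim).filter(|s| !s.is_empty()) else {
        return out;
    };
    if let Some(idx) = out.iter().rposition(is_injectable) {
        let meta = format!("<turn_meta>\n{summary}\n</turn_meta>");
        out[idx].content.insert(0, Block::Text(meta));
    }
    out
}
```

Because the stored session messages are cloned and never mutated, the metadata exists only in the outgoing request; the next turn rebuilds it from the current working set.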
#[test]
fn refresh_system_prompt_is_noop_when_unchanged() {
let tmp = tempdir().expect("tempdir");
let config = EngineConfig {
workspace: tmp.path().to_path_buf(),
..Default::default()
};
let (mut engine, _handle) = Engine::new(config, &Config::default());
engine.refresh_system_prompt(AppMode::Agent);
let first_hash = engine.session.last_system_prompt_hash;
let first_prompt = engine.session.system_prompt.clone();
engine.refresh_system_prompt(AppMode::Agent);
assert_eq!(engine.session.last_system_prompt_hash, first_hash);
assert_eq!(engine.session.system_prompt, first_prompt);
}
#[test]
fn compaction_summary_stays_in_stable_system_prompt() {
let tmp = tempdir().expect("tempdir");
fs::create_dir_all(tmp.path().join("src")).expect("mkdir");
fs::write(tmp.path().join("src/main.rs"), "fn main() {}").expect("write");
@@ -552,20 +730,18 @@ fn compaction_summary_stays_before_volatile_working_set() {
cache_control: None,
}])));
let Some(SystemPrompt::Blocks(blocks)) = &engine.session.system_prompt else {
panic!("expected structured prompt blocks");
let prompt = match &engine.session.system_prompt {
Some(SystemPrompt::Text(text)) => text.clone(),
Some(SystemPrompt::Blocks(blocks)) => blocks
.iter()
.map(|block| block.text.as_str())
.collect::<Vec<_>>()
.join("\n"),
None => panic!("expected system prompt"),
};
let summary_index = blocks
.iter()
.position(|block| block.text.contains(COMPACTION_SUMMARY_MARKER))
.expect("summary block");
let working_set_index = blocks
.iter()
.position(|block| block.text.contains(WORKING_SET_SUMMARY_MARKER))
.expect("working-set block");
assert!(summary_index < working_set_index);
assert_eq!(working_set_index, blocks.len() - 1);
assert!(prompt.contains(COMPACTION_SUMMARY_MARKER));
assert!(!prompt.contains(WORKING_SET_SUMMARY_MARKER));
}
#[tokio::test]
@@ -635,7 +811,7 @@ async fn pre_request_refresh_invoked_when_medium_risk() {
engine.config.model = "deepseek-v3.2-128k".to_string();
let long = "x".repeat(5_000);
for _ in 0..200 {
for _ in 0..900 {
engine.session.messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
+48 -1
@@ -230,7 +230,7 @@ impl Engine {
};
let request = MessageRequest {
model: self.session.model.clone(),
messages: self.session.messages.clone(),
messages: self.messages_with_turn_metadata(),
max_tokens: TURN_MAX_OUTPUT_TOKENS,
system: self.session.system_prompt.clone(),
tools: active_tools.clone(),
@@ -1594,4 +1594,51 @@ impl Engine {
}
(TurnOutcomeStatus::Completed, None)
}
pub(super) fn messages_with_turn_metadata(&self) -> Vec<Message> {
let Some(summary) = self
.session
.working_set
.summary_block(&self.config.workspace)
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
else {
return self.session.messages.clone();
};
let mut messages = self.session.messages.clone();
// v0.8.11 hotfix: tool-result messages are stored as role="user" in
// our internal representation but serialize to role="tool" on the
// wire. Prepending a Text block onto a tool-result message breaks
// the assistant→tool_result invariant — the API rejects the request
// with `"insufficient tool messages following tool_calls"`. Inject
// only into actual user-typed messages, recognizable by having at
// least one Text content block (and no ToolResult blocks).
let Some(last_user) = messages.iter_mut().rev().find(|message| {
message.role == "user"
&& message
.content
.iter()
.all(|block| !matches!(block, ContentBlock::ToolResult { .. }))
&& message
.content
.iter()
.any(|block| matches!(block, ContentBlock::Text { .. }))
}) else {
// No genuine user-typed message anywhere in history (e.g. the
// prior user message was compacted away while a tool result is
// still pending). Skip injection; the working_set will surface
// again on the next genuine user prompt.
return messages;
};
let turn_meta = format!("<turn_meta>\n{summary}\n</turn_meta>");
last_user.content.insert(
0,
ContentBlock::Text {
text: turn_meta,
cache_control: None,
},
);
messages
}
}
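The injection rule in `messages_with_turn_metadata` can be sketched in isolation. This is a minimal sketch, not the engine's code: `Block`, `Msg`, and `inject_turn_meta` are hypothetical stand-ins for `ContentBlock`, `Message`, and the method above, and the summary is passed in directly rather than read from `working_set`.

```rust
// Hypothetical, simplified stand-ins for the engine's ContentBlock / Message.
#[derive(Debug, Clone, PartialEq)]
enum Block {
    Text(String),
    ToolResult(String),
}

#[derive(Debug, Clone, PartialEq)]
struct Msg {
    role: String,
    content: Vec<Block>,
}

/// Prepend a `<turn_meta>` text block to the most recent user-typed message.
/// Tool-result messages also carry role "user" internally, so they are
/// skipped: a message qualifies only if it has at least one Text block and
/// no ToolResult blocks.
fn inject_turn_meta(mut messages: Vec<Msg>, summary: &str) -> Vec<Msg> {
    let summary = summary.trim();
    if summary.is_empty() {
        // Nothing to inject; history goes out unchanged.
        return messages;
    }
    let target = messages.iter_mut().rev().find(|m| {
        m.role == "user"
            && m.content.iter().all(|b| !matches!(b, Block::ToolResult(_)))
            && m.content.iter().any(|b| matches!(b, Block::Text(_)))
    });
    if let Some(msg) = target {
        msg.content
            .insert(0, Block::Text(format!("<turn_meta>\n{summary}\n</turn_meta>")));
    }
    messages
}
```

Because the function works on a clone of the stored history, the stored messages stay byte-stable and only the outgoing request carries the volatile block.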
+4
@@ -25,6 +25,9 @@ pub struct Session {
/// System prompt (optional)
pub system_prompt: Option<SystemPrompt>,
/// Hash of the last assembled stable system prompt. Used to avoid
/// replacing `system_prompt` when unchanged.
pub last_system_prompt_hash: Option<u64>,
/// Persisted summary blocks generated by context compaction.
pub compaction_summary_prompt: Option<SystemPrompt>,
@@ -131,6 +134,7 @@ impl Session {
} else {
None
},
last_system_prompt_hash: None,
working_set: WorkingSet::default(),
cycle_count: 0,
current_cycle_started: Utc::now(),
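The `last_system_prompt_hash` field supports a skip-if-unchanged gate. The sketch below shows one way such a gate could work. It assumes a plain `String` prompt and the standard library's `DefaultHasher`; the hashing the engine actually uses is not visible in this diff, and `PromptState`, `hash_prompt`, and `refresh` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical reduction of the Session fields involved in the gate.
#[derive(Default)]
struct PromptState {
    system_prompt: Option<String>,
    last_system_prompt_hash: Option<u64>,
}

// Assumption: any deterministic 64-bit hash would do; DefaultHasher::new()
// uses fixed keys, so equal inputs hash equally within a process.
fn hash_prompt(text: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    text.hash(&mut hasher);
    hasher.finish()
}

impl PromptState {
    /// Store the assembled prompt only when its hash differs from the last
    /// one seen. Returns true when the stored prompt was replaced.
    fn refresh(&mut self, assembled: String) -> bool {
        let hash = hash_prompt(&assembled);
        if self.last_system_prompt_hash == Some(hash) {
            return false; // unchanged: leave `system_prompt` untouched
        }
        self.system_prompt = Some(assembled);
        self.last_system_prompt_hash = Some(hash);
        true
    }
}
```

Calling `refresh` twice with the same text replaces the prompt once and is a no-op the second time, which is the behaviour `refresh_system_prompt_is_noop_when_unchanged` pins.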
+1 -2
@@ -3711,7 +3711,7 @@ async fn run_exec_agent(
use crate::core::engine::{EngineConfig, spawn_engine};
use crate::core::events::Event;
use crate::core::ops::Op;
use crate::models::{compaction_message_threshold_for_model, compaction_threshold_for_model};
use crate::models::compaction_threshold_for_model;
use crate::tools::plan::new_shared_plan_state;
use crate::tools::todo::new_shared_todo_list;
use crate::tui::app::AppMode;
@@ -3725,7 +3725,6 @@ async fn run_exec_agent(
enabled: false,
model: model.to_string(),
token_threshold: compaction_threshold_for_model(model),
message_threshold: compaction_message_threshold_for_model(model),
..Default::default()
};
+24 -54
@@ -2,20 +2,19 @@
use serde::{Deserialize, Serialize};
pub const DEFAULT_CONTEXT_WINDOW_TOKENS: u32 = 128_000;
/// Context window used only for legacy DeepSeek model IDs that do not name a
/// newer V4 alias and do not carry an explicit `*k` suffix.
pub const LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS: u32 = 128_000;
pub const DEEPSEEK_V4_CONTEXT_WINDOW_TOKENS: u32 = 1_000_000;
/// Last-resort compaction trigger when [`context_window_for_model`] returns
/// `None` (an unrecognised model id). v0.8.11 raised this from `50_000` to
/// `102_400` (80% of [`DEFAULT_CONTEXT_WINDOW_TOKENS`]) so unknown models
/// inherit the same late-trigger discipline as V4 instead of paying the
/// prefix-cache hit at 5% of the V4 window. Known DeepSeek / Claude models
/// resolve to their own scaled value via [`compaction_threshold_for_model`]
/// (#664).
/// `102_400` (80% of [`LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS`]) so unknown
/// models inherit the same late-trigger discipline as V4 instead of paying
/// the prefix-cache hit at 5% of the V4 window. Known DeepSeek / Claude
/// models resolve to their own scaled value via
/// [`compaction_threshold_for_model`] (#664).
pub const DEFAULT_COMPACTION_TOKEN_THRESHOLD: usize = 102_400;
pub const DEFAULT_COMPACTION_MESSAGE_THRESHOLD: usize = 50;
const COMPACTION_THRESHOLD_PERCENT: u32 = 80;
const COMPACTION_MESSAGE_DIVISOR: u32 = 500;
const MAX_COMPACTION_MESSAGE_THRESHOLD: usize = 2_000;
// === Core Message Types ===
@@ -212,8 +211,9 @@ pub struct Usage {
#[must_use]
pub fn context_window_for_model(model: &str) -> Option<u32> {
let lower = model.to_lowercase();
// Unknown DeepSeek model IDs default to 128k unless an explicit *k suffix is present.
// DeepSeek-V4 family and current legacy aliases ship with a 1M context window.
// Unknown legacy DeepSeek model IDs default to 128K unless an explicit
// *k suffix is present. DeepSeek-V4 family and current compatibility
// aliases ship with a 1M context window.
if lower.contains("deepseek") {
if let Some(explicit_window) = deepseek_context_window_hint(&lower) {
return Some(explicit_window);
@@ -221,7 +221,7 @@ pub fn context_window_for_model(model: &str) -> Option<u32> {
if lower.contains("v4") || is_current_deepseek_v4_alias(&lower) {
return Some(DEEPSEEK_V4_CONTEXT_WINDOW_TOKENS);
}
return Some(DEFAULT_CONTEXT_WINDOW_TOKENS);
return Some(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS);
}
if lower.contains("claude") {
return Some(200_000);
@@ -295,21 +295,6 @@ pub fn compaction_threshold_for_model_and_effort(
compaction_threshold_for_model(model)
}
/// Derive a compaction message-count threshold from model context window.
#[must_use]
pub fn compaction_message_threshold_for_model(model: &str) -> usize {
let Some(window) = context_window_for_model(model) else {
return DEFAULT_COMPACTION_MESSAGE_THRESHOLD;
};
let scaled = usize::try_from(window / COMPACTION_MESSAGE_DIVISOR)
.unwrap_or(DEFAULT_COMPACTION_MESSAGE_THRESHOLD);
scaled.clamp(
DEFAULT_COMPACTION_MESSAGE_THRESHOLD,
MAX_COMPACTION_MESSAGE_THRESHOLD,
)
}
// === Streaming Structures ===
#[allow(dead_code)]
@@ -411,14 +396,14 @@ mod tests {
}
#[test]
fn unknown_deepseek_models_map_to_128k_context_window() {
fn unknown_legacy_deepseek_models_map_to_128k_context_window() {
assert_eq!(
context_window_for_model("deepseek-coder"),
Some(DEFAULT_CONTEXT_WINDOW_TOKENS)
Some(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS)
);
assert_eq!(
context_window_for_model("deepseek-v3.2-0324"),
Some(DEFAULT_CONTEXT_WINDOW_TOKENS)
Some(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS)
);
}
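The resolution order these tests exercise can be sketched as follows. This is a deliberately reduced sketch: it omits the explicit `*k` suffix hint (`deepseek_context_window_hint`) and the `is_current_deepseek_v4_alias` check, whose bodies are not shown in this diff, and `context_window_sketch` and `CLAUDE_CONTEXT_WINDOW_TOKENS` are hypothetical names (the diff uses the literal `200_000`).

```rust
const LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS: u32 = 128_000;
const DEEPSEEK_V4_CONTEXT_WINDOW_TOKENS: u32 = 1_000_000;
// Hypothetical constant name for the literal used in the diff.
const CLAUDE_CONTEXT_WINDOW_TOKENS: u32 = 200_000;

/// Reduced model-id lookup: V4 ids get the 1M window, any other DeepSeek id
/// falls back to the legacy 128K window, Claude ids get 200K, and anything
/// else is reported as unknown.
fn context_window_sketch(model: &str) -> Option<u32> {
    let lower = model.to_lowercase();
    if lower.contains("deepseek") {
        if lower.contains("v4") {
            return Some(DEEPSEEK_V4_CONTEXT_WINDOW_TOKENS);
        }
        return Some(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS);
    }
    if lower.contains("claude") {
        return Some(CLAUDE_CONTEXT_WINDOW_TOKENS);
    }
    None
}
```

Returning `None` for unrecognised ids, rather than a default window, lets callers pick their own fallback, as `context_usage` does with `unwrap_or`.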
@@ -447,7 +432,7 @@ mod tests {
);
assert_eq!(
context_window_for_model("deepseek-v3.2-2k-preview"),
Some(DEFAULT_CONTEXT_WINDOW_TOKENS)
Some(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS)
);
}
@@ -458,32 +443,17 @@ mod tests {
102_400
);
// v0.8.11 (#664): unknown-model fallback also resolves to 80% of
// `DEFAULT_CONTEXT_WINDOW_TOKENS` (128k) — same late-trigger
// discipline as the V4 path. Was `50_000` pre-v0.8.11; that
// hardcoded value compacted at ~5% of a 1M window when the model
// detection silently fell through, which is exactly the
// prefix-cache-burning behaviour we're getting away from.
// `LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS` (128K legacy DeepSeek
// fallback) — same late-trigger discipline as the V4 path. Was
// `50_000` pre-v0.8.11; that hardcoded value compacted at ~5% of a
// 1M window when model detection silently fell through, which is
// exactly the prefix-cache-burning behaviour we're getting away from.
assert_eq!(compaction_threshold_for_model("unknown-model"), 102_400);
}
#[test]
fn compaction_message_threshold_scales_with_context_window() {
assert_eq!(
compaction_message_threshold_for_model("deepseek-v3.2-128k"),
256
);
assert_eq!(compaction_message_threshold_for_model("unknown-model"), 50);
// 200k / 500 = 400, within the 2k cap.
assert_eq!(compaction_message_threshold_for_model("claude-3"), 400);
}
#[test]
fn compaction_scales_for_deepseek_v4_1m_context() {
assert_eq!(compaction_threshold_for_model("deepseek-v4-pro"), 800_000);
assert_eq!(
compaction_message_threshold_for_model("deepseek-v4-pro"),
2_000
);
}
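The asserted values (`102_400` for a 128K window, `800_000` for the 1M V4 window) follow from the 80% rule. A minimal sketch of that arithmetic is below. It assumes the unknown-model case is computed from the 128K fallback, whereas the diff hardcodes `DEFAULT_COMPACTION_TOKEN_THRESHOLD` to the same number; `threshold_for_window` and `FALLBACK_CONTEXT_WINDOW_TOKENS` are hypothetical names.

```rust
const COMPACTION_THRESHOLD_PERCENT: u64 = 80;
// Assumption: same value as LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS in the diff.
const FALLBACK_CONTEXT_WINDOW_TOKENS: u32 = 128_000;

/// Token threshold at which automatic compaction would trigger: 80% of the
/// context window, using the 128K fallback when the window is unknown.
fn threshold_for_window(window: Option<u32>) -> usize {
    let tokens = u64::from(window.unwrap_or(FALLBACK_CONTEXT_WINDOW_TOKENS));
    // Widen to u64 before multiplying so the product cannot overflow u32.
    let scaled = tokens * COMPACTION_THRESHOLD_PERCENT / 100;
    usize::try_from(scaled).unwrap_or(usize::MAX)
}
```

For the old hardcoded `50_000`, the same arithmetic gives 5% of a 1M window, which is the early-trigger case the comments above describe.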
#[test]
@@ -509,9 +479,9 @@ mod tests {
102_400
);
// v0.8.11 (#664): unknown-model fallback also lands on the
// 80%-of-128K floor instead of the legacy hardcoded 50K, so
// model-detection-fall-through doesn't quietly burn V4 prefix
// cache at 5%-of-window.
// 80%-of-128K legacy DeepSeek fallback instead of the legacy
// hardcoded 50K, so model-detection-fall-through doesn't quietly
// burn V4 prefix cache at 5%-of-window.
assert_eq!(
compaction_threshold_for_model_and_effort("unknown-model", Some("max")),
102_400
+28 -41
@@ -254,11 +254,11 @@ pub fn system_prompt_for_mode_with_context(
/// 4. `## Context Management` (compile-time constant, Agent/Yolo only)
/// 5. compaction handoff template (compile-time constant)
/// 6. handoff block — file-backed; rewritten by `/compact` and on exit
/// 7. working-set summary — drifts when a new path is observed
///
/// Anything appended after a volatile block forfeits the cache for the rest
/// of the request. New blocks belong above the handoff/working-set boundary
/// unless they themselves are turn-volatile.
/// of the request. New blocks belong above the handoff boundary unless they
/// themselves are turn-volatile. Working-set metadata is now injected into the
/// latest user message as per-turn metadata instead of this system prompt.
pub fn system_prompt_for_mode_with_context_and_skills(
mode: AppMode,
workspace: &Path,
@@ -283,7 +283,7 @@ pub fn system_prompt_for_mode_with_context_and_skills(
pub fn system_prompt_for_mode_with_context_skills_and_session(
mode: AppMode,
workspace: &Path,
working_set_summary: Option<&str>,
_working_set_summary: Option<&str>,
skills_dir: Option<&Path>,
instructions: Option<&[PathBuf]>,
session_context: PromptSessionContext<'_>,
@@ -360,6 +360,7 @@ pub fn system_prompt_for_mode_with_context_skills_and_session(
If you notice context is getting long (>80%), proactively suggest using `/compact` to the user.\n\n\
### Prompt-cache awareness\n\n\
DeepSeek caches the longest *byte-stable prefix* of every request and charges roughly 100× less for cache-hit tokens than miss tokens. The system prompt above is layered most-static-first specifically so the prefix stays stable turn-over-turn. To keep cache hits high:\n\
- **Working set location:** the current repo working set is injected into the latest user message inside a `<turn_meta>` block. Treat it as high-priority turn metadata, not as a stable system-prompt section.\n\
- **Append, don't reorder.** New context goes at the end (latest user / tool messages). Reshuffling earlier messages or rewriting their content invalidates the cache for everything after the change.\n\
- **Don't paraphrase quoted content.** If you've already read a file, refer to it by path or line range instead of re-quoting it with different formatting.\n\
- **Use `/compact` as a hard reset, not a tweak.** Compaction is meant for when the cache is already losing; it intentionally rewrites the prefix to a shorter summary. Don't trigger it for small wins.\n\
@@ -382,13 +383,6 @@ pub fn system_prompt_for_mode_with_context_skills_and_session(
full_prompt = format!("{full_prompt}\n\n{handoff_block}");
}
// 7. Working-set summary (drifts when a new path is observed).
if let Some(summary) = working_set_summary
&& !summary.trim().is_empty()
{
full_prompt = format!("{full_prompt}\n\n{summary}");
}
SystemPrompt::Text(full_prompt)
}
@@ -547,7 +541,7 @@ mod tests {
}
#[test]
fn session_goal_is_injected_above_volatile_prompt_tail() {
fn session_goal_is_injected_above_handoff_tail() {
let tmp = tempdir().expect("tempdir");
let prompt = match system_prompt_for_mode_with_context_skills_and_session(
AppMode::Agent,
@@ -566,11 +560,10 @@ mod tests {
let goal_pos = prompt.find("<session_goal>").expect("goal block");
let compact_pos = prompt.find("## Compaction Handoff").expect("compact block");
let working_set_pos = prompt.find("## Repo Working Set").expect("working set");
assert!(prompt.contains("Fix transcript corruption"));
assert!(goal_pos < compact_pos);
assert!(goal_pos < working_set_pos);
assert!(!prompt.contains("src/lib.rs"));
}
#[test]
@@ -729,12 +722,10 @@ mod tests {
}
#[test]
fn system_prompt_with_working_set_summary_is_byte_stable_for_constant_summary() {
// The `working_set_summary` argument is the volatile surface (suspect
// #1 in #263). Independently verifying THIS surface needs a separate
// test in working_set.rs; here we just pin that the surrounding
// prompt construction faithfully embeds whatever summary it's given
// without injecting any non-determinism on its own.
fn system_prompt_ignores_working_set_summary_argument() {
// Working-set metadata is now injected into the latest user message
// per turn. The legacy argument remains for call-site compatibility
// but must not reintroduce volatile bytes into the system prompt.
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path();
let summary = "## Repo Working Set\nWorkspace: /tmp/x\n";
@@ -754,16 +745,18 @@ mod tests {
&a,
&b,
);
assert!(a.contains(summary), "summary must be embedded as-is");
assert!(
!a.contains(summary),
"summary must not be embedded in system prompt"
);
}
#[test]
fn system_prompt_with_handoff_file_is_byte_stable_when_file_is_unchanged() {
// Companion to the working-set stability test: if `.deepseek/handoff.md`
// hasn't moved between two builds, the rendered prompt must produce
// identical bytes. The handoff block is the second volatile surface
// (the first is the working-set summary) — both land below the static
// boundary in `system_prompt_for_mode_with_context_and_skills`.
// If `.deepseek/handoff.md` hasn't moved between two builds, the
// rendered prompt must produce identical bytes. The handoff block
// lands below the static boundary in
// `system_prompt_for_mode_with_context_and_skills`.
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path();
let handoff_dir = workspace.join(".deepseek");
@@ -792,14 +785,11 @@ mod tests {
}
#[test]
fn handoff_and_working_set_appear_after_static_blocks() {
// Cache-prefix invariant: the volatile blocks (handoff, working_set)
// must come *after* the static `## Context Management` and the
// compaction handoff template (`## Compaction Handoff`) so a churn
// in either volatile section doesn't drag the static blocks out of
// the cached prefix. Pre-fix ordering placed handoff between the
// skills block and `## Context Management`, which busted the cache
// every time `/compact` rewrote the file.
fn handoff_appears_after_static_blocks_without_working_set() {
// Cache-prefix invariant: the handoff block must come after static
// `## Context Management` and the compaction handoff template
// (`## Compaction Handoff`). Working-set metadata is per-turn user
// metadata now, not a system-prompt tail block.
let tmp = tempdir().expect("tempdir");
let workspace = tmp.path();
let handoff_dir = workspace.join(".deepseek");
@@ -822,9 +812,10 @@ mod tests {
let handoff_pos = prompt
.find(HANDOFF_BLOCK_MARKER)
.expect("handoff block present when fixture file exists");
let working_set_pos = prompt
.find("## Repo Working Set")
.expect("working-set summary present when supplied");
assert!(
!prompt.contains("## Repo Working Set"),
"working-set summary must stay out of the system prompt"
);
assert!(
context_pos < handoff_pos,
@@ -834,10 +825,6 @@ mod tests {
compact_pos < handoff_pos,
"## Compaction Handoff must precede the handoff block"
);
assert!(
handoff_pos < working_set_pos,
"handoff block must precede the working-set summary (most-volatile last)"
);
}
#[test]
+1 -5
@@ -23,10 +23,7 @@ use crate::core::coherence::CoherenceState;
use crate::core::engine::{EngineConfig, EngineHandle, spawn_engine};
use crate::core::events::{Event as EngineEvent, TurnOutcomeStatus};
use crate::core::ops::Op;
use crate::models::{
ContentBlock, Message, SystemPrompt, Usage, compaction_message_threshold_for_model,
compaction_threshold_for_model,
};
use crate::models::{ContentBlock, Message, SystemPrompt, Usage, compaction_threshold_for_model};
use crate::tools::plan::new_shared_plan_state;
use crate::tools::subagent::SubAgentStatus;
use crate::tools::todo::new_shared_todo_list;
@@ -1765,7 +1762,6 @@ impl RuntimeThreadManager {
enabled: false,
model: thread.model.clone(),
token_threshold: compaction_threshold_for_model(&thread.model),
message_threshold: compaction_message_threshold_for_model(&thread.model),
..Default::default()
};
let network_policy = self.config.network.clone().map(|toml_cfg| {
+21 -1
@@ -186,7 +186,6 @@ impl ToolRegistry {
/// Convert tools to API Tool format with optional cache control on the last tool.
#[must_use]
#[allow(dead_code)]
pub fn to_api_tools_with_cache(&self, enable_cache: bool) -> Vec<Tool> {
let mut tools = self.to_api_tools();
if enable_cache && let Some(last) = tools.last_mut() {
@@ -871,6 +870,27 @@ mod tests {
assert_eq!(api_tools[0].description, "A test tool");
}
#[test]
fn api_tools_with_cache_marks_last_tool_ephemeral() {
let tmp = tempdir().expect("tempdir");
let ctx = ToolContext::new(tmp.path().to_path_buf());
let mut registry = ToolRegistry::new(ctx);
registry.register(make_test_tool("tool_a"));
registry.register(make_test_tool("tool_b"));
let api_tools = registry.to_api_tools_with_cache(true);
assert_eq!(api_tools.len(), 2);
assert!(api_tools[0].cache_control.is_none());
assert_eq!(
api_tools[1]
.cache_control
.as_ref()
.map(|c| c.cache_type.as_str()),
Some("ephemeral")
);
}
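What the test above checks can be sketched with reduced types. `Tool`, `CacheControl`, and `with_cache_anchor` here are hypothetical stand-ins that keep only the fields the test touches; the sketch also uses nested `if` rather than the let-chain in `to_api_tools_with_cache`, so it compiles on older Rust editions.

```rust
// Hypothetical, reduced versions of the API tool types.
#[derive(Debug, Clone, PartialEq)]
struct CacheControl {
    cache_type: String,
}

#[derive(Debug, Clone, PartialEq)]
struct Tool {
    name: String,
    cache_control: Option<CacheControl>,
}

/// Mark only the final tool as the cache anchor. Earlier tools stay
/// unmarked, and an empty list or `enable_cache == false` changes nothing.
fn with_cache_anchor(mut tools: Vec<Tool>, enable_cache: bool) -> Vec<Tool> {
    if enable_cache {
        if let Some(last) = tools.last_mut() {
            last.cache_control = Some(CacheControl {
                cache_type: "ephemeral".to_string(),
            });
        }
    }
    tools
}
```

Marking only the last entry keeps the tool array byte-identical across turns as long as the catalog and its order do not change.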
/// Tool whose `description()` advances through a script of pre-built
/// strings, one per call. Used to demonstrate that the api-tools cache
/// pins the description bytes on first read instead of re-sampling them
+1 -5
@@ -15,10 +15,7 @@ use crate::core::coherence::CoherenceState;
use crate::cycle_manager::{CycleBriefing, CycleConfig};
use crate::hooks::{HookContext, HookEvent, HookExecutor, HookResult};
use crate::localization::{Locale, MessageId, resolve_locale, tr};
use crate::models::{
Message, SystemPrompt, compaction_message_threshold_for_model,
compaction_threshold_for_model_and_effort,
};
use crate::models::{Message, SystemPrompt, compaction_threshold_for_model_and_effort};
use crate::palette::{self, UiTheme};
use crate::session_manager::SessionContextReference;
use crate::settings::Settings;
@@ -3169,7 +3166,6 @@ impl App {
CompactionConfig {
enabled: self.auto_compact,
token_threshold: self.compact_threshold,
message_threshold: compaction_message_threshold_for_model(&self.model),
model: self.model.clone(),
..Default::default()
}
+7 -6
@@ -4,16 +4,17 @@ use std::collections::HashSet;
use std::fmt::Write;
use crate::compaction::estimate_input_tokens_conservative;
use crate::models::{DEFAULT_CONTEXT_WINDOW_TOKENS, SystemPrompt, context_window_for_model};
use crate::models::{
LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS, SystemPrompt, context_window_for_model,
};
use crate::session_manager::SessionContextReference;
use crate::tui::app::{App, ToolDetailRecord};
use crate::tui::file_mention::ContextReferenceSource;
use crate::utils::estimate_message_chars;
/// Marker used by the engine's `append_working_set_summary` to tag the
/// volatile tail block in the system prompt. Replicated here so the
/// context inspector can distinguish stable prefix blocks from the
/// ephemeral working-set block without importing engine internals.
/// Marker used by per-turn working-set metadata. Replicated here so the
/// context inspector can distinguish stable prompt blocks from volatile
/// working-set context without importing engine internals.
const WORKING_SET_MARKER: &str = "## Repo Working Set";
const CONTEXT_WARNING_THRESHOLD_PERCENT: f64 = 85.0;
@@ -68,7 +69,7 @@ pub fn build_context_inspector_text(app: &App) -> String {
}
fn context_usage(app: &App) -> (usize, u32, f64) {
let max = context_window_for_model(&app.model).unwrap_or(DEFAULT_CONTEXT_WINDOW_TOKENS);
let max = context_window_for_model(&app.model).unwrap_or(LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS);
let estimated =
estimate_input_tokens_conservative(&app.api_messages, app.system_prompt.as_ref());
let total_chars = estimate_message_chars(&app.api_messages);
+754 -36
@@ -1,10 +1,12 @@
const fs = require("fs");
const https = require("https");
const http = require("http");
const net = require("net");
const tls = require("tls");
const crypto = require("crypto");
const { URL } = require("url");
const { mkdir, chmod, stat, rename, readFile, unlink, writeFile } = fs.promises;
const { createWriteStream } = fs;
const { pipeline } = require("stream/promises");
const path = require("path");
const {
@@ -16,6 +18,46 @@ const {
const { preflightGlibc } = require("./preflight-glibc");
const pkg = require("../package.json");
const DEFAULT_TIMEOUT_MS = 300_000; // 5 minutes per attempt
const DEFAULT_STALL_MS = 30_000; // abort if no bytes for 30s
const MAX_ATTEMPTS = 5;
const BASE_BACKOFF_MS = 1_000;
const RETRYABLE_NET_CODES = new Set([
"ECONNRESET",
"ECONNREFUSED",
"ETIMEDOUT",
"EAI_AGAIN",
"ENETUNREACH",
"EHOSTUNREACH",
"EPIPE",
"ECONNABORTED",
]);
class NonRetryableError extends Error {
constructor(message) {
super(message);
this.name = "NonRetryableError";
this.nonRetryable = true;
}
}
class HttpStatusError extends Error {
constructor(status, url) {
super(`Request failed with status ${status}: ${url}`);
this.name = "HttpStatusError";
this.status = status;
}
}
class DownloadTimeoutError extends Error {
constructor(message) {
super(message);
this.name = "DownloadTimeoutError";
this.code = "EDOWNLOADTIMEOUT";
}
}
function resolvePackageVersion() {
const configuredVersion =
process.env.DEEPSEEK_TUI_VERSION ||
@@ -44,45 +86,719 @@ function binaryPaths() {
};
}
async function httpGet(url) {
const client = url.startsWith("https:") ? https : http;
const response = await new Promise((resolve, reject) => {
client.get(url, (res) => {
const status = res.statusCode || 0;
if (status >= 300 && status < 400 && res.headers.location) {
resolve({ redirect: res.headers.location, response: null });
return;
}
if (status !== 200) {
reject(new Error(`Request failed with status ${status}: ${url}`));
return;
}
resolve({ redirect: null, response: res });
}).on("error", reject);
});
return response;
// ────────────────────────────────────────────────────────────────────────────
// Logging / progress
// ────────────────────────────────────────────────────────────────────────────
function isQuietInstall() {
if (process.env.DEEPSEEK_TUI_QUIET_INSTALL === "1") {
return true;
}
const level = (process.env.npm_config_loglevel || "").toLowerCase();
return level === "silent" || level === "error";
}
async function download(url, destination) {
const resolved = await httpGet(url);
if (resolved.redirect) {
return download(resolved.redirect, destination);
function logInfo(message) {
if (isQuietInstall()) {
return;
}
process.stderr.write(`deepseek-tui: ${message}\n`);
}
function envInt(name, fallback) {
const raw = process.env[name];
if (!raw) {
return fallback;
}
const parsed = Number.parseInt(String(raw).trim(), 10);
if (!Number.isFinite(parsed) || parsed <= 0) {
return fallback;
}
return parsed;
}
function downloadTimeoutMs() {
return envInt(
"DEEPSEEK_TUI_DOWNLOAD_TIMEOUT_MS",
envInt("DEEPSEEK_DOWNLOAD_TIMEOUT_MS", DEFAULT_TIMEOUT_MS),
);
}
function downloadStallMs() {
return envInt(
"DEEPSEEK_TUI_DOWNLOAD_STALL_MS",
envInt("DEEPSEEK_DOWNLOAD_STALL_MS", DEFAULT_STALL_MS),
);
}
function formatMb(bytes) {
return (bytes / (1024 * 1024)).toFixed(0);
}
function createProgressReporter(assetName, totalBytes) {
if (isQuietInstall()) {
return { onChunk: () => {}, finish: () => {} };
}
const isTty = !!process.stderr.isTTY;
const interactive = isTty;
const tickBytes = interactive ? 1 * 1024 * 1024 : 5 * 1024 * 1024;
const tickMs = 2_000;
let received = 0;
let lastBytesPrinted = 0;
let lastTimePrinted = 0;
let everPrinted = false;
const render = (final) => {
if (totalBytes && totalBytes > 0) {
const pct = Math.min(100, Math.round((received / totalBytes) * 100));
const line = `deepseek-tui: downloading ${assetName}: ${formatMb(received)} / ${formatMb(totalBytes)} MB (${pct}%)`;
if (interactive) {
process.stderr.write(`${line}\r`);
} else {
process.stderr.write(`${line}\n`);
}
} else {
const line = `deepseek-tui: downloading ${assetName}: ${formatMb(received)} MB downloaded`;
if (interactive) {
process.stderr.write(`${line}\r`);
} else {
process.stderr.write(`${line}\n`);
}
}
everPrinted = true;
lastBytesPrinted = received;
lastTimePrinted = Date.now();
};
return {
onChunk(chunkLen) {
received += chunkLen;
const now = Date.now();
if (
received - lastBytesPrinted >= tickBytes ||
(interactive && now - lastTimePrinted >= tickMs)
) {
render(false);
}
},
finish() {
// Final line — always render once.
render(true);
if (interactive && everPrinted) {
// Move past the carriage-return line and emit a "done" footer.
process.stderr.write("\n");
}
process.stderr.write(`deepseek-tui: ${assetName} ... done.\n`);
},
};
}
// ────────────────────────────────────────────────────────────────────────────
// Proxy support (HTTPS_PROXY / HTTP_PROXY / NO_PROXY) — pure Node, CONNECT
// tunnel + TLS upgrade for HTTPS targets.
// ────────────────────────────────────────────────────────────────────────────
function getProxyUrl(targetUrl) {
const isHttps = targetUrl.protocol === "https:";
const candidates = isHttps
? ["HTTPS_PROXY", "https_proxy", "HTTP_PROXY", "http_proxy"]
: ["HTTP_PROXY", "http_proxy"];
for (const name of candidates) {
const raw = process.env[name];
if (raw && String(raw).trim() !== "") {
return String(raw).trim();
}
}
return null;
}
function shouldBypassProxy(host) {
const raw = process.env.NO_PROXY || process.env.no_proxy;
if (!raw) {
return false;
}
const lower = String(host).toLowerCase();
for (const part of String(raw).split(",")) {
const entry = part.trim().toLowerCase();
if (!entry) {
continue;
}
if (entry === "*") {
return true;
}
// Strip leading dot and any explicit port.
const stripped = entry.replace(/^\./, "").replace(/:.*$/, "");
if (!stripped) {
continue;
}
if (lower === stripped || lower.endsWith(`.${stripped}`)) {
return true;
}
}
return false;
}
function parseProxy(proxyStr) {
// Accept "http://user:pass@host:port" and bare "host:port".
const normalized = /^[a-z][a-z0-9+\-.]*:\/\//i.test(proxyStr)
? proxyStr
: `http://${proxyStr}`;
const u = new URL(normalized);
const port = u.port
? Number.parseInt(u.port, 10)
: u.protocol === "https:"
? 443
: 80;
let auth = null;
if (u.username) {
const user = decodeURIComponent(u.username);
const pass = u.password ? decodeURIComponent(u.password) : "";
auth = Buffer.from(`${user}:${pass}`).toString("base64");
}
return {
protocol: u.protocol,
host: u.hostname,
port,
auth,
raw: proxyStr,
};
}
function connectThroughProxy(proxy, targetHost, targetPort, timeoutMs) {
return new Promise((resolve, reject) => {
const socket = net.connect({ host: proxy.host, port: proxy.port });
let settled = false;
const fail = (err) => {
if (settled) return;
settled = true;
try {
socket.destroy();
} catch {
// ignore
}
reject(err);
};
const timer = timeoutMs > 0
? setTimeout(() => fail(new DownloadTimeoutError(
`proxy CONNECT to ${proxy.host}:${proxy.port} timed out after ${timeoutMs} ms`,
)), timeoutMs)
: null;
socket.once("error", (err) => {
if (timer) clearTimeout(timer);
// Surface proxy host so the user can fix it.
const wrapped = new Error(
`proxy connection failed (${proxy.host}:${proxy.port}): ${err.message}`,
);
wrapped.code = err.code;
fail(wrapped);
});
socket.once("connect", () => {
const lines = [
`CONNECT ${targetHost}:${targetPort} HTTP/1.1`,
`Host: ${targetHost}:${targetPort}`,
"User-Agent: deepseek-tui-installer",
"Proxy-Connection: keep-alive",
];
if (proxy.auth) {
lines.push(`Proxy-Authorization: Basic ${proxy.auth}`);
}
const req = `${lines.join("\r\n")}\r\n\r\n`;
let buf = Buffer.alloc(0);
const onData = (chunk) => {
buf = Buffer.concat([buf, chunk]);
const idx = buf.indexOf("\r\n\r\n");
if (idx === -1) {
if (buf.length > 16 * 1024) {
socket.removeListener("data", onData);
fail(new Error(
`proxy ${proxy.host}:${proxy.port} returned an oversized response header`,
));
}
return;
}
socket.removeListener("data", onData);
const head = buf.slice(0, idx).toString("utf8");
const firstLine = head.split(/\r?\n/, 1)[0] || "";
const m = firstLine.match(/^HTTP\/\d\.\d\s+(\d{3})/);
if (!m) {
fail(new Error(`proxy ${proxy.host}:${proxy.port} returned invalid CONNECT reply: ${firstLine}`));
return;
}
const code = Number.parseInt(m[1], 10);
if (code !== 200) {
fail(new Error(
`proxy ${proxy.host}:${proxy.port} refused CONNECT to ${targetHost}:${targetPort}: HTTP ${code}`,
));
return;
}
if (timer) clearTimeout(timer);
if (settled) return;
settled = true;
// Any bytes past the header belong to the tunneled stream — but in
// practice CONNECT 200 has no body; if it did, we'd lose those bytes
// here. Keep it simple: trust well-behaved proxies.
resolve(socket);
};
socket.on("data", onData);
socket.write(req, "utf8");
});
});
}
// ────────────────────────────────────────────────────────────────────────────
// HTTP request with timeout, stall detection, and proxy support.
// ────────────────────────────────────────────────────────────────────────────
function httpRequest(rawUrl, opts = {}) {
const totalTimeoutMs = opts.totalTimeoutMs ?? downloadTimeoutMs();
const stallMs = opts.stallMs ?? downloadStallMs();
return new Promise((resolve, reject) => {
let url;
try {
url = new URL(rawUrl);
} catch (err) {
reject(new NonRetryableError(`Invalid URL: ${rawUrl} (${err.message})`));
return;
}
if (url.protocol !== "https:" && url.protocol !== "http:") {
reject(new NonRetryableError(`Unsupported protocol: ${url.protocol}`));
return;
}
const proxyStr = !shouldBypassProxy(url.hostname) ? getProxyUrl(url) : null;
const isHttps = url.protocol === "https:";
const port = url.port
? Number.parseInt(url.port, 10)
: isHttps
? 443
: 80;
let totalTimer = null;
let stallTimer = null;
let settled = false;
let req = null;
let res = null;
const cleanup = () => {
if (totalTimer) {
clearTimeout(totalTimer);
totalTimer = null;
}
if (stallTimer) {
clearTimeout(stallTimer);
stallTimer = null;
}
};
const fail = (err) => {
if (settled) return;
settled = true;
cleanup();
try {
if (req && !req.destroyed) req.destroy();
} catch {
// ignore
}
try {
if (res && !res.destroyed) res.destroy();
} catch {
// ignore
}
reject(err);
};
if (totalTimeoutMs > 0) {
totalTimer = setTimeout(() => {
fail(new DownloadTimeoutError(
`download exceeded total timeout of ${totalTimeoutMs} ms ` +
`(set DEEPSEEK_TUI_DOWNLOAD_TIMEOUT_MS to raise it; current stall budget is ${stallMs} ms)`,
));
}, totalTimeoutMs);
}
const armStallTimer = () => {
if (stallMs <= 0) return;
if (stallTimer) clearTimeout(stallTimer);
stallTimer = setTimeout(() => {
fail(new DownloadTimeoutError(
`download stalled — no bytes received for ${stallMs} ms ` +
`(set DEEPSEEK_TUI_DOWNLOAD_STALL_MS to raise it; total budget is ${totalTimeoutMs} ms)`,
));
}, stallMs);
};
const launch = (socket) => {
const reqOptions = {
method: "GET",
host: url.hostname,
port,
path: `${url.pathname}${url.search || ""}`,
headers: {
Host: url.host,
"User-Agent": "deepseek-tui-installer",
Accept: "*/*",
Connection: "close",
},
};
if (socket) {
reqOptions.createConnection = () => socket;
if (isHttps) {
// Wrap raw TCP socket from CONNECT in TLS.
const tlsSocket = tls.connect({
socket,
servername: url.hostname,
ALPNProtocols: ["http/1.1"],
});
tlsSocket.once("error", (err) => fail(err));
reqOptions.createConnection = () => tlsSocket;
}
}
const client = isHttps ? https : http;
try {
req = client.request(reqOptions, (response) => {
res = response;
armStallTimer();
response.on("data", () => {
armStallTimer();
});
response.on("end", () => {
cleanup();
});
response.on("error", (err) => fail(err));
const status = response.statusCode || 0;
if (status >= 300 && status < 400 && response.headers.location) {
cleanup();
settled = true;
response.resume();
resolve({ redirect: response.headers.location, response: null });
return;
}
if (status < 200 || status >= 300) {
const err = new HttpStatusError(status, rawUrl);
// 4xx: non-retryable; 5xx: retryable.
if (status >= 400 && status < 500) {
err.nonRetryable = true;
}
fail(err);
return;
}
if (settled) return;
settled = true;
// Hand the live response stream to the caller.
resolve({ redirect: null, response });
});
req.once("error", (err) => fail(err));
req.once("socket", (s) => {
// Belt-and-suspenders: surface socket-level errors quickly.
s.once("error", (err) => fail(err));
});
req.end();
} catch (err) {
fail(err);
}
};
if (proxyStr) {
let proxy;
try {
proxy = parseProxy(proxyStr);
} catch (err) {
fail(new NonRetryableError(
`Invalid proxy URL "${proxyStr}": ${err.message}`,
));
return;
}
if (!isHttps) {
// Plain HTTP through proxy — send absolute URI, no CONNECT.
const client = http;
try {
req = client.request(
{
host: proxy.host,
port: proxy.port,
method: "GET",
path: rawUrl,
headers: {
Host: url.host,
"User-Agent": "deepseek-tui-installer",
Accept: "*/*",
Connection: "close",
...(proxy.auth ? { "Proxy-Authorization": `Basic ${proxy.auth}` } : {}),
},
},
(response) => {
res = response;
armStallTimer();
response.on("data", () => armStallTimer());
response.on("end", () => cleanup());
response.on("error", (err) => fail(err));
const status = response.statusCode || 0;
if (status >= 300 && status < 400 && response.headers.location) {
cleanup();
settled = true;
response.resume();
resolve({ redirect: response.headers.location, response: null });
return;
}
if (status < 200 || status >= 300) {
const err = new HttpStatusError(status, rawUrl);
if (status >= 400 && status < 500) err.nonRetryable = true;
fail(err);
return;
}
if (settled) return;
settled = true;
resolve({ redirect: null, response });
},
);
req.once("error", (err) => fail(err));
req.end();
} catch (err) {
fail(err);
}
return;
}
// HTTPS through proxy: CONNECT tunnel + TLS upgrade.
connectThroughProxy(proxy, url.hostname, port, Math.max(stallMs, 5_000))
.then((tcpSocket) => {
if (settled) {
try { tcpSocket.destroy(); } catch { /* ignore */ }
return;
}
const tlsSocket = tls.connect({
socket: tcpSocket,
servername: url.hostname,
ALPNProtocols: ["http/1.1"],
});
tlsSocket.once("error", (err) => fail(err));
tlsSocket.once("secureConnect", () => {
if (settled) {
try { tlsSocket.destroy(); } catch { /* ignore */ }
return;
}
const reqOptions = {
method: "GET",
createConnection: () => tlsSocket,
path: `${url.pathname}${url.search || ""}`,
headers: {
Host: url.host,
"User-Agent": "deepseek-tui-installer",
Accept: "*/*",
Connection: "close",
},
};
try {
req = https.request(reqOptions, (response) => {
res = response;
armStallTimer();
response.on("data", () => armStallTimer());
response.on("end", () => cleanup());
response.on("error", (err) => fail(err));
const status = response.statusCode || 0;
if (status >= 300 && status < 400 && response.headers.location) {
cleanup();
settled = true;
response.resume();
resolve({ redirect: response.headers.location, response: null });
return;
}
if (status < 200 || status >= 300) {
const err = new HttpStatusError(status, rawUrl);
if (status >= 400 && status < 500) err.nonRetryable = true;
fail(err);
return;
}
if (settled) return;
settled = true;
resolve({ redirect: null, response });
});
req.once("error", (err) => fail(err));
req.end();
} catch (err) {
fail(err);
}
});
})
.catch((err) => fail(err));
return;
}
// No proxy — direct connection.
launch(null);
});
}
// ────────────────────────────────────────────────────────────────────────────
// Retry wrapper
// ────────────────────────────────────────────────────────────────────────────
function isRetryable(err) {
if (!err) return false;
if (err.nonRetryable) return false;
if (err instanceof NonRetryableError) return false;
if (err instanceof DownloadTimeoutError) return true;
if (err instanceof HttpStatusError) {
return err.status >= 500;
}
if (err.code && RETRYABLE_NET_CODES.has(err.code)) return true;
// Network-flavored messages we may see without a code.
const msg = String(err.message || "").toLowerCase();
if (msg.includes("network") && msg.includes("unreachable")) return true;
if (msg.includes("socket hang up")) return true;
if (msg.includes("aborted")) return true;
return false;
}
function backoffDelay(attempt) {
// attempt is 1-indexed; first retry waits ~1s.
const base = BASE_BACKOFF_MS * 2 ** (attempt - 1);
const jitter = (Math.random() * 0.4 - 0.2) * base; // ±20%
return Math.max(0, Math.round(base + jitter));
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function withRetry(label, fn) {
let lastErr;
for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
try {
return await fn(attempt);
} catch (err) {
lastErr = err;
if (!isRetryable(err) || attempt === MAX_ATTEMPTS) {
break;
}
const wait = backoffDelay(attempt);
logInfo(
`${label} failed (attempt ${attempt}/${MAX_ATTEMPTS}): ${err.message}; retrying in ${wait} ms`,
);
await sleep(wait);
}
}
const msg = lastErr && lastErr.message ? lastErr.message : String(lastErr);
const wrapped = new Error(
`${label} failed after ${MAX_ATTEMPTS} attempt(s): ${msg}`,
);
if (lastErr && lastErr.stack) {
wrapped.cause = lastErr;
}
throw wrapped;
}
// ────────────────────────────────────────────────────────────────────────────
// Public download primitives (retry- and progress-aware)
// ────────────────────────────────────────────────────────────────────────────
async function followRedirects(url, opts) {
const maxRedirects = 10;
let current = url;
for (let hop = 0; hop < maxRedirects; hop++) {
const result = await httpRequest(current, opts);
if (result.redirect) {
try {
current = new URL(result.redirect, current).toString();
} catch {
current = result.redirect;
}
continue;
}
return result;
}
throw new NonRetryableError(`too many redirects starting at ${url}`);
}
function streamToFile(response, destination, progress) {
return new Promise((resolve, reject) => {
const sink = createWriteStream(destination);
let done = false;
const finish = (err) => {
if (done) return;
done = true;
if (err) {
sink.destroy();
reject(err);
} else {
resolve();
}
};
response.on("data", (chunk) => {
if (progress) progress.onChunk(chunk.length);
});
response.on("error", (err) => finish(err));
sink.on("error", (err) => finish(err));
sink.on("finish", () => finish(null));
response.pipe(sink);
});
}
async function download(url, destination, options = {}) {
await mkdir(path.dirname(destination), { recursive: true });
const assetName = options.assetName || path.basename(destination);
await withRetry(`download ${assetName}`, async (attempt) => {
const result = await followRedirects(url, {
totalTimeoutMs: downloadTimeoutMs(),
stallMs: downloadStallMs(),
});
const response = result.response;
const lenHeader = response.headers["content-length"];
const total = lenHeader ? Number.parseInt(lenHeader, 10) : 0;
const progress = createProgressReporter(assetName, Number.isFinite(total) ? total : 0);
if (attempt > 1) {
logInfo(`retry attempt ${attempt}/${MAX_ATTEMPTS} for ${assetName}`);
}
try {
await streamToFile(response, destination, progress);
} catch (err) {
// Remove the partial file so it cannot confuse later attempts.
try {
await unlink(destination);
} catch {
// ignore
}
throw err;
}
progress.finish();
});
}
async function downloadText(url) {
return withRetry(`fetch ${url}`, async () => {
const result = await followRedirects(url, {
totalTimeoutMs: downloadTimeoutMs(),
stallMs: downloadStallMs(),
});
const response = result.response;
response.setEncoding("utf8");
// NOTE: do NOT use `for await (const chunk of response)` here.
// `httpRequest` attaches a `data` listener on the response to re-arm
// the stall timer, which puts the stream in flowing mode. The async
// iterator expects paused mode and will silently miss every chunk —
// this manifested as an empty checksum manifest in the npm wrapper
// smoke test ("Checksum manifest is missing <asset>"). Subscribing
// to `data` events directly stacks alongside the stall listener and
// both fire per chunk, so we collect the body correctly without
// disturbing the stall detection.
return new Promise((resolve, reject) => {
const chunks = [];
response.on("data", (chunk) => {
chunks.push(chunk);
});
response.on("end", () => {
resolve(chunks.join(""));
});
response.on("error", reject);
});
});
}
async function readLocalVersion(file) {
@@ -122,11 +838,13 @@ async function sha256File(filePath) {
async function verifyChecksum(filePath, assetName, checksums) {
const expected = checksums.get(assetName);
if (!expected) {
throw new NonRetryableError(`Checksum manifest is missing ${assetName}`);
}
const actual = await sha256File(filePath);
if (actual !== expected) {
// Bytes are corrupted; another fetch is unlikely to help without a fix
// upstream. Mark non-retryable.
throw new NonRetryableError(
`Checksum mismatch for ${assetName}: expected ${expected}, got ${actual}`,
);
}
@@ -152,7 +870,7 @@ async function ensureBinary(targetPath, assetName, version, repo, getChecksums)
const checksums = await getChecksums();
const url = releaseAssetUrl(assetName, version, repo);
const destination = `${targetPath}.${process.pid}.${Date.now()}.download`;
await download(url, destination, { assetName });
try {
await verifyChecksum(destination, assetName, checksums);
preflightGlibc(destination);