diff --git a/crates/tui/src/client.rs b/crates/tui/src/client.rs index e17010bf..5b5ad5eb 100644 --- a/crates/tui/src/client.rs +++ b/crates/tui/src/client.rs @@ -401,6 +401,17 @@ pub(super) fn api_url(base_url: &str, path: &str) -> String { // === DeepSeekClient === +/// Returns true when DEEPSEEK_FORCE_HTTP1 is set to a truthy value +/// (`1`, `true`, `yes`, `on`, case-insensitive). Used by `build_http_client` +/// to opt out of HTTP/2 entirely when DeepSeek's edge mishandles long-lived H2 +/// streams (#103). Anything else (unset, `0`, `false`, ...) leaves HTTP/2 on. +fn force_http1_from_env() -> bool { + std::env::var("DEEPSEEK_FORCE_HTTP1") + .ok() + .map(|v| v.trim().to_ascii_lowercase()) + .is_some_and(|v| matches!(v.as_str(), "1" | "true" | "yes" | "on")) +} + impl DeepSeekClient { /// Create a DeepSeek client from CLI configuration. pub fn new(config: &Config) -> Result { @@ -441,7 +452,7 @@ impl DeepSeekClient { AUTHORIZATION, HeaderValue::from_str(&format!("Bearer {api_key}"))?, ); - reqwest::Client::builder() + let mut builder = reqwest::Client::builder() .default_headers(headers) .connect_timeout(Duration::from_secs(30)) // The blanket 300s request timeout was incompatible with V4-pro @@ -451,9 +462,16 @@ impl DeepSeekClient { .tcp_keepalive(Some(Duration::from_secs(30))) .http2_keep_alive_interval(Some(Duration::from_secs(15))) .http2_keep_alive_timeout(Duration::from_secs(20)) - .min_tls_version(reqwest::tls::Version::TLS_1_2) - .build() - .map_err(Into::into) + .min_tls_version(reqwest::tls::Version::TLS_1_2); + // Escape hatch (#103): some DeepSeek edge nodes mishandle long-lived + // HTTP/2 streams. Setting DEEPSEEK_FORCE_HTTP1=1 pins the client to + // HTTP/1.1 so users can experiment without us committing to that + // path as the default. + if force_http1_from_env() { + logging::info("DEEPSEEK_FORCE_HTTP1=1 — pinning HTTP client to HTTP/1.1"); + builder = builder.http1_only(); + } + builder.build().map_err(Into::into) } /// List available models from the provider. @@ -2127,4 +2145,65 @@ mod tests { now + RECOVERY_PROBE_COOLDOWN + Duration::from_millis(1) )); } + + // === #103 Phase 2: HTTP/1 escape hatch =================================== + + /// Serialize tests that mutate `DEEPSEEK_FORCE_HTTP1` so they don't race + /// against each other — env vars are process-global. + static FORCE_HTTP1_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + + struct ForceHttp1EnvGuard { + prior: Option, + } + impl ForceHttp1EnvGuard { + fn capture() -> Self { + Self { + prior: std::env::var_os("DEEPSEEK_FORCE_HTTP1"), + } + } + } + impl Drop for ForceHttp1EnvGuard { + fn drop(&mut self) { + // Safety: scoped to test process; reverts to the captured value. + match &self.prior { + Some(v) => unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", v) }, + None => unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") }, + } + } + } + + #[test] + fn force_http1_unset_is_false() { + let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap(); + let _guard = ForceHttp1EnvGuard::capture(); + unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") }; + assert!(!force_http1_from_env()); + } + + #[test] + fn force_http1_truthy_values() { + let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap(); + let _guard = ForceHttp1EnvGuard::capture(); + for value in ["1", "true", "True", "YES", "on", " 1 "] { + // Safety: serialized by FORCE_HTTP1_ENV_LOCK; reverted by guard. + unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) }; + assert!( + force_http1_from_env(), + "{value:?} should be parsed as truthy", + ); + } + } + + #[test] + fn force_http1_falsy_values() { + let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap(); + let _guard = ForceHttp1EnvGuard::capture(); + for value in ["0", "false", "no", "off", "", "garbage", "2"] { + unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) }; + assert!( + !force_http1_from_env(), + "{value:?} should NOT be parsed as truthy", + ); + } + } } diff --git a/crates/tui/src/client/chat.rs b/crates/tui/src/client/chat.rs index 73543632..df0216a1 100644 --- a/crates/tui/src/client/chat.rs +++ b/crates/tui/src/client/chat.rs @@ -161,6 +161,12 @@ impl DeepSeekClient { } let model = request.model.clone(); + + // Capture transport-shape headers before we consume `response` into + // `bytes_stream()`. They are surfaced in the decode-error log path so + // we can tell HTTP/2 RST_STREAM from chunked-encoding corruption from + // gzip-compressor failure when investigating #103. + let response_headers = format_stream_headers(response.headers()); let byte_stream = response.bytes_stream(); let stream = async_stream::stream! { @@ -233,10 +239,11 @@ impl DeepSeekClient { } crate::logging::warn(format!( "Stream read error: {error_chain} \ - (elapsed: {}ms, bytes_received: {}, ms_since_last_event: {})", + (elapsed: {}ms, bytes_received: {}, ms_since_last_event: {}, headers: {})", stream_start.elapsed().as_millis(), bytes_received, last_event_at.elapsed().as_millis(), + response_headers, )); yield Err(anyhow::anyhow!("Stream read error: {e}")); break; @@ -756,6 +763,27 @@ pub(super) fn count_reasoning_replay_chars(body: &Value) -> u64 { .sum() } +/// Render the transport-shape headers we care about for #103 diagnostics. +/// Always returns SOMETHING printable so the decode-error log line is parseable +/// even when the server stripped a header we expected. +fn format_stream_headers(headers: &reqwest::header::HeaderMap) -> String { + const FIELDS: &[&str] = &[ + "content-encoding", + "transfer-encoding", + "connection", + "server", + ]; + let mut parts: Vec = Vec::with_capacity(FIELDS.len()); + for field in FIELDS { + let rendered = headers + .get(*field) + .and_then(|v| v.to_str().ok()) + .unwrap_or("(absent)"); + parts.push(format!("{field}={rendered}")); + } + parts.join(", ") +} + /// Diagnostic logger fired when DeepSeek rejects the request despite the /// sanitizer. Walks the body and logs which assistant messages have tool_calls /// but no `reasoning_content` — useful to track down a code path that bypasses @@ -1246,3 +1274,201 @@ pub(super) fn parse_sse_chunk( events } + +// === #103 Phase 1: stream-decode diagnostics =================================== + +#[cfg(test)] +mod stream_diagnostics_tests { + use super::*; + use reqwest::header::{HeaderMap, HeaderValue}; + + #[test] + fn format_stream_headers_renders_all_fields_when_present() { + let mut headers = HeaderMap::new(); + headers.insert("content-encoding", HeaderValue::from_static("gzip")); + headers.insert("transfer-encoding", HeaderValue::from_static("chunked")); + headers.insert("connection", HeaderValue::from_static("keep-alive")); + headers.insert("server", HeaderValue::from_static("openresty/1.25.3.1")); + + let rendered = format_stream_headers(&headers); + // Order is fixed by FIELDS in the helper; assert each field appears. + assert!( + rendered.contains("content-encoding=gzip"), + "got: {rendered}" + ); + assert!( + rendered.contains("transfer-encoding=chunked"), + "got: {rendered}" + ); + assert!( + rendered.contains("connection=keep-alive"), + "got: {rendered}" + ); + assert!( + rendered.contains("server=openresty/1.25.3.1"), + "got: {rendered}" + ); + } + + #[test] + fn format_stream_headers_marks_missing_fields_as_absent() { + // DeepSeek frequently omits content-encoding when not compressing. + // The diagnostic must still produce a parseable line so log scrapers + // don't lose the slot. + let headers = HeaderMap::new(); + let rendered = format_stream_headers(&headers); + assert!( + rendered.contains("content-encoding=(absent)"), + "missing field must be explicitly marked; got: {rendered}" + ); + assert!( + rendered.contains("transfer-encoding=(absent)"), + "missing field must be explicitly marked; got: {rendered}" + ); + } + + #[test] + fn format_stream_headers_handles_non_ascii_value_gracefully() { + // If a header value isn't UTF-8, `.to_str()` fails — we must not panic + // and should still produce a parseable line. + let mut headers = HeaderMap::new(); + // 0xFF is a valid byte but invalid UTF-8 start byte. + headers.insert( + "server", + HeaderValue::from_bytes(b"\xff\xfemystery").expect("header value"), + ); + let rendered = format_stream_headers(&headers); + assert!( + rendered.contains("server=(absent)"), + "non-UTF8 header values fall back to (absent); got: {rendered}" + ); + } +} + +// === #103 Phase 4: SSE decoder behavior on canned chunk sequences ============ + +#[cfg(test)] +mod stream_decoder_tests { + //! Drive `parse_sse_chunk` (the in-place SSE event extractor) over canned + //! chunk sequences. The full `handle_chat_completion_stream` path needs a + //! live `reqwest::Response` so it isn't unit-testable without a mock HTTP + //! harness (issue #69 tracks that). For #103 we exercise the chunk decoder + //! directly to verify each "class of stream failure" the engine relies on. + use super::*; + use crate::models::{ContentBlockStart, Delta, StreamEvent}; + + /// Decode a raw SSE-data JSON chunk into our internal events, mirroring + /// the per-event call shape used by `handle_chat_completion_stream`. + fn decode_chunk(json_text: &str) -> Vec { + let chunk: Value = serde_json::from_str(json_text).expect("valid SSE JSON"); + let mut content_index = 0u32; + let mut text_started = false; + let mut thinking_started = false; + let mut tool_indices = std::collections::HashMap::new(); + parse_sse_chunk( + &chunk, + &mut content_index, + &mut text_started, + &mut thinking_started, + &mut tool_indices, + true, + ) + } + + #[test] + fn decoder_emits_text_delta_for_content_chunk() { + // The "happy" first chunk: a normal content delta. The engine treats + // this as `any_content_received = true` and would NOT transparently + // retry on a subsequent error. + let events = decode_chunk(r#"{"choices":[{"delta":{"content":"hello"}}]}"#); + assert!( + matches!( + events.first(), + Some(StreamEvent::ContentBlockStart { + content_block: ContentBlockStart::Text { .. }, + .. + }) + ), + "first event should open a text block; got {events:?}" + ); + assert!( + events + .iter() + .any(|e| matches!(e, StreamEvent::ContentBlockDelta { + delta: Delta::TextDelta { text }, + .. + } if text == "hello")), + "should yield a TextDelta carrying 'hello'; got {events:?}" + ); + } + + #[test] + fn decoder_emits_thinking_delta_for_reasoning_chunk() { + // V4 thinking models surface reasoning_content first — the engine + // also counts these as content received (so a subsequent stream error + // surfaces rather than retrying transparently). + let events = decode_chunk(r#"{"choices":[{"delta":{"reasoning_content":"plan..."}}]}"#); + assert!( + matches!( + events.first(), + Some(StreamEvent::ContentBlockStart { + content_block: ContentBlockStart::Thinking { .. }, + .. + }) + ), + "first event should open a thinking block; got {events:?}" + ); + assert!( + events + .iter() + .any(|e| matches!(e, StreamEvent::ContentBlockDelta { + delta: Delta::ThinkingDelta { thinking }, + .. + } if thinking == "plan...")), + "should yield a ThinkingDelta carrying 'plan...'; got {events:?}" + ); + } + + #[test] + fn decoder_yields_no_events_for_keepalive_chunk() { + // DeepSeek often sends `{"choices":[]}` keepalive chunks before + // emitting real content. The engine MUST treat a stream error after + // these as "no content received" and be eligible for transparent + // retry — assert here that the decoder yields no payload events. + let events = decode_chunk(r#"{"choices":[]}"#); + assert!( + events.is_empty(), + "empty-choices chunk must produce no events; got {events:?}" + ); + } + + #[test] + fn decoder_emits_tool_use_block_for_tool_call_delta() { + // Tool-call deltas are content too — once one arrives, transparent + // retry must be off (the model has committed to a tool invocation + // path that DeepSeek has billed for). + let events = decode_chunk( + r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_1","function":{"name":"grep_files","arguments":"{\"pattern\":\"foo\"}"}}]}}]}"#, + ); + assert!( + events.iter().any(|e| matches!( + e, + StreamEvent::ContentBlockStart { + content_block: ContentBlockStart::ToolUse { name, .. }, + .. + } if name == "grep_files" + )), + "should open a ToolUse block for grep_files; got {events:?}" + ); + assert!( + events.iter().any(|e| matches!( + e, + StreamEvent::ContentBlockDelta { + delta: Delta::InputJsonDelta { partial_json }, + .. + } if partial_json.contains("\"pattern\"") + )), + "should yield InputJsonDelta carrying the tool args; got {events:?}" + ); + } +} diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index 49d84a4a..b117f802 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -8,7 +8,6 @@ //! - Tool execution orchestration use std::path::PathBuf; -use std::pin::pin; use std::sync::{Arc, Mutex as StdMutex}; use std::time::{Duration, Instant}; use std::{fs::OpenOptions, io::Write}; @@ -294,6 +293,35 @@ const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB /// per-chunk idle of 300s with no wall-clock cap; we keep both layers but /// give the wall-clock a generous window so it never fires in practice. const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1) +/// Hard cap on consecutive recoverable stream errors before we surface a turn +/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults +/// (#103) — keepalive should make spurious decode errors rarer, so we can +/// tolerate a longer streak before giving up on the turn. +const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5; +/// Cap on transparent stream-level retries — these only happen when the wire +/// dies before any content was streamed, so DeepSeek hasn't billed us and +/// the user hasn't seen anything. Two attempts is enough to ride out a +/// flaky edge node without amplifying real outages (#103). +const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2; + +/// Decide whether a stream error is eligible for a transparent retry. +/// +/// True only when ALL three conditions hold: +/// 1. No content has been received on the current attempt — otherwise DeepSeek +/// has already billed us for output tokens and the user has seen partial +/// deltas; resending would double-bill and desync the UI. +/// 2. We still have transparent-retry budget remaining. +/// 3. The turn has not been cancelled. +/// +/// Extracted as a pure function so the four #103 retry cases can be exercised +/// in unit tests without booting the full engine state machine. +fn should_transparently_retry_stream( + any_content_received: bool, + transparent_attempts: u32, + cancelled: bool, +) -> bool { + !any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled +} /// Max output tokens requested for normal agent turns. Generous on purpose: /// V4 thinking models can produce tens of thousands of reasoning tokens on /// hard prompts before the visible reply, and DeepSeek V4 ships with a 1M @@ -2373,8 +2401,11 @@ impl Engine { top_p: None, }; - // Stream the response - let stream_result = client.create_message_stream(request).await; + // Stream the response. Keep the request around (cloned into the + // first call) so we can resend it on a transparent retry below + // when the wire dies before any content was streamed (#103). + let stream_request = request; + let stream_result = client.create_message_stream(stream_request.clone()).await; let stream = match stream_result { Ok(s) => { context_recovery_attempts = 0; @@ -2400,7 +2431,10 @@ impl Engine { return (TurnOutcomeStatus::Failed, turn_error); } }; - let mut stream = pin!(stream); + // The stream value is itself `Pin>`, which + // is `Unpin`, so we can rebind it on a transparent retry without + // breaking the existing pin invariants. + let mut stream = stream; // Track content blocks let mut content_blocks: Vec = Vec::new(); @@ -2420,8 +2454,18 @@ impl Engine { let mut pending_message_complete = false; let mut last_text_index: Option = None; let mut stream_errors = 0u32; + // #103 transparent retry bookkeeping. `any_content_received` flips + // on the first non-MessageStart event so we know whether DeepSeek + // billed us / the user has seen any output for this turn yet. + // This is distinct from the outer `stream_retry_attempts` (which + // restarts the whole turn-step when a stream died with no + // content-block delta delivered to the consumer). + let mut any_content_received = false; + let mut transparent_stream_retries = 0u32; let mut pending_steers: Vec = Vec::new(); - let stream_start = Instant::now(); + // `stream_start` is reset on a transparent retry so the wall-clock + // budget restarts with the fresh stream. + let mut stream_start = Instant::now(); let mut stream_content_bytes: usize = 0; let chunk_timeout = Duration::from_secs(STREAM_CHUNK_TIMEOUT_SECS); let max_duration = Duration::from_secs(STREAM_MAX_DURATION_SECS); @@ -2493,13 +2537,58 @@ impl Engine { } let event = match event_result { - Ok(e) => e, + Ok(e) => { + // Flip on the first non-MessageStart event — that's + // the moment we cross from "stream not yet productive" + // (eligible for transparent retry) into "DeepSeek has + // billed us / user has seen output" (must surface). + if !any_content_received && !matches!(e, StreamEvent::MessageStart { .. }) { + any_content_received = true; + } + e + } Err(e) => { stream_errors = stream_errors.saturating_add(1); let message = e.to_string(); + // #103: when the stream errors before any content was + // streamed AND we still have retry budget, transparently + // resend the request. DeepSeek has not billed for any + // output and the user has seen nothing — re-trying is + // the right user-visible behavior. + if should_transparently_retry_stream( + any_content_received, + transparent_stream_retries, + self.cancel_token.is_cancelled(), + ) { + transparent_stream_retries = + transparent_stream_retries.saturating_add(1); + crate::logging::info(format!( + "Transparent stream retry {}/{} (no content received yet): {}", + transparent_stream_retries, MAX_TRANSPARENT_STREAM_RETRIES, message, + )); + // Drop the failed stream before issuing the new + // request to release the underlying connection. + drop(stream); + match client.create_message_stream(stream_request.clone()).await { + Ok(fresh) => { + stream = fresh; + stream_start = Instant::now(); + // Roll back the error counter — this one + // didn't surface to the user. + stream_errors = stream_errors.saturating_sub(1); + continue; + } + Err(retry_err) => { + let retry_msg = format!("Stream retry failed: {retry_err}"); + turn_error.get_or_insert(retry_msg.clone()); + let _ = self.tx_event.send(Event::error(retry_msg, true)).await; + break; + } + } + } turn_error.get_or_insert(message.clone()); let _ = self.tx_event.send(Event::error(message, true)).await; - if stream_errors >= 3 { + if stream_errors >= MAX_STREAM_ERRORS_BEFORE_FAIL { break; } continue; diff --git a/crates/tui/src/core/engine/tests.rs b/crates/tui/src/core/engine/tests.rs index 3f030d97..6ebde7e1 100644 --- a/crates/tui/src/core/engine/tests.rs +++ b/crates/tui/src/core/engine/tests.rs @@ -939,3 +939,103 @@ fn final_tool_input_falls_back_to_initial_when_buffer_unparseable() { let state = tool_state(json!({"command": "echo hi"}), "{not json"); assert_eq!(final_tool_input(&state), json!({"command": "echo hi"})); } + +// === #103 transparent stream-retry policy ===================================== + +#[test] +fn stream_retry_zero_content_then_error_is_transparently_retried() { + // Case 2 from issue #103: stream yielded ZERO content then errored. + // The decoder hit Err on the very first poll → engine should retry + // because DeepSeek hasn't billed and the user has seen nothing. + assert!( + super::should_transparently_retry_stream(false, 0, false), + "first attempt with no content must be eligible for transparent retry" + ); + assert!( + super::should_transparently_retry_stream(false, 1, false), + "second attempt (one prior retry) with no content must still be eligible" + ); +} + +#[test] +fn stream_retry_after_content_received_surfaces_error() { + // Case 3 from issue #103: stream yielded content then errored. We must + // NOT transparently retry — the model has emitted billed output tokens + // and the UI has streamed deltas; resending would double-bill and the + // user would see the same prefix twice. + assert!( + !super::should_transparently_retry_stream(true, 0, false), + "any content received → no transparent retry, even with full budget" + ); + assert!( + !super::should_transparently_retry_stream(true, 1, false), + "any content received → no transparent retry on subsequent attempts" + ); +} + +#[test] +fn stream_retry_budget_caps_transparent_retries_at_two() { + // Case 4 from issue #103: after MAX_TRANSPARENT_STREAM_RETRIES attempts + // we stop trying transparently and let the outer error path surface. + // (The outer per-turn `stream_retry_attempts` retry is a separate layer + // and is still in effect at the whole-turn level.) + assert!( + super::should_transparently_retry_stream( + false, + super::MAX_TRANSPARENT_STREAM_RETRIES - 1, + false, + ), + "one short of the cap should still retry" + ); + assert!( + !super::should_transparently_retry_stream( + false, + super::MAX_TRANSPARENT_STREAM_RETRIES, + false, + ), + "at the cap, no further transparent retries" + ); + assert!( + !super::should_transparently_retry_stream( + false, + super::MAX_TRANSPARENT_STREAM_RETRIES + 5, + false, + ), + "well past the cap, definitely no transparent retries" + ); +} + +#[test] +fn stream_retry_respects_cancellation() { + // Cancellation overrides every other condition. If the user pressed + // Esc / Ctrl-C, do not silently re-issue the request behind their back. + assert!( + !super::should_transparently_retry_stream(false, 0, true), + "cancelled turn must not be transparently retried" + ); + assert!( + !super::should_transparently_retry_stream(false, 1, true), + "cancelled turn must not be transparently retried even with budget" + ); +} + +#[test] +fn stream_retry_threshold_relaxed_to_five() { + // Case 1+4 from issue #103: the consecutive-error threshold for marking + // the turn failed was relaxed from 3 → 5 in v0.6.7 because the new + // HTTP/2 keepalive defaults make spurious decode errors rarer. + // This test pins the constant so a future regression to 3 fails loudly. + assert_eq!( + super::MAX_STREAM_ERRORS_BEFORE_FAIL, + 5, + "the consecutive-stream-error threshold should be 5; \ + lowering it back to 3 will fail mid-turn under transient flakiness" + ); + // And a regression guard on the transparent-retry cap. + assert_eq!( + super::MAX_TRANSPARENT_STREAM_RETRIES, + 2, + "transparent-retry cap should be 2; raising it risks hammering the \ + provider on real outages" + ); +}