fix(client): #103 stream-error diagnostics + transparent retry on early decode failure

Phase 1: log full reqwest error chain + headers + bytes-received at decode site
Phase 2: HTTP/2 keepalive settings + tcp keepalive on the reqwest builder
Phase 3: engine transparently retries when stream errors before any content;
         surface error on mid-stream failure (no double-bill); stream_errors
         threshold relaxed 3 -> 5 with the new keepalive
Phase 4: unit tests for the four classes of stream failure

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hunter Bown
2026-04-27 21:57:13 -05:00
parent 4ac7219d77
commit 36320c5bea
4 changed files with 506 additions and 12 deletions
+83 -4
View File
@@ -401,6 +401,17 @@ pub(super) fn api_url(base_url: &str, path: &str) -> String {
// === DeepSeekClient ===
/// Returns true when DEEPSEEK_FORCE_HTTP1 is set to a truthy value
/// (`1`, `true`, `yes`, `on`, case-insensitive). Used by `build_http_client`
/// to opt out of HTTP/2 entirely when DeepSeek's edge mishandles long-lived H2
/// streams (#103). Anything else (unset, `0`, `false`, ...) leaves HTTP/2 on.
fn force_http1_from_env() -> bool {
std::env::var("DEEPSEEK_FORCE_HTTP1")
.ok()
.map(|v| v.trim().to_ascii_lowercase())
.is_some_and(|v| matches!(v.as_str(), "1" | "true" | "yes" | "on"))
}
impl DeepSeekClient {
/// Create a DeepSeek client from CLI configuration.
pub fn new(config: &Config) -> Result<Self> {
@@ -441,7 +452,7 @@ impl DeepSeekClient {
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {api_key}"))?,
);
reqwest::Client::builder()
let mut builder = reqwest::Client::builder()
.default_headers(headers)
.connect_timeout(Duration::from_secs(30))
// The blanket 300s request timeout was incompatible with V4-pro
@@ -451,9 +462,16 @@ impl DeepSeekClient {
.tcp_keepalive(Some(Duration::from_secs(30)))
.http2_keep_alive_interval(Some(Duration::from_secs(15)))
.http2_keep_alive_timeout(Duration::from_secs(20))
.min_tls_version(reqwest::tls::Version::TLS_1_2)
.build()
.map_err(Into::into)
.min_tls_version(reqwest::tls::Version::TLS_1_2);
// Escape hatch (#103): some DeepSeek edge nodes mishandle long-lived
// HTTP/2 streams. Setting DEEPSEEK_FORCE_HTTP1=1 pins the client to
// HTTP/1.1 so users can experiment without us committing to that
// path as the default.
if force_http1_from_env() {
logging::info("DEEPSEEK_FORCE_HTTP1=1 — pinning HTTP client to HTTP/1.1");
builder = builder.http1_only();
}
builder.build().map_err(Into::into)
}
/// List available models from the provider.
@@ -2127,4 +2145,65 @@ mod tests {
now + RECOVERY_PROBE_COOLDOWN + Duration::from_millis(1)
));
}
// === #103 Phase 2: HTTP/1 escape hatch ===================================
/// Serialize tests that mutate `DEEPSEEK_FORCE_HTTP1` so they don't race
/// against each other — env vars are process-global.
static FORCE_HTTP1_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
struct ForceHttp1EnvGuard {
prior: Option<std::ffi::OsString>,
}
impl ForceHttp1EnvGuard {
fn capture() -> Self {
Self {
prior: std::env::var_os("DEEPSEEK_FORCE_HTTP1"),
}
}
}
impl Drop for ForceHttp1EnvGuard {
fn drop(&mut self) {
// Safety: scoped to test process; reverts to the captured value.
match &self.prior {
Some(v) => unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", v) },
None => unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") },
}
}
}
#[test]
fn force_http1_unset_is_false() {
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
let _guard = ForceHttp1EnvGuard::capture();
unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") };
assert!(!force_http1_from_env());
}
#[test]
fn force_http1_truthy_values() {
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
let _guard = ForceHttp1EnvGuard::capture();
for value in ["1", "true", "True", "YES", "on", " 1 "] {
// Safety: serialized by FORCE_HTTP1_ENV_LOCK; reverted by guard.
unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) };
assert!(
force_http1_from_env(),
"{value:?} should be parsed as truthy",
);
}
}
#[test]
fn force_http1_falsy_values() {
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
let _guard = ForceHttp1EnvGuard::capture();
for value in ["0", "false", "no", "off", "", "garbage", "2"] {
unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) };
assert!(
!force_http1_from_env(),
"{value:?} should NOT be parsed as truthy",
);
}
}
}
+227 -1
View File
@@ -161,6 +161,12 @@ impl DeepSeekClient {
}
let model = request.model.clone();
// Capture transport-shape headers before we consume `response` into
// `bytes_stream()`. They are surfaced in the decode-error log path so
// we can tell HTTP/2 RST_STREAM from chunked-encoding corruption from
// gzip-compressor failure when investigating #103.
let response_headers = format_stream_headers(response.headers());
let byte_stream = response.bytes_stream();
let stream = async_stream::stream! {
@@ -233,10 +239,11 @@ impl DeepSeekClient {
}
crate::logging::warn(format!(
"Stream read error: {error_chain} \
(elapsed: {}ms, bytes_received: {}, ms_since_last_event: {})",
(elapsed: {}ms, bytes_received: {}, ms_since_last_event: {}, headers: {})",
stream_start.elapsed().as_millis(),
bytes_received,
last_event_at.elapsed().as_millis(),
response_headers,
));
yield Err(anyhow::anyhow!("Stream read error: {e}"));
break;
@@ -756,6 +763,27 @@ pub(super) fn count_reasoning_replay_chars(body: &Value) -> u64 {
.sum()
}
/// Render the transport-shape headers we care about for #103 diagnostics.
/// Always returns SOMETHING printable so the decode-error log line is parseable
/// even when the server stripped a header we expected.
fn format_stream_headers(headers: &reqwest::header::HeaderMap) -> String {
const FIELDS: &[&str] = &[
"content-encoding",
"transfer-encoding",
"connection",
"server",
];
let mut parts: Vec<String> = Vec::with_capacity(FIELDS.len());
for field in FIELDS {
let rendered = headers
.get(*field)
.and_then(|v| v.to_str().ok())
.unwrap_or("(absent)");
parts.push(format!("{field}={rendered}"));
}
parts.join(", ")
}
/// Diagnostic logger fired when DeepSeek rejects the request despite the
/// sanitizer. Walks the body and logs which assistant messages have tool_calls
/// but no `reasoning_content` — useful to track down a code path that bypasses
@@ -1246,3 +1274,201 @@ pub(super) fn parse_sse_chunk(
events
}
// === #103 Phase 1: stream-decode diagnostics ===================================
#[cfg(test)]
mod stream_diagnostics_tests {
use super::*;
use reqwest::header::{HeaderMap, HeaderValue};
#[test]
fn format_stream_headers_renders_all_fields_when_present() {
let mut headers = HeaderMap::new();
headers.insert("content-encoding", HeaderValue::from_static("gzip"));
headers.insert("transfer-encoding", HeaderValue::from_static("chunked"));
headers.insert("connection", HeaderValue::from_static("keep-alive"));
headers.insert("server", HeaderValue::from_static("openresty/1.25.3.1"));
let rendered = format_stream_headers(&headers);
// Order is fixed by FIELDS in the helper; assert each field appears.
assert!(
rendered.contains("content-encoding=gzip"),
"got: {rendered}"
);
assert!(
rendered.contains("transfer-encoding=chunked"),
"got: {rendered}"
);
assert!(
rendered.contains("connection=keep-alive"),
"got: {rendered}"
);
assert!(
rendered.contains("server=openresty/1.25.3.1"),
"got: {rendered}"
);
}
#[test]
fn format_stream_headers_marks_missing_fields_as_absent() {
// DeepSeek frequently omits content-encoding when not compressing.
// The diagnostic must still produce a parseable line so log scrapers
// don't lose the slot.
let headers = HeaderMap::new();
let rendered = format_stream_headers(&headers);
assert!(
rendered.contains("content-encoding=(absent)"),
"missing field must be explicitly marked; got: {rendered}"
);
assert!(
rendered.contains("transfer-encoding=(absent)"),
"missing field must be explicitly marked; got: {rendered}"
);
}
#[test]
fn format_stream_headers_handles_non_ascii_value_gracefully() {
// If a header value isn't UTF-8, `.to_str()` fails — we must not panic
// and should still produce a parseable line.
let mut headers = HeaderMap::new();
// 0xFF is a valid byte but invalid UTF-8 start byte.
headers.insert(
"server",
HeaderValue::from_bytes(b"\xff\xfemystery").expect("header value"),
);
let rendered = format_stream_headers(&headers);
assert!(
rendered.contains("server=(absent)"),
"non-UTF8 header values fall back to (absent); got: {rendered}"
);
}
}
// === #103 Phase 4: SSE decoder behavior on canned chunk sequences ============
#[cfg(test)]
mod stream_decoder_tests {
//! Drive `parse_sse_chunk` (the in-place SSE event extractor) over canned
//! chunk sequences. The full `handle_chat_completion_stream` path needs a
//! live `reqwest::Response` so it isn't unit-testable without a mock HTTP
//! harness (issue #69 tracks that). For #103 we exercise the chunk decoder
//! directly to verify each "class of stream failure" the engine relies on.
use super::*;
use crate::models::{ContentBlockStart, Delta, StreamEvent};
/// Decode a raw SSE-data JSON chunk into our internal events, mirroring
/// the per-event call shape used by `handle_chat_completion_stream`.
fn decode_chunk(json_text: &str) -> Vec<StreamEvent> {
let chunk: Value = serde_json::from_str(json_text).expect("valid SSE JSON");
let mut content_index = 0u32;
let mut text_started = false;
let mut thinking_started = false;
let mut tool_indices = std::collections::HashMap::new();
parse_sse_chunk(
&chunk,
&mut content_index,
&mut text_started,
&mut thinking_started,
&mut tool_indices,
true,
)
}
#[test]
fn decoder_emits_text_delta_for_content_chunk() {
// The "happy" first chunk: a normal content delta. The engine treats
// this as `any_content_received = true` and would NOT transparently
// retry on a subsequent error.
let events = decode_chunk(r#"{"choices":[{"delta":{"content":"hello"}}]}"#);
assert!(
matches!(
events.first(),
Some(StreamEvent::ContentBlockStart {
content_block: ContentBlockStart::Text { .. },
..
})
),
"first event should open a text block; got {events:?}"
);
assert!(
events
.iter()
.any(|e| matches!(e, StreamEvent::ContentBlockDelta {
delta: Delta::TextDelta { text },
..
} if text == "hello")),
"should yield a TextDelta carrying 'hello'; got {events:?}"
);
}
#[test]
fn decoder_emits_thinking_delta_for_reasoning_chunk() {
// V4 thinking models surface reasoning_content first — the engine
// also counts these as content received (so a subsequent stream error
// surfaces rather than retrying transparently).
let events = decode_chunk(r#"{"choices":[{"delta":{"reasoning_content":"plan..."}}]}"#);
assert!(
matches!(
events.first(),
Some(StreamEvent::ContentBlockStart {
content_block: ContentBlockStart::Thinking { .. },
..
})
),
"first event should open a thinking block; got {events:?}"
);
assert!(
events
.iter()
.any(|e| matches!(e, StreamEvent::ContentBlockDelta {
delta: Delta::ThinkingDelta { thinking },
..
} if thinking == "plan...")),
"should yield a ThinkingDelta carrying 'plan...'; got {events:?}"
);
}
#[test]
fn decoder_yields_no_events_for_keepalive_chunk() {
// DeepSeek often sends `{"choices":[]}` keepalive chunks before
// emitting real content. The engine MUST treat a stream error after
// these as "no content received" and be eligible for transparent
// retry — assert here that the decoder yields no payload events.
let events = decode_chunk(r#"{"choices":[]}"#);
assert!(
events.is_empty(),
"empty-choices chunk must produce no events; got {events:?}"
);
}
#[test]
fn decoder_emits_tool_use_block_for_tool_call_delta() {
// Tool-call deltas are content too — once one arrives, transparent
// retry must be off (the model has committed to a tool invocation
// path that DeepSeek has billed for).
let events = decode_chunk(
r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_1","function":{"name":"grep_files","arguments":"{\"pattern\":\"foo\"}"}}]}}]}"#,
);
assert!(
events.iter().any(|e| matches!(
e,
StreamEvent::ContentBlockStart {
content_block: ContentBlockStart::ToolUse { name, .. },
..
} if name == "grep_files"
)),
"should open a ToolUse block for grep_files; got {events:?}"
);
assert!(
events.iter().any(|e| matches!(
e,
StreamEvent::ContentBlockDelta {
delta: Delta::InputJsonDelta { partial_json },
..
} if partial_json.contains("\"pattern\"")
)),
"should yield InputJsonDelta carrying the tool args; got {events:?}"
);
}
}
+96 -7
View File
@@ -8,7 +8,6 @@
//! - Tool execution orchestration
use std::path::PathBuf;
use std::pin::pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use std::{fs::OpenOptions, io::Write};
@@ -294,6 +293,35 @@ const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB
/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but
/// give the wall-clock a generous window so it never fires in practice.
const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1)
/// Hard cap on consecutive recoverable stream errors before we surface a turn
/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults
/// (#103) — keepalive should make spurious decode errors rarer, so we can
/// tolerate a longer streak before giving up on the turn.
const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5;
/// Cap on transparent stream-level retries — these only happen when the wire
/// dies before any content was streamed, so DeepSeek hasn't billed us and
/// the user hasn't seen anything. Two attempts is enough to ride out a
/// flaky edge node without amplifying real outages (#103).
const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2;
/// Decide whether a stream error is eligible for a transparent retry.
///
/// True only when ALL three conditions hold:
/// 1. No content has been received on the current attempt — otherwise DeepSeek
/// has already billed us for output tokens and the user has seen partial
/// deltas; resending would double-bill and desync the UI.
/// 2. We still have transparent-retry budget remaining.
/// 3. The turn has not been cancelled.
///
/// Extracted as a pure function so the four #103 retry cases can be exercised
/// in unit tests without booting the full engine state machine.
fn should_transparently_retry_stream(
any_content_received: bool,
transparent_attempts: u32,
cancelled: bool,
) -> bool {
!any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled
}
/// Max output tokens requested for normal agent turns. Generous on purpose:
/// V4 thinking models can produce tens of thousands of reasoning tokens on
/// hard prompts before the visible reply, and DeepSeek V4 ships with a 1M
@@ -2373,8 +2401,11 @@ impl Engine {
top_p: None,
};
// Stream the response
let stream_result = client.create_message_stream(request).await;
// Stream the response. Keep the request around (cloned into the
// first call) so we can resend it on a transparent retry below
// when the wire dies before any content was streamed (#103).
let stream_request = request;
let stream_result = client.create_message_stream(stream_request.clone()).await;
let stream = match stream_result {
Ok(s) => {
context_recovery_attempts = 0;
@@ -2400,7 +2431,10 @@ impl Engine {
return (TurnOutcomeStatus::Failed, turn_error);
}
};
let mut stream = pin!(stream);
// The stream value is itself `Pin<Box<dyn Stream + Send>>`, which
// is `Unpin`, so we can rebind it on a transparent retry without
// breaking the existing pin invariants.
let mut stream = stream;
// Track content blocks
let mut content_blocks: Vec<ContentBlock> = Vec::new();
@@ -2420,8 +2454,18 @@ impl Engine {
let mut pending_message_complete = false;
let mut last_text_index: Option<usize> = None;
let mut stream_errors = 0u32;
// #103 transparent retry bookkeeping. `any_content_received` flips
// on the first non-MessageStart event so we know whether DeepSeek
// billed us / the user has seen any output for this turn yet.
// This is distinct from the outer `stream_retry_attempts` (which
// restarts the whole turn-step when a stream died with no
// content-block delta delivered to the consumer).
let mut any_content_received = false;
let mut transparent_stream_retries = 0u32;
let mut pending_steers: Vec<String> = Vec::new();
let stream_start = Instant::now();
// `stream_start` is reset on a transparent retry so the wall-clock
// budget restarts with the fresh stream.
let mut stream_start = Instant::now();
let mut stream_content_bytes: usize = 0;
let chunk_timeout = Duration::from_secs(STREAM_CHUNK_TIMEOUT_SECS);
let max_duration = Duration::from_secs(STREAM_MAX_DURATION_SECS);
@@ -2493,13 +2537,58 @@ impl Engine {
}
let event = match event_result {
Ok(e) => e,
Ok(e) => {
// Flip on the first non-MessageStart event — that's
// the moment we cross from "stream not yet productive"
// (eligible for transparent retry) into "DeepSeek has
// billed us / user has seen output" (must surface).
if !any_content_received && !matches!(e, StreamEvent::MessageStart { .. }) {
any_content_received = true;
}
e
}
Err(e) => {
stream_errors = stream_errors.saturating_add(1);
let message = e.to_string();
// #103: when the stream errors before any content was
// streamed AND we still have retry budget, transparently
// resend the request. DeepSeek has not billed for any
// output and the user has seen nothing — re-trying is
// the right user-visible behavior.
if should_transparently_retry_stream(
any_content_received,
transparent_stream_retries,
self.cancel_token.is_cancelled(),
) {
transparent_stream_retries =
transparent_stream_retries.saturating_add(1);
crate::logging::info(format!(
"Transparent stream retry {}/{} (no content received yet): {}",
transparent_stream_retries, MAX_TRANSPARENT_STREAM_RETRIES, message,
));
// Drop the failed stream before issuing the new
// request to release the underlying connection.
drop(stream);
match client.create_message_stream(stream_request.clone()).await {
Ok(fresh) => {
stream = fresh;
stream_start = Instant::now();
// Roll back the error counter — this one
// didn't surface to the user.
stream_errors = stream_errors.saturating_sub(1);
continue;
}
Err(retry_err) => {
let retry_msg = format!("Stream retry failed: {retry_err}");
turn_error.get_or_insert(retry_msg.clone());
let _ = self.tx_event.send(Event::error(retry_msg, true)).await;
break;
}
}
}
turn_error.get_or_insert(message.clone());
let _ = self.tx_event.send(Event::error(message, true)).await;
if stream_errors >= 3 {
if stream_errors >= MAX_STREAM_ERRORS_BEFORE_FAIL {
break;
}
continue;
+100
View File
@@ -939,3 +939,103 @@ fn final_tool_input_falls_back_to_initial_when_buffer_unparseable() {
let state = tool_state(json!({"command": "echo hi"}), "{not json");
assert_eq!(final_tool_input(&state), json!({"command": "echo hi"}));
}
// === #103 transparent stream-retry policy =====================================
#[test]
fn stream_retry_zero_content_then_error_is_transparently_retried() {
// Case 2 from issue #103: stream yielded ZERO content then errored.
// The decoder hit Err on the very first poll → engine should retry
// because DeepSeek hasn't billed and the user has seen nothing.
assert!(
super::should_transparently_retry_stream(false, 0, false),
"first attempt with no content must be eligible for transparent retry"
);
assert!(
super::should_transparently_retry_stream(false, 1, false),
"second attempt (one prior retry) with no content must still be eligible"
);
}
#[test]
fn stream_retry_after_content_received_surfaces_error() {
// Case 3 from issue #103: stream yielded content then errored. We must
// NOT transparently retry — the model has emitted billed output tokens
// and the UI has streamed deltas; resending would double-bill and the
// user would see the same prefix twice.
assert!(
!super::should_transparently_retry_stream(true, 0, false),
"any content received → no transparent retry, even with full budget"
);
assert!(
!super::should_transparently_retry_stream(true, 1, false),
"any content received → no transparent retry on subsequent attempts"
);
}
#[test]
fn stream_retry_budget_caps_transparent_retries_at_two() {
// Case 4 from issue #103: after MAX_TRANSPARENT_STREAM_RETRIES attempts
// we stop trying transparently and let the outer error path surface.
// (The outer per-turn `stream_retry_attempts` retry is a separate layer
// and is still in effect at the whole-turn level.)
assert!(
super::should_transparently_retry_stream(
false,
super::MAX_TRANSPARENT_STREAM_RETRIES - 1,
false,
),
"one short of the cap should still retry"
);
assert!(
!super::should_transparently_retry_stream(
false,
super::MAX_TRANSPARENT_STREAM_RETRIES,
false,
),
"at the cap, no further transparent retries"
);
assert!(
!super::should_transparently_retry_stream(
false,
super::MAX_TRANSPARENT_STREAM_RETRIES + 5,
false,
),
"well past the cap, definitely no transparent retries"
);
}
#[test]
fn stream_retry_respects_cancellation() {
// Cancellation overrides every other condition. If the user pressed
// Esc / Ctrl-C, do not silently re-issue the request behind their back.
assert!(
!super::should_transparently_retry_stream(false, 0, true),
"cancelled turn must not be transparently retried"
);
assert!(
!super::should_transparently_retry_stream(false, 1, true),
"cancelled turn must not be transparently retried even with budget"
);
}
#[test]
fn stream_retry_threshold_relaxed_to_five() {
// Case 1+4 from issue #103: the consecutive-error threshold for marking
// the turn failed was relaxed from 3 → 5 in v0.6.7 because the new
// HTTP/2 keepalive defaults make spurious decode errors rarer.
// This test pins the constant so a future regression to 3 fails loudly.
assert_eq!(
super::MAX_STREAM_ERRORS_BEFORE_FAIL,
5,
"the consecutive-stream-error threshold should be 5; \
lowering it back to 3 will fail mid-turn under transient flakiness"
);
// And a regression guard on the transparent-retry cap.
assert_eq!(
super::MAX_TRANSPARENT_STREAM_RETRIES,
2,
"transparent-retry cap should be 2; raising it risks hammering the \
provider on real outages"
);
}