From bbdfb26f3c4b7e1cf77ae3f2fccb26b3da36da41 Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Sun, 26 Apr 2026 17:19:42 -0500 Subject: [PATCH] fix(client): TCP/HTTP2 keepalives + stream-error diagnostics (#103 Phase 1+2) Two fixes for the persistent "Stream read error: error decoding response body" we saw mid-turn during long V4-pro thinking sessions. 1) HTTP transport tuning (`crates/tui/src/client.rs`): - Drop the blanket 300s request timeout. Long V4 thinking turns legitimately exceed the wall-clock window; per-chunk and per-stream guards in `engine.rs` already bound how long we wait without progress. - Add `tcp_keepalive(30s)` so dead-peer detection happens at the TCP layer instead of waiting for the application to notice. - Add `http2_keep_alive_interval(15s)` + `http2_keep_alive_timeout(20s)` so HTTP/2 connections to DeepSeek's edge don't go silent and get killed by an upstream proxy mid-thinking. 2) Stream-error diagnostics (`crates/tui/src/client/chat.rs`): - Walk reqwest's `std::error::Error::source()` chain when a chunk read errors, so the underlying hyper / h2 / io error is logged. Without this the outer "error decoding response body" message tells us nothing about WHY the stream died. - Track elapsed wall time, bytes received so far, and ms since the last successful event; log them alongside the error chain. Lets us tell HTTP/2 RST_STREAM mid-idle from chunk-decode-failure on a short stream from gzip-corruption mid-burst. Phase 3 (transparent retry with `prefix` continuation) is intentionally NOT in this PR. The retry-flag plumbing on MessageRequest + chat.rs prefix wire format + engine.rs retry loop is a meaningful surface that deserves its own review pass; this PR ships the diagnostic-and-resilience floor so we can land the harder retry work knowing the underlying network state is better. Refs #103. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/tui/src/client.rs | 8 +++++++- crates/tui/src/client/chat.rs | 29 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/crates/tui/src/client.rs b/crates/tui/src/client.rs index 3743e776..e17010bf 100644 --- a/crates/tui/src/client.rs +++ b/crates/tui/src/client.rs @@ -444,7 +444,13 @@ impl DeepSeekClient { reqwest::Client::builder() .default_headers(headers) .connect_timeout(Duration::from_secs(30)) - .timeout(Duration::from_secs(300)) + // The blanket 300s request timeout was incompatible with V4-pro + // thinking turns that legitimately exceed that wall-clock window + // (see #103). Drop it; per-chunk and per-stream guards in + // engine.rs already bound how long we'll wait without progress. + .tcp_keepalive(Some(Duration::from_secs(30))) + .http2_keep_alive_interval(Some(Duration::from_secs(15))) + .http2_keep_alive_timeout(Duration::from_secs(20)) .min_tls_version(reqwest::tls::Version::TLS_1_2) .build() .map_err(Into::into) diff --git a/crates/tui/src/client/chat.rs b/crates/tui/src/client/chat.rs index a2b5cb18..73543632 100644 --- a/crates/tui/src/client/chat.rs +++ b/crates/tui/src/client/chat.rs @@ -196,6 +196,15 @@ impl DeepSeekClient { let mut byte_stream = std::pin::pin!(byte_stream); let idle = stream_idle_timeout(); + // Telemetry for #103 stream-decode diagnostics: bytes received + // since the start of this stream and last successful event time. + // Surfaces in the error log when reqwest yields a chunk error so + // we can tell HTTP/2 RST_STREAM from chunk-decode-failure from + // gzip-corruption when investigating a flaky session. + let stream_start = std::time::Instant::now(); + let mut last_event_at = std::time::Instant::now(); + let mut bytes_received: usize = 0; + loop { let chunk_result = match tokio_timeout(idle, byte_stream.next()).await { Ok(Some(result)) => result, @@ -211,11 +220,31 @@ impl DeepSeekClient { let chunk = match chunk_result { Ok(bytes) => bytes, Err(e) => { + // Walk the error source chain so reqwest's underlying + // hyper / h2 / io error is visible — without this the + // outer "error decoding response body" message tells + // us nothing about WHY the stream died. + let mut error_chain = format!("{e}"); + let mut current: Option<&(dyn std::error::Error + 'static)> = + std::error::Error::source(&e); + while let Some(source) = current { + error_chain.push_str(&format!(" -> {source}")); + current = std::error::Error::source(source); + } + crate::logging::warn(format!( + "Stream read error: {error_chain} \ + (elapsed: {}ms, bytes_received: {}, ms_since_last_event: {})", + stream_start.elapsed().as_millis(), + bytes_received, + last_event_at.elapsed().as_millis(), + )); yield Err(anyhow::anyhow!("Stream read error: {e}")); break; } }; + bytes_received = bytes_received.saturating_add(chunk.len()); + last_event_at = std::time::Instant::now(); byte_buf.extend_from_slice(&chunk); // Guard against unbounded buffer growth (e.g., malformed stream without newlines)