Merge branch 'fix/v067-stream-retry' (#103 stream-error retry + diagnostics)
This commit is contained in:
@@ -401,6 +401,17 @@ pub(super) fn api_url(base_url: &str, path: &str) -> String {
|
||||
|
||||
// === DeepSeekClient ===
|
||||
|
||||
/// Returns true when DEEPSEEK_FORCE_HTTP1 is set to a truthy value
|
||||
/// (`1`, `true`, `yes`, `on`, case-insensitive). Used by `build_http_client`
|
||||
/// to opt out of HTTP/2 entirely when DeepSeek's edge mishandles long-lived H2
|
||||
/// streams (#103). Anything else (unset, `0`, `false`, ...) leaves HTTP/2 on.
|
||||
fn force_http1_from_env() -> bool {
|
||||
std::env::var("DEEPSEEK_FORCE_HTTP1")
|
||||
.ok()
|
||||
.map(|v| v.trim().to_ascii_lowercase())
|
||||
.is_some_and(|v| matches!(v.as_str(), "1" | "true" | "yes" | "on"))
|
||||
}
|
||||
|
||||
impl DeepSeekClient {
|
||||
/// Create a DeepSeek client from CLI configuration.
|
||||
pub fn new(config: &Config) -> Result<Self> {
|
||||
@@ -441,7 +452,7 @@ impl DeepSeekClient {
|
||||
AUTHORIZATION,
|
||||
HeaderValue::from_str(&format!("Bearer {api_key}"))?,
|
||||
);
|
||||
reqwest::Client::builder()
|
||||
let mut builder = reqwest::Client::builder()
|
||||
.default_headers(headers)
|
||||
.connect_timeout(Duration::from_secs(30))
|
||||
// The blanket 300s request timeout was incompatible with V4-pro
|
||||
@@ -451,9 +462,16 @@ impl DeepSeekClient {
|
||||
.tcp_keepalive(Some(Duration::from_secs(30)))
|
||||
.http2_keep_alive_interval(Some(Duration::from_secs(15)))
|
||||
.http2_keep_alive_timeout(Duration::from_secs(20))
|
||||
.min_tls_version(reqwest::tls::Version::TLS_1_2)
|
||||
.build()
|
||||
.map_err(Into::into)
|
||||
.min_tls_version(reqwest::tls::Version::TLS_1_2);
|
||||
// Escape hatch (#103): some DeepSeek edge nodes mishandle long-lived
|
||||
// HTTP/2 streams. Setting DEEPSEEK_FORCE_HTTP1=1 pins the client to
|
||||
// HTTP/1.1 so users can experiment without us committing to that
|
||||
// path as the default.
|
||||
if force_http1_from_env() {
|
||||
logging::info("DEEPSEEK_FORCE_HTTP1=1 — pinning HTTP client to HTTP/1.1");
|
||||
builder = builder.http1_only();
|
||||
}
|
||||
builder.build().map_err(Into::into)
|
||||
}
|
||||
|
||||
/// List available models from the provider.
|
||||
@@ -2127,4 +2145,65 @@ mod tests {
|
||||
now + RECOVERY_PROBE_COOLDOWN + Duration::from_millis(1)
|
||||
));
|
||||
}
|
||||
|
||||
// === #103 Phase 2: HTTP/1 escape hatch ===================================
|
||||
|
||||
/// Serialize tests that mutate `DEEPSEEK_FORCE_HTTP1` so they don't race
|
||||
/// against each other — env vars are process-global.
|
||||
static FORCE_HTTP1_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||
|
||||
struct ForceHttp1EnvGuard {
|
||||
prior: Option<std::ffi::OsString>,
|
||||
}
|
||||
impl ForceHttp1EnvGuard {
|
||||
fn capture() -> Self {
|
||||
Self {
|
||||
prior: std::env::var_os("DEEPSEEK_FORCE_HTTP1"),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Drop for ForceHttp1EnvGuard {
|
||||
fn drop(&mut self) {
|
||||
// Safety: scoped to test process; reverts to the captured value.
|
||||
match &self.prior {
|
||||
Some(v) => unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", v) },
|
||||
None => unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_http1_unset_is_false() {
|
||||
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
|
||||
let _guard = ForceHttp1EnvGuard::capture();
|
||||
unsafe { std::env::remove_var("DEEPSEEK_FORCE_HTTP1") };
|
||||
assert!(!force_http1_from_env());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_http1_truthy_values() {
|
||||
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
|
||||
let _guard = ForceHttp1EnvGuard::capture();
|
||||
for value in ["1", "true", "True", "YES", "on", " 1 "] {
|
||||
// Safety: serialized by FORCE_HTTP1_ENV_LOCK; reverted by guard.
|
||||
unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) };
|
||||
assert!(
|
||||
force_http1_from_env(),
|
||||
"{value:?} should be parsed as truthy",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn force_http1_falsy_values() {
|
||||
let _lock = FORCE_HTTP1_ENV_LOCK.lock().unwrap();
|
||||
let _guard = ForceHttp1EnvGuard::capture();
|
||||
for value in ["0", "false", "no", "off", "", "garbage", "2"] {
|
||||
unsafe { std::env::set_var("DEEPSEEK_FORCE_HTTP1", value) };
|
||||
assert!(
|
||||
!force_http1_from_env(),
|
||||
"{value:?} should NOT be parsed as truthy",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +161,12 @@ impl DeepSeekClient {
|
||||
}
|
||||
|
||||
let model = request.model.clone();
|
||||
|
||||
// Capture transport-shape headers before we consume `response` into
|
||||
// `bytes_stream()`. They are surfaced in the decode-error log path so
|
||||
// we can tell HTTP/2 RST_STREAM from chunked-encoding corruption from
|
||||
// gzip-compressor failure when investigating #103.
|
||||
let response_headers = format_stream_headers(response.headers());
|
||||
let byte_stream = response.bytes_stream();
|
||||
|
||||
let stream = async_stream::stream! {
|
||||
@@ -233,10 +239,11 @@ impl DeepSeekClient {
|
||||
}
|
||||
crate::logging::warn(format!(
|
||||
"Stream read error: {error_chain} \
|
||||
(elapsed: {}ms, bytes_received: {}, ms_since_last_event: {})",
|
||||
(elapsed: {}ms, bytes_received: {}, ms_since_last_event: {}, headers: {})",
|
||||
stream_start.elapsed().as_millis(),
|
||||
bytes_received,
|
||||
last_event_at.elapsed().as_millis(),
|
||||
response_headers,
|
||||
));
|
||||
yield Err(anyhow::anyhow!("Stream read error: {e}"));
|
||||
break;
|
||||
@@ -756,6 +763,27 @@ pub(super) fn count_reasoning_replay_chars(body: &Value) -> u64 {
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Render the transport-shape headers we care about for #103 diagnostics.
|
||||
/// Always returns SOMETHING printable so the decode-error log line is parseable
|
||||
/// even when the server stripped a header we expected.
|
||||
fn format_stream_headers(headers: &reqwest::header::HeaderMap) -> String {
|
||||
const FIELDS: &[&str] = &[
|
||||
"content-encoding",
|
||||
"transfer-encoding",
|
||||
"connection",
|
||||
"server",
|
||||
];
|
||||
let mut parts: Vec<String> = Vec::with_capacity(FIELDS.len());
|
||||
for field in FIELDS {
|
||||
let rendered = headers
|
||||
.get(*field)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("(absent)");
|
||||
parts.push(format!("{field}={rendered}"));
|
||||
}
|
||||
parts.join(", ")
|
||||
}
|
||||
|
||||
/// Diagnostic logger fired when DeepSeek rejects the request despite the
|
||||
/// sanitizer. Walks the body and logs which assistant messages have tool_calls
|
||||
/// but no `reasoning_content` — useful to track down a code path that bypasses
|
||||
@@ -1246,3 +1274,201 @@ pub(super) fn parse_sse_chunk(
|
||||
|
||||
events
|
||||
}
|
||||
|
||||
// === #103 Phase 1: stream-decode diagnostics ===================================
|
||||
|
||||
#[cfg(test)]
|
||||
mod stream_diagnostics_tests {
|
||||
use super::*;
|
||||
use reqwest::header::{HeaderMap, HeaderValue};
|
||||
|
||||
#[test]
|
||||
fn format_stream_headers_renders_all_fields_when_present() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("content-encoding", HeaderValue::from_static("gzip"));
|
||||
headers.insert("transfer-encoding", HeaderValue::from_static("chunked"));
|
||||
headers.insert("connection", HeaderValue::from_static("keep-alive"));
|
||||
headers.insert("server", HeaderValue::from_static("openresty/1.25.3.1"));
|
||||
|
||||
let rendered = format_stream_headers(&headers);
|
||||
// Order is fixed by FIELDS in the helper; assert each field appears.
|
||||
assert!(
|
||||
rendered.contains("content-encoding=gzip"),
|
||||
"got: {rendered}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("transfer-encoding=chunked"),
|
||||
"got: {rendered}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("connection=keep-alive"),
|
||||
"got: {rendered}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("server=openresty/1.25.3.1"),
|
||||
"got: {rendered}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_stream_headers_marks_missing_fields_as_absent() {
|
||||
// DeepSeek frequently omits content-encoding when not compressing.
|
||||
// The diagnostic must still produce a parseable line so log scrapers
|
||||
// don't lose the slot.
|
||||
let headers = HeaderMap::new();
|
||||
let rendered = format_stream_headers(&headers);
|
||||
assert!(
|
||||
rendered.contains("content-encoding=(absent)"),
|
||||
"missing field must be explicitly marked; got: {rendered}"
|
||||
);
|
||||
assert!(
|
||||
rendered.contains("transfer-encoding=(absent)"),
|
||||
"missing field must be explicitly marked; got: {rendered}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn format_stream_headers_handles_non_ascii_value_gracefully() {
|
||||
// If a header value isn't UTF-8, `.to_str()` fails — we must not panic
|
||||
// and should still produce a parseable line.
|
||||
let mut headers = HeaderMap::new();
|
||||
// 0xFF is a valid byte but invalid UTF-8 start byte.
|
||||
headers.insert(
|
||||
"server",
|
||||
HeaderValue::from_bytes(b"\xff\xfemystery").expect("header value"),
|
||||
);
|
||||
let rendered = format_stream_headers(&headers);
|
||||
assert!(
|
||||
rendered.contains("server=(absent)"),
|
||||
"non-UTF8 header values fall back to (absent); got: {rendered}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// === #103 Phase 4: SSE decoder behavior on canned chunk sequences ============
|
||||
|
||||
#[cfg(test)]
|
||||
mod stream_decoder_tests {
|
||||
//! Drive `parse_sse_chunk` (the in-place SSE event extractor) over canned
|
||||
//! chunk sequences. The full `handle_chat_completion_stream` path needs a
|
||||
//! live `reqwest::Response` so it isn't unit-testable without a mock HTTP
|
||||
//! harness (issue #69 tracks that). For #103 we exercise the chunk decoder
|
||||
//! directly to verify each "class of stream failure" the engine relies on.
|
||||
use super::*;
|
||||
use crate::models::{ContentBlockStart, Delta, StreamEvent};
|
||||
|
||||
/// Decode a raw SSE-data JSON chunk into our internal events, mirroring
|
||||
/// the per-event call shape used by `handle_chat_completion_stream`.
|
||||
fn decode_chunk(json_text: &str) -> Vec<StreamEvent> {
|
||||
let chunk: Value = serde_json::from_str(json_text).expect("valid SSE JSON");
|
||||
let mut content_index = 0u32;
|
||||
let mut text_started = false;
|
||||
let mut thinking_started = false;
|
||||
let mut tool_indices = std::collections::HashMap::new();
|
||||
parse_sse_chunk(
|
||||
&chunk,
|
||||
&mut content_index,
|
||||
&mut text_started,
|
||||
&mut thinking_started,
|
||||
&mut tool_indices,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decoder_emits_text_delta_for_content_chunk() {
|
||||
// The "happy" first chunk: a normal content delta. The engine treats
|
||||
// this as `any_content_received = true` and would NOT transparently
|
||||
// retry on a subsequent error.
|
||||
let events = decode_chunk(r#"{"choices":[{"delta":{"content":"hello"}}]}"#);
|
||||
assert!(
|
||||
matches!(
|
||||
events.first(),
|
||||
Some(StreamEvent::ContentBlockStart {
|
||||
content_block: ContentBlockStart::Text { .. },
|
||||
..
|
||||
})
|
||||
),
|
||||
"first event should open a text block; got {events:?}"
|
||||
);
|
||||
assert!(
|
||||
events
|
||||
.iter()
|
||||
.any(|e| matches!(e, StreamEvent::ContentBlockDelta {
|
||||
delta: Delta::TextDelta { text },
|
||||
..
|
||||
} if text == "hello")),
|
||||
"should yield a TextDelta carrying 'hello'; got {events:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decoder_emits_thinking_delta_for_reasoning_chunk() {
|
||||
// V4 thinking models surface reasoning_content first — the engine
|
||||
// also counts these as content received (so a subsequent stream error
|
||||
// surfaces rather than retrying transparently).
|
||||
let events = decode_chunk(r#"{"choices":[{"delta":{"reasoning_content":"plan..."}}]}"#);
|
||||
assert!(
|
||||
matches!(
|
||||
events.first(),
|
||||
Some(StreamEvent::ContentBlockStart {
|
||||
content_block: ContentBlockStart::Thinking { .. },
|
||||
..
|
||||
})
|
||||
),
|
||||
"first event should open a thinking block; got {events:?}"
|
||||
);
|
||||
assert!(
|
||||
events
|
||||
.iter()
|
||||
.any(|e| matches!(e, StreamEvent::ContentBlockDelta {
|
||||
delta: Delta::ThinkingDelta { thinking },
|
||||
..
|
||||
} if thinking == "plan...")),
|
||||
"should yield a ThinkingDelta carrying 'plan...'; got {events:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decoder_yields_no_events_for_keepalive_chunk() {
|
||||
// DeepSeek often sends `{"choices":[]}` keepalive chunks before
|
||||
// emitting real content. The engine MUST treat a stream error after
|
||||
// these as "no content received" and be eligible for transparent
|
||||
// retry — assert here that the decoder yields no payload events.
|
||||
let events = decode_chunk(r#"{"choices":[]}"#);
|
||||
assert!(
|
||||
events.is_empty(),
|
||||
"empty-choices chunk must produce no events; got {events:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decoder_emits_tool_use_block_for_tool_call_delta() {
|
||||
// Tool-call deltas are content too — once one arrives, transparent
|
||||
// retry must be off (the model has committed to a tool invocation
|
||||
// path that DeepSeek has billed for).
|
||||
let events = decode_chunk(
|
||||
r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_1","function":{"name":"grep_files","arguments":"{\"pattern\":\"foo\"}"}}]}}]}"#,
|
||||
);
|
||||
assert!(
|
||||
events.iter().any(|e| matches!(
|
||||
e,
|
||||
StreamEvent::ContentBlockStart {
|
||||
content_block: ContentBlockStart::ToolUse { name, .. },
|
||||
..
|
||||
} if name == "grep_files"
|
||||
)),
|
||||
"should open a ToolUse block for grep_files; got {events:?}"
|
||||
);
|
||||
assert!(
|
||||
events.iter().any(|e| matches!(
|
||||
e,
|
||||
StreamEvent::ContentBlockDelta {
|
||||
delta: Delta::InputJsonDelta { partial_json },
|
||||
..
|
||||
} if partial_json.contains("\"pattern\"")
|
||||
)),
|
||||
"should yield InputJsonDelta carrying the tool args; got {events:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
//! - Tool execution orchestration
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::pin::pin;
|
||||
use std::sync::{Arc, Mutex as StdMutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{fs::OpenOptions, io::Write};
|
||||
@@ -294,6 +293,35 @@ const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024; // 10 MB
|
||||
/// per-chunk idle of 300s with no wall-clock cap; we keep both layers but
|
||||
/// give the wall-clock a generous window so it never fires in practice.
|
||||
const STREAM_MAX_DURATION_SECS: u64 = 1800; // 30 minutes (was 300s; #103/#1)
|
||||
/// Hard cap on consecutive recoverable stream errors before we surface a turn
|
||||
/// failure. Bumped 3 → 5 in v0.6.7 along with the HTTP/2 keepalive defaults
|
||||
/// (#103) — keepalive should make spurious decode errors rarer, so we can
|
||||
/// tolerate a longer streak before giving up on the turn.
|
||||
const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5;
|
||||
/// Cap on transparent stream-level retries — these only happen when the wire
|
||||
/// dies before any content was streamed, so DeepSeek hasn't billed us and
|
||||
/// the user hasn't seen anything. Two attempts is enough to ride out a
|
||||
/// flaky edge node without amplifying real outages (#103).
|
||||
const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2;
|
||||
|
||||
/// Decide whether a stream error is eligible for a transparent retry.
|
||||
///
|
||||
/// True only when ALL three conditions hold:
|
||||
/// 1. No content has been received on the current attempt — otherwise DeepSeek
|
||||
/// has already billed us for output tokens and the user has seen partial
|
||||
/// deltas; resending would double-bill and desync the UI.
|
||||
/// 2. We still have transparent-retry budget remaining.
|
||||
/// 3. The turn has not been cancelled.
|
||||
///
|
||||
/// Extracted as a pure function so the four #103 retry cases can be exercised
|
||||
/// in unit tests without booting the full engine state machine.
|
||||
fn should_transparently_retry_stream(
|
||||
any_content_received: bool,
|
||||
transparent_attempts: u32,
|
||||
cancelled: bool,
|
||||
) -> bool {
|
||||
!any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled
|
||||
}
|
||||
/// Max output tokens requested for normal agent turns. Generous on purpose:
|
||||
/// V4 thinking models can produce tens of thousands of reasoning tokens on
|
||||
/// hard prompts before the visible reply, and DeepSeek V4 ships with a 1M
|
||||
@@ -2405,8 +2433,11 @@ impl Engine {
|
||||
top_p: None,
|
||||
};
|
||||
|
||||
// Stream the response
|
||||
let stream_result = client.create_message_stream(request).await;
|
||||
// Stream the response. Keep the request around (cloned into the
|
||||
// first call) so we can resend it on a transparent retry below
|
||||
// when the wire dies before any content was streamed (#103).
|
||||
let stream_request = request;
|
||||
let stream_result = client.create_message_stream(stream_request.clone()).await;
|
||||
let stream = match stream_result {
|
||||
Ok(s) => {
|
||||
context_recovery_attempts = 0;
|
||||
@@ -2432,7 +2463,10 @@ impl Engine {
|
||||
return (TurnOutcomeStatus::Failed, turn_error);
|
||||
}
|
||||
};
|
||||
let mut stream = pin!(stream);
|
||||
// The stream value is itself `Pin<Box<dyn Stream + Send>>`, which
|
||||
// is `Unpin`, so we can rebind it on a transparent retry without
|
||||
// breaking the existing pin invariants.
|
||||
let mut stream = stream;
|
||||
|
||||
// Track content blocks
|
||||
let mut content_blocks: Vec<ContentBlock> = Vec::new();
|
||||
@@ -2452,8 +2486,18 @@ impl Engine {
|
||||
let mut pending_message_complete = false;
|
||||
let mut last_text_index: Option<usize> = None;
|
||||
let mut stream_errors = 0u32;
|
||||
// #103 transparent retry bookkeeping. `any_content_received` flips
|
||||
// on the first non-MessageStart event so we know whether DeepSeek
|
||||
// billed us / the user has seen any output for this turn yet.
|
||||
// This is distinct from the outer `stream_retry_attempts` (which
|
||||
// restarts the whole turn-step when a stream died with no
|
||||
// content-block delta delivered to the consumer).
|
||||
let mut any_content_received = false;
|
||||
let mut transparent_stream_retries = 0u32;
|
||||
let mut pending_steers: Vec<String> = Vec::new();
|
||||
let stream_start = Instant::now();
|
||||
// `stream_start` is reset on a transparent retry so the wall-clock
|
||||
// budget restarts with the fresh stream.
|
||||
let mut stream_start = Instant::now();
|
||||
let mut stream_content_bytes: usize = 0;
|
||||
let chunk_timeout = Duration::from_secs(STREAM_CHUNK_TIMEOUT_SECS);
|
||||
let max_duration = Duration::from_secs(STREAM_MAX_DURATION_SECS);
|
||||
@@ -2525,13 +2569,58 @@ impl Engine {
|
||||
}
|
||||
|
||||
let event = match event_result {
|
||||
Ok(e) => e,
|
||||
Ok(e) => {
|
||||
// Flip on the first non-MessageStart event — that's
|
||||
// the moment we cross from "stream not yet productive"
|
||||
// (eligible for transparent retry) into "DeepSeek has
|
||||
// billed us / user has seen output" (must surface).
|
||||
if !any_content_received && !matches!(e, StreamEvent::MessageStart { .. }) {
|
||||
any_content_received = true;
|
||||
}
|
||||
e
|
||||
}
|
||||
Err(e) => {
|
||||
stream_errors = stream_errors.saturating_add(1);
|
||||
let message = e.to_string();
|
||||
// #103: when the stream errors before any content was
|
||||
// streamed AND we still have retry budget, transparently
|
||||
// resend the request. DeepSeek has not billed for any
|
||||
// output and the user has seen nothing — re-trying is
|
||||
// the right user-visible behavior.
|
||||
if should_transparently_retry_stream(
|
||||
any_content_received,
|
||||
transparent_stream_retries,
|
||||
self.cancel_token.is_cancelled(),
|
||||
) {
|
||||
transparent_stream_retries =
|
||||
transparent_stream_retries.saturating_add(1);
|
||||
crate::logging::info(format!(
|
||||
"Transparent stream retry {}/{} (no content received yet): {}",
|
||||
transparent_stream_retries, MAX_TRANSPARENT_STREAM_RETRIES, message,
|
||||
));
|
||||
// Drop the failed stream before issuing the new
|
||||
// request to release the underlying connection.
|
||||
drop(stream);
|
||||
match client.create_message_stream(stream_request.clone()).await {
|
||||
Ok(fresh) => {
|
||||
stream = fresh;
|
||||
stream_start = Instant::now();
|
||||
// Roll back the error counter — this one
|
||||
// didn't surface to the user.
|
||||
stream_errors = stream_errors.saturating_sub(1);
|
||||
continue;
|
||||
}
|
||||
Err(retry_err) => {
|
||||
let retry_msg = format!("Stream retry failed: {retry_err}");
|
||||
turn_error.get_or_insert(retry_msg.clone());
|
||||
let _ = self.tx_event.send(Event::error(retry_msg, true)).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
turn_error.get_or_insert(message.clone());
|
||||
let _ = self.tx_event.send(Event::error(message, true)).await;
|
||||
if stream_errors >= 3 {
|
||||
if stream_errors >= MAX_STREAM_ERRORS_BEFORE_FAIL {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
|
||||
@@ -939,3 +939,103 @@ fn final_tool_input_falls_back_to_initial_when_buffer_unparseable() {
|
||||
let state = tool_state(json!({"command": "echo hi"}), "{not json");
|
||||
assert_eq!(final_tool_input(&state), json!({"command": "echo hi"}));
|
||||
}
|
||||
|
||||
// === #103 transparent stream-retry policy =====================================
|
||||
|
||||
#[test]
|
||||
fn stream_retry_zero_content_then_error_is_transparently_retried() {
|
||||
// Case 2 from issue #103: stream yielded ZERO content then errored.
|
||||
// The decoder hit Err on the very first poll → engine should retry
|
||||
// because DeepSeek hasn't billed and the user has seen nothing.
|
||||
assert!(
|
||||
super::should_transparently_retry_stream(false, 0, false),
|
||||
"first attempt with no content must be eligible for transparent retry"
|
||||
);
|
||||
assert!(
|
||||
super::should_transparently_retry_stream(false, 1, false),
|
||||
"second attempt (one prior retry) with no content must still be eligible"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_retry_after_content_received_surfaces_error() {
|
||||
// Case 3 from issue #103: stream yielded content then errored. We must
|
||||
// NOT transparently retry — the model has emitted billed output tokens
|
||||
// and the UI has streamed deltas; resending would double-bill and the
|
||||
// user would see the same prefix twice.
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(true, 0, false),
|
||||
"any content received → no transparent retry, even with full budget"
|
||||
);
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(true, 1, false),
|
||||
"any content received → no transparent retry on subsequent attempts"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_retry_budget_caps_transparent_retries_at_two() {
|
||||
// Case 4 from issue #103: after MAX_TRANSPARENT_STREAM_RETRIES attempts
|
||||
// we stop trying transparently and let the outer error path surface.
|
||||
// (The outer per-turn `stream_retry_attempts` retry is a separate layer
|
||||
// and is still in effect at the whole-turn level.)
|
||||
assert!(
|
||||
super::should_transparently_retry_stream(
|
||||
false,
|
||||
super::MAX_TRANSPARENT_STREAM_RETRIES - 1,
|
||||
false,
|
||||
),
|
||||
"one short of the cap should still retry"
|
||||
);
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(
|
||||
false,
|
||||
super::MAX_TRANSPARENT_STREAM_RETRIES,
|
||||
false,
|
||||
),
|
||||
"at the cap, no further transparent retries"
|
||||
);
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(
|
||||
false,
|
||||
super::MAX_TRANSPARENT_STREAM_RETRIES + 5,
|
||||
false,
|
||||
),
|
||||
"well past the cap, definitely no transparent retries"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_retry_respects_cancellation() {
|
||||
// Cancellation overrides every other condition. If the user pressed
|
||||
// Esc / Ctrl-C, do not silently re-issue the request behind their back.
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(false, 0, true),
|
||||
"cancelled turn must not be transparently retried"
|
||||
);
|
||||
assert!(
|
||||
!super::should_transparently_retry_stream(false, 1, true),
|
||||
"cancelled turn must not be transparently retried even with budget"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_retry_threshold_relaxed_to_five() {
|
||||
// Case 1+4 from issue #103: the consecutive-error threshold for marking
|
||||
// the turn failed was relaxed from 3 → 5 in v0.6.7 because the new
|
||||
// HTTP/2 keepalive defaults make spurious decode errors rarer.
|
||||
// This test pins the constant so a future regression to 3 fails loudly.
|
||||
assert_eq!(
|
||||
super::MAX_STREAM_ERRORS_BEFORE_FAIL,
|
||||
5,
|
||||
"the consecutive-stream-error threshold should be 5; \
|
||||
lowering it back to 3 will fail mid-turn under transient flakiness"
|
||||
);
|
||||
// And a regression guard on the transparent-retry cap.
|
||||
assert_eq!(
|
||||
super::MAX_TRANSPARENT_STREAM_RETRIES,
|
||||
2,
|
||||
"transparent-retry cap should be 2; raising it risks hammering the \
|
||||
provider on real outages"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user