Files
codewhale/crates/tui/src/client/chat.rs
T
Hunter Bown bbdfb26f3c fix(client): TCP/HTTP2 keepalives + stream-error diagnostics (#103 Phase 1+2)
Two fixes for the persistent "Stream read error: error decoding response
body" we saw mid-turn during long V4-pro thinking sessions.

1) HTTP transport tuning (`crates/tui/src/client.rs`):
   - Drop the blanket 300s request timeout. Long V4 thinking turns
     legitimately exceed the wall-clock window; per-chunk and per-stream
     guards in `engine.rs` already bound how long we wait without progress.
   - Add `tcp_keepalive(30s)` so dead-peer detection happens at the TCP
     layer instead of waiting for the application to notice.
   - Add `http2_keep_alive_interval(15s)` + `http2_keep_alive_timeout(20s)`
     so HTTP/2 connections to DeepSeek's edge don't go silent and get
     killed by an upstream proxy mid-thinking.

2) Stream-error diagnostics (`crates/tui/src/client/chat.rs`):
   - Walk reqwest's `std::error::Error::source()` chain when a chunk read
     errors, so the underlying hyper / h2 / io error is logged. Without
     this the outer "error decoding response body" message tells us
     nothing about WHY the stream died.
   - Track elapsed wall time, bytes received so far, and milliseconds since
     the last successfully received chunk; log them alongside the error
     chain. This lets us distinguish an HTTP/2 RST_STREAM during an idle
     period from a chunk-decode failure on a short stream, and both from
     gzip corruption mid-burst.

Phase 3 (transparent retry with `prefix` continuation) is intentionally
NOT in this PR. The retry-flag plumbing on MessageRequest + chat.rs prefix
wire format + engine.rs retry loop is a meaningful surface that deserves
its own review pass; this PR ships the diagnostic-and-resilience floor so
we can land the harder retry work knowing the underlying network state is
better.

Refs #103.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 17:19:42 -05:00

1249 lines
48 KiB
Rust

//! Chat Completions API helpers for DeepSeek's OpenAI-compatible endpoint.
//!
//! This is the production code path. Streaming (`create_message_stream`),
//! request building (`build_chat_messages*`), and SSE parsing (`parse_sse_chunk`)
//! all live here.
use std::collections::HashSet;
use std::pin::Pin;
use std::time::Duration;
use anyhow::{Context, Result};
use serde_json::{Value, json};
use tokio::time::timeout as tokio_timeout;
/// Default idle timeout for SSE stream reads (300 seconds = 5 minutes).
/// After this period with no data, the stream is considered stalled and
/// yields a recoverable error so the caller can retry.
const DEFAULT_STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
/// Reads the `DEEPSEEK_STREAM_IDLE_TIMEOUT_SECS` env var, falling back to
/// the default 300s when the variable is unset or is not an unsigned
/// integer. Surrounding whitespace is ignored, so values such as `" 600\n"`
/// (common from shell or `.env` tooling) are honoured. The resulting value
/// is clamped to [1, 3600] seconds.
fn stream_idle_timeout() -> Duration {
    let secs = std::env::var("DEEPSEEK_STREAM_IDLE_TIMEOUT_SECS")
        .ok()
        // `u64::from_str` rejects leading/trailing whitespace; without the
        // trim an intended override would silently fall back to the default.
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_STREAM_IDLE_TIMEOUT.as_secs())
        .clamp(1, 3600);
    Duration::from_secs(secs)
}
use crate::llm_client::StreamEventBox;
use crate::logging;
use crate::models::{
ContentBlock, ContentBlockStart, Delta, Message, MessageDelta, MessageRequest, MessageResponse,
StreamEvent, SystemPrompt, Tool, ToolCaller, Usage,
};
use super::{
DeepSeekClient, ERROR_BODY_MAX_BYTES, SSE_BACKPRESSURE_HIGH_WATERMARK,
SSE_BACKPRESSURE_SLEEP_MS, SSE_MAX_LINES_PER_CHUNK, acquire_stream_buffer, api_url,
apply_reasoning_effort, bounded_error_text, from_api_tool_name, parse_usage,
release_stream_buffer, system_to_instructions, to_api_tool_name,
};
impl DeepSeekClient {
    /// Sends a non-streaming Chat Completions request and parses the reply.
    ///
    /// Builds the wire payload from `request` (messages, optional sampling
    /// parameters, tools, tool choice, reasoning effort), posts it through
    /// `send_with_retry`, and converts the JSON reply with
    /// `parse_chat_message`.
    ///
    /// # Errors
    /// Returns an error when the request fails, when the HTTP status is not
    /// successful (including a bounded excerpt of the error body), when the
    /// response body cannot be read, or when it is not valid Chat API JSON.
    pub(super) async fn create_message_chat(
        &self,
        request: &MessageRequest,
    ) -> Result<MessageResponse> {
        let messages = build_chat_messages_for_request(request);
        let mut body = json!({
            "model": request.model,
            "messages": messages,
            "max_tokens": request.max_tokens,
        });
        if let Some(temperature) = request.temperature {
            body["temperature"] = json!(temperature);
        }
        if let Some(top_p) = request.top_p {
            body["top_p"] = json!(top_p);
        }
        if let Some(tools) = request.tools.as_ref() {
            body["tools"] = json!(tools.iter().map(tool_to_chat).collect::<Vec<_>>());
        }
        if let Some(choice) = request.tool_choice.as_ref()
            && let Some(mapped) = map_tool_choice_for_chat(choice)
        {
            body["tool_choice"] = mapped;
        }
        apply_reasoning_effort(
            &mut body,
            request.reasoning_effort.as_deref(),
            self.api_provider,
        );
        let url = api_url(&self.base_url, "chat/completions");
        let response = self
            .send_with_retry(|| self.http_client.post(&url).json(&body))
            .await?;
        let status = response.status();
        if !status.is_success() {
            let error_text = bounded_error_text(response, ERROR_BODY_MAX_BYTES).await;
            anyhow::bail!("Failed to call DeepSeek Chat API: HTTP {status}: {error_text}");
        }
        // Surface transport/decoding failures directly. Collapsing them into
        // an empty string would make them look like a JSON parse error and
        // hide the real cause.
        let response_text = response
            .text()
            .await
            .context("Failed to read Chat API response body")?;
        let value: Value =
            serde_json::from_str(&response_text).context("Failed to parse Chat API JSON")?;
        parse_chat_message(&value)
    }
}
impl DeepSeekClient {
    /// Opens a streaming Chat Completions request and adapts the SSE feed
    /// into this crate's `StreamEvent` sequence.
    ///
    /// The returned stream always starts with a synthetic `MessageStart` and
    /// ends with `MessageStop`. In between, it yields the events that
    /// `parse_sse_chunk` produces for each `data:` payload.
    ///
    /// Read errors, idle timeouts (see `stream_idle_timeout`) and an
    /// oversized line buffer are yielded as `Err` items and end the read
    /// loop. After that, any still-open text/thinking block is closed and
    /// `MessageStop` is emitted.
    ///
    /// # Errors
    /// Returns an error before any event is produced when the HTTP request
    /// fails or the server answers with a non-success status.
    pub(super) async fn handle_chat_completion_stream(
        &self,
        request: MessageRequest,
    ) -> Result<StreamEventBox> {
        // Try true SSE streaming via chat completions (widely supported)
        let messages = build_chat_messages_for_request(&request);
        let mut body = json!({
            "model": request.model,
            "messages": messages,
            "max_tokens": request.max_tokens,
            "stream": true,
            "stream_options": {
                "include_usage": true
            },
        });
        if let Some(temperature) = request.temperature {
            body["temperature"] = json!(temperature);
        }
        if let Some(top_p) = request.top_p {
            body["top_p"] = json!(top_p);
        }
        if let Some(tools) = request.tools.as_ref() {
            body["tools"] = json!(tools.iter().map(tool_to_chat).collect::<Vec<_>>());
        }
        if let Some(choice) = request.tool_choice.as_ref()
            && let Some(mapped) = map_tool_choice_for_chat(choice)
        {
            body["tool_choice"] = mapped;
        }
        apply_reasoning_effort(
            &mut body,
            request.reasoning_effort.as_deref(),
            self.api_provider,
        );
        // Bulletproof final sanitizer: walk the wire payload and force
        // `reasoning_content` onto any assistant message that lacks it.
        // DeepSeek's thinking-mode API rejects such messages with a 400.
        // This is the last line of defense after engine-side and build-side
        // substitution. If either upstream path misses a case (e.g. a
        // session restored from disk, a sub-agent adding messages directly,
        // or a cached prefix mismatch), this pass still produces a valid
        // request.
        let replay_input_tokens = sanitize_thinking_mode_messages(
            &mut body,
            &request.model,
            request.reasoning_effort.as_deref(),
        );
        let url = api_url(&self.base_url, "chat/completions");
        let response = self
            .send_with_retry(|| self.http_client.post(&url).json(&body))
            .await?;
        let status = response.status();
        if !status.is_success() {
            let error_text = bounded_error_text(response, ERROR_BODY_MAX_BYTES).await;
            // If DeepSeek rejected for missing reasoning_content despite the
            // sanitizer, dump the offending indices so we can diagnose where
            // they came from on the next failure.
            if error_text.contains("reasoning_content") {
                log_thinking_mode_violations(&body);
            }
            anyhow::bail!("SSE stream request failed: HTTP {status}: {error_text}");
        }
        let model = request.model.clone();
        let byte_stream = response.bytes_stream();
        let stream = async_stream::stream! {
            use futures_util::StreamExt;
            // Emit a synthetic MessageStart
            yield Ok(StreamEvent::MessageStart {
                message: MessageResponse {
                    id: String::new(),
                    r#type: "message".to_string(),
                    role: "assistant".to_string(),
                    content: Vec::new(),
                    model: model.clone(),
                    stop_reason: None,
                    stop_sequence: None,
                    container: None,
                    usage: Usage {
                        input_tokens: 0,
                        output_tokens: 0,
                        ..Usage::default()
                    },
                },
            });
            let mut line_buf = String::new();
            let mut byte_buf = acquire_stream_buffer();
            let mut content_index: u32 = 0;
            let mut text_started = false;
            let mut thinking_started = false;
            let mut tool_indices: std::collections::HashMap<u32, u32> =
                std::collections::HashMap::new();
            let is_reasoning_model = requires_reasoning_content(&model);
            let mut byte_stream = std::pin::pin!(byte_stream);
            let idle = stream_idle_timeout();
            // Telemetry for #103 stream-decode diagnostics: bytes received
            // since the start of this stream and last successful chunk time.
            // Surfaces in the error log when reqwest yields a chunk error so
            // we can tell HTTP/2 RST_STREAM from chunk-decode-failure from
            // gzip-corruption when investigating a flaky session.
            let stream_start = std::time::Instant::now();
            let mut last_event_at = std::time::Instant::now();
            let mut bytes_received: usize = 0;
            loop {
                let chunk_result = match tokio_timeout(idle, byte_stream.next()).await {
                    Ok(Some(result)) => result,
                    Ok(None) => break, // Stream ended normally
                    Err(_elapsed) => {
                        yield Err(anyhow::anyhow!(
                            "SSE stream idle timeout after {}s — no data received",
                            idle.as_secs(),
                        ));
                        break;
                    }
                };
                let chunk = match chunk_result {
                    Ok(bytes) => bytes,
                    Err(e) => {
                        // Walk the error source chain so reqwest's underlying
                        // hyper / h2 / io error is visible. Without this, the
                        // outer "error decoding response body" message tells
                        // us nothing about WHY the stream died.
                        let mut error_chain = format!("{e}");
                        let mut current: Option<&(dyn std::error::Error + 'static)> =
                            std::error::Error::source(&e);
                        while let Some(source) = current {
                            error_chain.push_str(&format!(" -> {source}"));
                            current = std::error::Error::source(source);
                        }
                        crate::logging::warn(format!(
                            "Stream read error: {error_chain} \
                             (elapsed: {}ms, bytes_received: {}, ms_since_last_event: {})",
                            stream_start.elapsed().as_millis(),
                            bytes_received,
                            last_event_at.elapsed().as_millis(),
                        ));
                        yield Err(anyhow::anyhow!("Stream read error: {e}"));
                        break;
                    }
                };
                bytes_received = bytes_received.saturating_add(chunk.len());
                last_event_at = std::time::Instant::now();
                byte_buf.extend_from_slice(&chunk);
                // Guard against unbounded buffer growth (e.g., malformed stream without newlines)
                const MAX_SSE_BUF: usize = 10 * 1024 * 1024; // 10 MB
                if byte_buf.len() > MAX_SSE_BUF {
                    yield Err(anyhow::anyhow!("SSE buffer exceeded {MAX_SSE_BUF} bytes — aborting stream"));
                    break;
                }
                if byte_buf.len() > SSE_BACKPRESSURE_HIGH_WATERMARK {
                    tokio::time::sleep(Duration::from_millis(SSE_BACKPRESSURE_SLEEP_MS)).await;
                }
                // Process every complete SSE line currently in the buffer.
                let mut lines_processed = 0usize;
                while let Some(newline_pos) = byte_buf.iter().position(|&b| b == b'\n') {
                    let mut end = newline_pos;
                    if end > 0 && byte_buf[end - 1] == b'\r' {
                        end -= 1;
                    }
                    let line = String::from_utf8_lossy(&byte_buf[..end]).into_owned();
                    byte_buf.drain(..newline_pos + 1);
                    if line.is_empty() {
                        // Empty line = event boundary, process accumulated data
                        if !line_buf.is_empty() {
                            let data = std::mem::take(&mut line_buf);
                            if data.trim() == "[DONE]" {
                                // Stream complete
                            } else if let Ok(chunk_json) = serde_json::from_str::<Value>(&data) {
                                // Parse the SSE chunk into stream events
                                for mut event in parse_sse_chunk(
                                    &chunk_json,
                                    &mut content_index,
                                    &mut text_started,
                                    &mut thinking_started,
                                    &mut tool_indices,
                                    is_reasoning_model,
                                ) {
                                    // Stamp the client-side replay-token estimate
                                    // onto the final usage so the UI can surface
                                    // it (#30). We compute it pre-request and
                                    // overlay it on the server-reported usage at
                                    // stream completion.
                                    if let Some(tokens) = replay_input_tokens
                                        && let StreamEvent::MessageDelta {
                                            usage: Some(usage),
                                            ..
                                        } = &mut event
                                    {
                                        usage.reasoning_replay_tokens = Some(tokens);
                                    }
                                    yield Ok(event);
                                }
                            }
                        }
                        continue;
                    }
                    if let Some(data) = line.strip_prefix("data: ") {
                        line_buf.push_str(data);
                    }
                    // Ignore other SSE fields (event:, id:, retry:)
                    lines_processed = lines_processed.saturating_add(1);
                    if lines_processed >= SSE_MAX_LINES_PER_CHUNK {
                        // Give the runtime (and downstream consumers) a turn,
                        // then keep draining. Breaking out here instead would
                        // strand already-received lines in `byte_buf` until
                        // the next network chunk, and lose them entirely
                        // (often including the final finish/usage event) if
                        // the server closes the stream first.
                        tokio::task::yield_now().await;
                        lines_processed = 0;
                    }
                }
            }
            // Close any open blocks. `parse_sse_chunk` opens text/thinking
            // blocks at the *current* `content_index` and only advances it
            // when such a block is closed, so an open block lives at
            // `content_index` itself, not `content_index - 1`.
            if thinking_started {
                yield Ok(StreamEvent::ContentBlockStop { index: content_index });
            }
            if text_started {
                yield Ok(StreamEvent::ContentBlockStop { index: content_index });
            }
            release_stream_buffer(byte_buf);
            yield Ok(StreamEvent::MessageStop);
        };
        Ok(Pin::from(Box::new(stream)
            as Box<
                dyn futures_util::Stream<Item = Result<StreamEvent>> + Send,
            >))
    }
}
// === Chat Completions Helpers ===
/// Test-only wrapper around `build_chat_messages_with_reasoning` that decides
/// reasoning replay from `model` alone, as if no reasoning effort was set.
#[cfg(test)]
pub(super) fn build_chat_messages(
    system: Option<&SystemPrompt>,
    messages: &[Message],
    model: &str,
) -> Vec<Value> {
    let replay_reasoning = should_replay_reasoning_content(model, None);
    build_chat_messages_with_reasoning(system, messages, model, replay_reasoning)
}
/// Converts a `MessageRequest` into Chat Completions `messages` entries.
///
/// Reasoning is replayed as `reasoning_content` only when the request's model
/// and reasoning effort call for it (see `should_replay_reasoning_content`).
pub(super) fn build_chat_messages_for_request(request: &MessageRequest) -> Vec<Value> {
    let model: &str = &request.model;
    let replay_reasoning =
        should_replay_reasoning_content(model, request.reasoning_effort.as_deref());
    build_chat_messages_with_reasoning(
        request.system.as_ref(),
        &request.messages,
        model,
        replay_reasoning,
    )
}
/// Converts internal `Message`s into DeepSeek (OpenAI-compatible) Chat
/// Completions `messages` entries.
///
/// - `system` becomes a leading `system` message when it renders to
///   non-blank instructions.
/// - Assistant messages carry their joined text as `content` and their
///   `ToolUse` blocks as `tool_calls`. When `include_reasoning` is set, they
///   also carry their joined `Thinking` blocks as `reasoning_content`, with a
///   `"(reasoning omitted)"` placeholder when none was recorded.
/// - `ToolResult` blocks become `tool` messages. They are kept only when they
///   answer a call from the most recent assistant message, and are emitted
///   *before* any user text in the same message so they directly follow the
///   assistant `tool_calls`.
///
/// A final repair pass (`strip_orphaned_tool_calls`) drops `tool_calls` whose
/// results are missing (e.g. after compaction).
///
/// `_model` is currently unused and kept for signature compatibility.
fn build_chat_messages_with_reasoning(
    system: Option<&SystemPrompt>,
    messages: &[Message],
    _model: &str,
    include_reasoning: bool,
) -> Vec<Value> {
    let mut out = Vec::new();
    // Tool-call ids from the latest assistant message that have not yet been
    // answered by a `tool` result.
    let mut pending_tool_calls: HashSet<String> = HashSet::new();
    if let Some(instructions) = system_to_instructions(system.cloned())
        && !instructions.trim().is_empty()
    {
        out.push(json!({
            "role": "system",
            "content": instructions,
        }));
    }
    for message in messages {
        let role = message.role.as_str();
        let mut text_parts = Vec::new();
        let mut thinking_parts = Vec::new();
        let mut tool_calls = Vec::new();
        let mut tool_call_ids = Vec::new();
        let mut tool_results: Vec<(String, Value)> = Vec::new();
        for block in &message.content {
            match block {
                ContentBlock::Text { text, .. } => text_parts.push(text.clone()),
                ContentBlock::Thinking { thinking } => thinking_parts.push(thinking.clone()),
                ContentBlock::ToolUse {
                    id,
                    name,
                    input,
                    caller,
                    ..
                } => {
                    let args = serde_json::to_string(input).unwrap_or_else(|_| input.to_string());
                    let mut call = json!({
                        "id": id,
                        "type": "function",
                        "function": {
                            "name": to_api_tool_name(name),
                            "arguments": args,
                        }
                    });
                    if let Some(caller) = caller {
                        call["caller"] = json!({
                            "type": caller.caller_type,
                            "tool_id": caller.tool_id,
                        });
                    }
                    tool_calls.push(call);
                    tool_call_ids.push(id.clone());
                }
                ContentBlock::ToolResult {
                    tool_use_id,
                    content,
                    ..
                } => {
                    tool_results.push((
                        tool_use_id.clone(),
                        json!({
                            "role": "tool",
                            "tool_call_id": tool_use_id,
                            "content": content,
                        }),
                    ));
                }
                ContentBlock::ServerToolUse { .. }
                | ContentBlock::ToolSearchToolResult { .. }
                | ContentBlock::CodeExecutionToolResult { .. } => {}
            }
        }
        if role == "assistant" {
            let content = text_parts.join("\n");
            let mut reasoning_content = thinking_parts.join("\n");
            let has_text = !content.trim().is_empty();
            let has_tool_calls = !tool_calls.is_empty();
            // DeepSeek thinking-mode rule: every assistant message in the
            // conversation must carry its `reasoning_content` when thinking
            // is enabled. The docs say non-tool-call messages' reasoning is
            // "ignored", but the API still validates presence and rejects
            // with a 400 if any assistant message is missing it. If reasoning
            // was lost (e.g. a session checkpoint from before this rule was
            // enforced, or a sub-turn with no streamed reasoning text),
            // substitute a non-empty placeholder so the API accepts the
            // request.
            let mut has_reasoning = include_reasoning && !reasoning_content.trim().is_empty();
            if include_reasoning && !has_reasoning {
                logging::warn(
                    "Substituting placeholder reasoning_content for DeepSeek assistant message without reasoning",
                );
                reasoning_content = String::from("(reasoning omitted)");
                has_reasoning = true;
            }
            // DeepSeek rejects assistant messages where both `content` and
            // `tool_calls` are missing/null. Skip such entries unless they
            // carry reasoning, in which case an empty-string content is sent.
            if !has_text && !has_tool_calls && !has_reasoning {
                pending_tool_calls.clear();
                continue;
            }
            let mut msg = json!({
                "role": "assistant",
                "content": if has_text {
                    json!(content)
                } else if has_reasoning {
                    json!("")
                } else {
                    Value::Null
                },
            });
            if has_reasoning {
                msg["reasoning_content"] = json!(reasoning_content);
            }
            if has_tool_calls {
                msg["tool_calls"] = json!(tool_calls);
                pending_tool_calls = tool_call_ids.into_iter().collect();
            } else {
                pending_tool_calls.clear();
            }
            out.push(msg);
        }
        // Emit tool results before any user text carried by the same
        // message. The chat API expects `tool` messages to directly follow
        // the assistant `tool_calls` they answer; a `user` entry in between
        // gets the request rejected.
        if !tool_results.is_empty() {
            if pending_tool_calls.is_empty() {
                logging::warn("Dropping tool results without matching tool_calls");
            } else {
                for (tool_id, tool_msg) in tool_results {
                    if pending_tool_calls.remove(&tool_id) {
                        out.push(tool_msg);
                    } else {
                        logging::warn(format!(
                            "Dropping tool result for unknown tool_call_id: {tool_id}"
                        ));
                    }
                }
            }
        } else if role != "assistant" {
            pending_tool_calls.clear();
        }
        if role == "user" {
            let content = text_parts.join("\n");
            if !content.trim().is_empty() {
                out.push(json!({
                    "role": "user",
                    "content": content,
                }));
            }
        }
    }
    strip_orphaned_tool_calls(&mut out);
    out
}

/// Safety net for the converted `messages` list: after compaction, an
/// assistant message may have `tool_calls` whose results were summarized
/// away, which the API rejects.
///
/// For every assistant message whose call ids are not all answered by `tool`
/// messages before the next assistant message, this removes `tool_calls`
/// (downgrading it to a plain assistant message) along with the now-orphaned
/// tool results. If that leaves the assistant message with null/empty
/// `content`, the message is removed entirely, since DeepSeek requires
/// `content` or `tool_calls`.
fn strip_orphaned_tool_calls(out: &mut Vec<Value>) {
    let mut i = 0;
    while i < out.len() {
        let is_assistant_with_tools = out[i].get("role").and_then(Value::as_str)
            == Some("assistant")
            && out[i].get("tool_calls").is_some();
        if !is_assistant_with_tools {
            i += 1;
            continue;
        }
        let expected_ids: HashSet<String> = out[i]
            .get("tool_calls")
            .and_then(Value::as_array)
            .map(|calls| {
                calls
                    .iter()
                    .filter_map(|c| c.get("id").and_then(Value::as_str).map(String::from))
                    .collect()
            })
            .unwrap_or_default();
        // Collect tool result IDs immediately following this assistant message.
        let mut found_ids: HashSet<String> = HashSet::new();
        let mut tool_result_end = i + 1;
        while tool_result_end < out.len()
            && out[tool_result_end].get("role").and_then(Value::as_str) == Some("tool")
        {
            if let Some(id) = out[tool_result_end]
                .get("tool_call_id")
                .and_then(Value::as_str)
            {
                found_ids.insert(id.to_string());
            }
            tool_result_end += 1;
        }
        // Also scan non-contiguous tool results up to the next assistant
        // message in case compaction left gaps.
        for later in &out[tool_result_end..] {
            if later.get("role").and_then(Value::as_str) == Some("assistant") {
                break;
            }
            if later.get("role").and_then(Value::as_str) == Some("tool")
                && let Some(id) = later.get("tool_call_id").and_then(Value::as_str)
            {
                found_ids.insert(id.to_string());
            }
        }
        if expected_ids.is_subset(&found_ids) {
            i += 1;
            continue;
        }
        let missing: Vec<_> = expected_ids.difference(&found_ids).collect();
        logging::warn(format!(
            "Stripping orphaned tool_calls from assistant message \
             (expected {} tool results, found {}, missing: {:?})",
            expected_ids.len(),
            found_ids.len(),
            missing
        ));
        if let Some(obj) = out[i].as_object_mut() {
            obj.remove("tool_calls");
        }
        // If tool_calls were the only assistant content, remove the now-invalid
        // assistant message entirely (DeepSeek requires content or tool_calls).
        let assistant_content_empty = out[i]
            .get("content")
            .is_none_or(|v| v.is_null() || v.as_str().is_some_and(str::is_empty));
        if assistant_content_empty {
            remove_tool_results_after(out, i, &expected_ids);
            out.remove(i);
            // The following message has shifted into slot `i`; examine it
            // next without advancing.
            continue;
        }
        // Remove contiguous tool results first, then any remaining
        // non-contiguous ones referencing this call set.
        if tool_result_end > i + 1 {
            out.drain((i + 1)..tool_result_end);
        }
        remove_tool_results_after(out, i, &expected_ids);
        i += 1;
    }
}

/// Removes every `tool` message located after index `after` whose
/// `tool_call_id` is in `ids`, preserving the order of all other messages.
fn remove_tool_results_after(out: &mut Vec<Value>, after: usize, ids: &HashSet<String>) {
    let mut position = 0usize;
    out.retain(|msg| {
        let is_target = position > after
            && msg.get("role").and_then(Value::as_str) == Some("tool")
            && msg
                .get("tool_call_id")
                .and_then(Value::as_str)
                .is_some_and(|id| ids.contains(id));
        position += 1;
        !is_target
    });
}
/// Serializes a `Tool` into a Chat Completions `tools[]` entry of the form
/// `{"type": "function", "function": {name, description, parameters}}`.
///
/// The optional `strict` flag goes inside `function`. The optional
/// `allowed_callers`, `defer_loading` and `input_examples` extensions are
/// attached at the top level of the entry.
pub(super) fn tool_to_chat(tool: &Tool) -> Value {
    let mut function = json!({
        "name": to_api_tool_name(&tool.name),
        "description": tool.description,
        "parameters": tool.input_schema,
    });
    if let Some(strict) = tool.strict {
        function["strict"] = json!(strict);
    }
    let mut entry = json!({
        "type": "function",
        "function": function,
    });
    if let Some(callers) = &tool.allowed_callers {
        entry["allowed_callers"] = json!(callers);
    }
    if let Some(defer) = tool.defer_loading {
        entry["defer_loading"] = json!(defer);
    }
    if let Some(examples) = &tool.input_examples {
        entry["input_examples"] = json!(examples);
    }
    entry
}
/// Translates an Anthropic-style `tool_choice` value into Chat Completions
/// form.
///
/// - Plain strings pass through unchanged.
/// - `{"type": "auto" | "none"}` collapses to the bare string.
/// - `{"type": "any"}` maps to `"auto"`.
/// - `{"type": "tool", "name": ...}` becomes a named function choice. It
///   yields `None` when `name` is missing or not a string.
/// - Any other value, including objects without a string `type`, is
///   forwarded unchanged.
fn map_tool_choice_for_chat(choice: &Value) -> Option<Value> {
    if let Value::String(_) = choice {
        return Some(choice.clone());
    }
    match choice.get("type").and_then(Value::as_str) {
        Some(kind @ ("auto" | "none")) => Some(Value::from(kind)),
        Some("any") => Some(Value::from("auto")),
        Some("tool") => {
            let name = choice.get("name").and_then(Value::as_str)?;
            Some(json!({
                "type": "function",
                "function": { "name": to_api_tool_name(name) }
            }))
        }
        Some(_) | None => Some(choice.clone()),
    }
}
/// Final-pass sanitizer over the outgoing chat-completions JSON payload.
///
/// When the model + effort combination requires reasoning replay (see
/// `should_replay_reasoning_content`), this forces a non-empty
/// `reasoning_content` onto **every** `assistant` message whose field is
/// missing, not a string, or blank. This applies to all assistant messages,
/// not only those carrying `tool_calls`, because DeepSeek's thinking-mode
/// API validates presence on all assistant turns and rejects the request
/// with a 400 otherwise. A `"(reasoning omitted)"` placeholder keeps the
/// conversation chain intact.
///
/// Also tallies the size of all replayed `reasoning_content` and logs it, so
/// users on `RUST_LOG=deepseek_tui=debug` can see how much of their input
/// budget is being spent re-sending prior thinking traces (V4 §5.1.1
/// "Interleaved Thinking" requires the full trace to be replayed across user
/// message boundaries in tool-calling sessions). The tally uses UTF-8 byte
/// length and includes any placeholders.
///
/// Returns `None` when replay is not required, when the body has no
/// `messages` array, or when it has no assistant messages. Otherwise it
/// returns an approximate token count (bytes / 4, saturating at `u32::MAX`).
pub(super) fn sanitize_thinking_mode_messages(
    body: &mut Value,
    model: &str,
    effort: Option<&str>,
) -> Option<u32> {
    if !should_replay_reasoning_content(model, effort) {
        return None;
    }
    let messages = body.get_mut("messages").and_then(Value::as_array_mut)?;
    let mut substitutions: u32 = 0;
    let mut replay_chars: u64 = 0;
    let mut replay_messages: u32 = 0;
    for (idx, msg) in messages.iter_mut().enumerate() {
        if msg.get("role").and_then(Value::as_str) != Some("assistant") {
            continue;
        }
        let needs_placeholder = msg
            .get("reasoning_content")
            .and_then(Value::as_str)
            .is_none_or(|s| s.trim().is_empty());
        if needs_placeholder {
            msg["reasoning_content"] = json!("(reasoning omitted)");
            substitutions = substitutions.saturating_add(1);
            logging::warn(format!(
                "Final sanitizer: forced reasoning_content placeholder on assistant[{idx}]",
            ));
        }
        if let Some(reasoning) = msg.get("reasoning_content").and_then(Value::as_str) {
            let len = reasoning.len() as u64;
            if len > 0 {
                replay_chars = replay_chars.saturating_add(len);
                replay_messages = replay_messages.saturating_add(1);
            }
        }
    }
    if substitutions > 0 {
        logging::warn(format!(
            "Final sanitizer: {substitutions} assistant message(s) needed reasoning_content placeholder",
        ));
    }
    if replay_messages == 0 {
        return None;
    }
    // ~4 chars/token is the standard rough estimate; DeepSeek tokens skew
    // a touch shorter on Chinese/code but this is order-of-magnitude info.
    let approx_tokens = u32::try_from(replay_chars / 4).unwrap_or(u32::MAX);
    logging::info(format!(
        "Reasoning-content replay: {replay_messages} assistant message(s), ~{approx_tokens} input tokens ({replay_chars} chars) being re-sent in this request",
    ));
    Some(approx_tokens)
}
/// Test helper: total UTF-8 byte length of every assistant message's string
/// `reasoning_content` in an outgoing chat-completions body.
///
/// Returns 0 when the body has no `messages` array. This is the same tally
/// the production sanitizer computes inline and logs.
#[cfg(test)]
pub(super) fn count_reasoning_replay_chars(body: &Value) -> u64 {
    let mut total: u64 = 0;
    let messages = body.get("messages").and_then(Value::as_array);
    for message in messages.into_iter().flatten() {
        if message.get("role").and_then(Value::as_str) != Some("assistant") {
            continue;
        }
        if let Some(reasoning) = message.get("reasoning_content").and_then(Value::as_str) {
            total += reasoning.len() as u64;
        }
    }
    total
}
/// Diagnostic logger fired when DeepSeek rejects the request despite the
/// sanitizer.
///
/// Reports every assistant message whose `reasoning_content` is missing,
/// non-string, or blank. Each report notes whether that message also carries
/// `tool_calls`, which helps track down a code path that bypasses the
/// sanitizer entirely. When no such message exists, it logs that the
/// rejection has another cause.
fn log_thinking_mode_violations(body: &Value) {
    let messages = match body.get("messages").and_then(Value::as_array) {
        Some(list) => list,
        None => {
            logging::warn("400-after-sanitizer: body has no `messages` array");
            return;
        }
    };
    let offenders: Vec<String> = messages
        .iter()
        .enumerate()
        .filter(|(_, m)| m.get("role").and_then(Value::as_str) == Some("assistant"))
        .filter_map(|(position, m)| {
            let reasoning_blank = m
                .get("reasoning_content")
                .and_then(Value::as_str)
                .unwrap_or("")
                .trim()
                .is_empty();
            reasoning_blank.then(|| {
                format!(
                    "assistant[{position}] (reasoning_content missing, tool_calls={})",
                    m.get("tool_calls").is_some()
                )
            })
        })
        .collect();
    if offenders.is_empty() {
        logging::warn(
            "400-after-sanitizer: all assistant messages have reasoning_content — DeepSeek rejected for a different reason",
        );
        return;
    }
    logging::warn(format!(
        "400-after-sanitizer: {} assistant message(s) lack reasoning_content despite sanitizer: {}",
        offenders.len(),
        offenders.join(", ")
    ));
}
/// Returns `true` when `model` names a model whose chat API emits and
/// expects `reasoning_content`.
///
/// Matching is case-insensitive. It recognises DeepSeek V3.2/V4, names
/// containing `reasoner`, `-reasoning` or `-thinking`, and DeepSeek R-series
/// names (`deepseek-r` followed by a digit).
fn requires_reasoning_content(model: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "deepseek-v3.2",
        "deepseek-v4",
        "reasoner",
        "-reasoning",
        "-thinking",
    ];
    let normalized = model.to_lowercase();
    MARKERS.iter().any(|marker| normalized.contains(marker))
        || has_deepseek_r_series_marker(&normalized)
}

/// Decides whether prior reasoning must be replayed for this request.
///
/// Returns `false` when `effort` (trimmed, ASCII case-insensitive) is one of
/// `off` / `disabled` / `none` / `false`. Otherwise it defers to
/// `requires_reasoning_content(model)`.
fn should_replay_reasoning_content(model: &str, effort: Option<&str>) -> bool {
    let thinking_disabled = effort.is_some_and(|raw| {
        let normalized = raw.trim().to_ascii_lowercase();
        matches!(normalized.as_str(), "off" | "disabled" | "none" | "false")
    });
    !thinking_disabled && requires_reasoning_content(model)
}

/// Returns `true` if the (already lower-cased) model name contains
/// `deepseek-r` immediately followed by an ASCII digit, e.g. `deepseek-r1`.
fn has_deepseek_r_series_marker(model_lower: &str) -> bool {
    const PREFIX: &str = "deepseek-r";
    model_lower
        .match_indices(PREFIX)
        .map(|(start, _)| &model_lower[start + PREFIX.len()..])
        .any(|rest| rest.as_bytes().first().is_some_and(u8::is_ascii_digit))
}
/// Extracts reasoning text from a chat message or stream delta.
///
/// Prefers a string `reasoning_content` and falls back to a string
/// `reasoning`. The fallback also applies when `reasoning_content` is
/// present but `null` or non-string, so a provider that sends
/// `"reasoning_content": null` alongside `"reasoning": "..."` does not lose
/// its reasoning text.
fn reasoning_field(value: &Value) -> Option<&str> {
    value
        .get("reasoning_content")
        .and_then(Value::as_str)
        .or_else(|| value.get("reasoning").and_then(Value::as_str))
}
/// Converts a non-streaming Chat Completions response into a
/// `MessageResponse`.
///
/// Only the first choice is read:
/// - its reasoning (`reasoning_content` or `reasoning`) becomes a leading
///   `Thinking` block, if non-blank;
/// - its `content` becomes a `Text` block, if non-blank;
/// - each `tool_calls` entry becomes a `ToolUse` block.
///
/// Tool arguments are parsed as JSON when possible, kept as the raw string
/// otherwise, and set to `Null` when absent. A missing `id` falls back to
/// `"chatcmpl"` and a missing `model` to `"unknown"`.
///
/// # Errors
/// Fails when `choices`, its first element, or that choice's `message` is
/// missing.
pub(super) fn parse_chat_message(payload: &Value) -> Result<MessageResponse> {
    /// Borrow `value[key]` as a string, if it is one.
    fn str_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
        value.get(key).and_then(Value::as_str)
    }

    let id = str_field(payload, "id").unwrap_or("chatcmpl").to_string();
    let model = str_field(payload, "model").unwrap_or("unknown").to_string();
    let choice = payload
        .get("choices")
        .and_then(Value::as_array)
        .context("Chat API response missing choices")?
        .first()
        .context("Chat API response missing first choice")?;
    let message = choice
        .get("message")
        .context("Chat API response missing message")?;

    let mut content_blocks = Vec::new();
    if let Some(thinking) = reasoning_field(message).filter(|r| !r.trim().is_empty()) {
        content_blocks.push(ContentBlock::Thinking {
            thinking: thinking.to_string(),
        });
    }
    if let Some(text) = str_field(message, "content").filter(|t| !t.trim().is_empty()) {
        content_blocks.push(ContentBlock::Text {
            text: text.to_string(),
            cache_control: None,
        });
    }
    let calls = message.get("tool_calls").and_then(Value::as_array);
    content_blocks.extend(calls.into_iter().flatten().map(|call| {
        let function = call.get("function");
        let name = function
            .and_then(|f| str_field(f, "name"))
            .unwrap_or("tool")
            .to_string();
        let input = match function.and_then(|f| str_field(f, "arguments")) {
            Some(raw) => serde_json::from_str::<Value>(raw)
                .unwrap_or_else(|_| Value::String(raw.to_string())),
            None => Value::Null,
        };
        let caller = call.get("caller").and_then(|c| {
            let caller_type = str_field(c, "type")?;
            Some(ToolCaller {
                caller_type: caller_type.to_string(),
                tool_id: str_field(c, "tool_id").map(str::to_string),
            })
        });
        ContentBlock::ToolUse {
            id: str_field(call, "id").unwrap_or("tool_call").to_string(),
            name: from_api_tool_name(&name),
            input,
            caller,
        }
    }));

    let stop_reason = str_field(choice, "finish_reason").map(str::to_string);
    let usage = parse_usage(payload.get("usage"));
    Ok(MessageResponse {
        id,
        r#type: "message".to_string(),
        role: "assistant".to_string(),
        content: content_blocks,
        model,
        stop_reason,
        stop_sequence: None,
        container: None,
        usage,
    })
}
// === Streaming Helpers ===
/// Build synthetic stream events from a non-streaming response (used as fallback).
///
/// Emits events in this order:
/// 1. `MessageStart`, carrying a clone of the full response.
/// 2. For each content block, a `ContentBlockStart`, an optional delta and a
///    `ContentBlockStop`, indexed by the block's position in
///    `response.content`. Tool-result style blocks emit nothing but still
///    consume an index.
/// 3. A `MessageDelta` with the stop reason/sequence and usage.
/// 4. `MessageStop`.
// `dead_code` is allowed because this fallback may currently have no caller;
// NOTE(review): confirm whether it is still needed.
#[allow(dead_code)]
fn build_stream_events(response: &MessageResponse) -> Vec<StreamEvent> {
    let mut events = vec![StreamEvent::MessageStart {
        message: response.clone(),
    }];
    for (position, block) in response.content.iter().enumerate() {
        // Saturates at `u32::MAX` for absurdly long contents.
        let index = u32::try_from(position).unwrap_or(u32::MAX);
        match block {
            ContentBlock::Text { text, .. } => {
                events.push(StreamEvent::ContentBlockStart {
                    index,
                    content_block: ContentBlockStart::Text {
                        text: String::new(),
                    },
                });
                if !text.is_empty() {
                    events.push(StreamEvent::ContentBlockDelta {
                        index,
                        delta: Delta::TextDelta { text: text.clone() },
                    });
                }
                events.push(StreamEvent::ContentBlockStop { index });
            }
            ContentBlock::Thinking { thinking } => {
                events.push(StreamEvent::ContentBlockStart {
                    index,
                    content_block: ContentBlockStart::Thinking {
                        thinking: String::new(),
                    },
                });
                if !thinking.is_empty() {
                    events.push(StreamEvent::ContentBlockDelta {
                        index,
                        delta: Delta::ThinkingDelta {
                            thinking: thinking.clone(),
                        },
                    });
                }
                events.push(StreamEvent::ContentBlockStop { index });
            }
            ContentBlock::ToolUse {
                id,
                name,
                input,
                caller,
                ..
            } => {
                events.push(StreamEvent::ContentBlockStart {
                    index,
                    content_block: ContentBlockStart::ToolUse {
                        id: id.clone(),
                        name: name.clone(),
                        input: input.clone(),
                        // Forward caller metadata, as the live SSE path does,
                        // instead of discarding it.
                        caller: caller.clone(),
                    },
                });
                events.push(StreamEvent::ContentBlockStop { index });
            }
            ContentBlock::ToolResult { .. } => {}
            ContentBlock::ServerToolUse { id, name, input } => {
                events.push(StreamEvent::ContentBlockStart {
                    index,
                    content_block: ContentBlockStart::ServerToolUse {
                        id: id.clone(),
                        name: name.clone(),
                        input: input.clone(),
                    },
                });
                events.push(StreamEvent::ContentBlockStop { index });
            }
            ContentBlock::ToolSearchToolResult { .. }
            | ContentBlock::CodeExecutionToolResult { .. } => {}
        }
    }
    events.push(StreamEvent::MessageDelta {
        delta: MessageDelta {
            stop_reason: response.stop_reason.clone(),
            stop_sequence: response.stop_sequence.clone(),
        },
        usage: Some(response.usage.clone()),
    });
    events.push(StreamEvent::MessageStop);
    events
}
// === SSE Chunk Parser ===
/// Parse a single SSE chunk from the Chat Completions streaming API into
/// our internal `StreamEvent` representation.
///
/// The mutable arguments carry block-tracking state across the chunks of one
/// stream:
/// - `content_index`: index for the next block. An open text/thinking block
///   lives at the current value; it advances when such a block is closed or a
///   tool block is opened.
/// - `text_started` / `thinking_started`: whether a text / thinking block is
///   currently open. At most one of them is open at a time.
/// - `tool_indices`: maps the API's per-call `index` to our block index for
///   tool calls opened so far. It is drained when a `finish_reason` arrives.
///
/// Reasoning deltas (`reasoning_content` / `reasoning`) are surfaced only
/// when `is_reasoning_model` is set. A chunk with no `choices` key, or with an
/// empty `choices` array, yields a usage-only `MessageDelta` when `usage` is
/// present.
pub(super) fn parse_sse_chunk(
    chunk: &Value,
    content_index: &mut u32,
    text_started: &mut bool,
    thinking_started: &mut bool,
    tool_indices: &mut std::collections::HashMap<u32, u32>,
    is_reasoning_model: bool,
) -> Vec<StreamEvent> {
    let mut events = Vec::new();
    let choices = match chunk.get("choices").and_then(Value::as_array) {
        Some(choices) if !choices.is_empty() => choices,
        // Usage-only chunk (sent at end with stream_options): either the
        // `choices` key is absent or it is an empty array.
        _ => {
            if let Some(usage_val) = chunk.get("usage") {
                events.push(StreamEvent::MessageDelta {
                    delta: MessageDelta {
                        stop_reason: None,
                        stop_sequence: None,
                    },
                    usage: Some(parse_usage(Some(usage_val))),
                });
            }
            return events;
        }
    };
    for choice in choices {
        let finish_reason = choice
            .get("finish_reason")
            .and_then(Value::as_str)
            .map(str::to_string);
        if let Some(delta) = choice.get("delta") {
            // Handle reasoning_content / reasoning thinking deltas.
            if is_reasoning_model
                && let Some(reasoning) = reasoning_field(delta)
                && !reasoning.is_empty()
            {
                // Close an open text block before (re)entering thinking, so a
                // new Thinking block never shares an index with a still-open
                // Text block. This mirrors the thinking→text transition below.
                if *text_started {
                    events.push(StreamEvent::ContentBlockStop {
                        index: *content_index,
                    });
                    *content_index += 1;
                    *text_started = false;
                }
                if !*thinking_started {
                    events.push(StreamEvent::ContentBlockStart {
                        index: *content_index,
                        content_block: ContentBlockStart::Thinking {
                            thinking: String::new(),
                        },
                    });
                    *thinking_started = true;
                }
                events.push(StreamEvent::ContentBlockDelta {
                    index: *content_index,
                    delta: Delta::ThinkingDelta {
                        thinking: reasoning.to_string(),
                    },
                });
            }
            // Handle regular content
            if let Some(content) = delta.get("content").and_then(Value::as_str)
                && !content.is_empty()
            {
                // Close thinking block if transitioning to text
                if *thinking_started {
                    events.push(StreamEvent::ContentBlockStop {
                        index: *content_index,
                    });
                    *content_index += 1;
                    *thinking_started = false;
                }
                if !*text_started {
                    events.push(StreamEvent::ContentBlockStart {
                        index: *content_index,
                        content_block: ContentBlockStart::Text {
                            text: String::new(),
                        },
                    });
                    *text_started = true;
                }
                events.push(StreamEvent::ContentBlockDelta {
                    index: *content_index,
                    delta: Delta::TextDelta {
                        text: content.to_string(),
                    },
                });
            }
            // Handle tool calls
            if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
                for tc in tool_calls {
                    let tc_index = tc.get("index").and_then(Value::as_u64).unwrap_or(0) as u32;
                    let tool_block_index = match tool_indices.entry(tc_index) {
                        std::collections::hash_map::Entry::Occupied(entry) => *entry.get(),
                        std::collections::hash_map::Entry::Vacant(entry) => {
                            // Close text block if transitioning to tool use
                            if *text_started {
                                events.push(StreamEvent::ContentBlockStop {
                                    index: *content_index,
                                });
                                *content_index += 1;
                                *text_started = false;
                            }
                            if *thinking_started {
                                events.push(StreamEvent::ContentBlockStop {
                                    index: *content_index,
                                });
                                *content_index += 1;
                                *thinking_started = false;
                            }
                            let id = tc
                                .get("id")
                                .and_then(Value::as_str)
                                .unwrap_or("tool_call")
                                .to_string();
                            let name = tc
                                .get("function")
                                .and_then(|f| f.get("name"))
                                .and_then(Value::as_str)
                                .unwrap_or("")
                                .to_string();
                            let caller = tc.get("caller").and_then(|v| {
                                v.get("type").and_then(Value::as_str).map(|caller_type| {
                                    ToolCaller {
                                        caller_type: caller_type.to_string(),
                                        tool_id: v
                                            .get("tool_id")
                                            .and_then(Value::as_str)
                                            .map(std::string::ToString::to_string),
                                    }
                                })
                            });
                            let block_index = *content_index;
                            events.push(StreamEvent::ContentBlockStart {
                                index: block_index,
                                content_block: ContentBlockStart::ToolUse {
                                    id,
                                    name: from_api_tool_name(&name),
                                    input: json!({}),
                                    caller,
                                },
                            });
                            *content_index = (*content_index).saturating_add(1);
                            entry.insert(block_index);
                            block_index
                        }
                    };
                    // Stream tool call arguments
                    if let Some(args) = tc
                        .get("function")
                        .and_then(|f| f.get("arguments"))
                        .and_then(Value::as_str)
                        && !args.is_empty()
                    {
                        events.push(StreamEvent::ContentBlockDelta {
                            index: tool_block_index,
                            delta: Delta::InputJsonDelta {
                                partial_json: args.to_string(),
                            },
                        });
                    }
                }
            }
        }
        // Handle finish reason
        if let Some(reason) = finish_reason {
            // Close any open blocks
            if *text_started {
                events.push(StreamEvent::ContentBlockStop {
                    index: *content_index,
                });
                *text_started = false;
            }
            if *thinking_started {
                events.push(StreamEvent::ContentBlockStop {
                    index: *content_index,
                });
                *thinking_started = false;
            }
            // Close tool blocks in block-index order
            let mut open_tool_indices: Vec<u32> =
                tool_indices.drain().map(|(_, idx)| idx).collect();
            open_tool_indices.sort_unstable();
            for tool_block_index in open_tool_indices {
                events.push(StreamEvent::ContentBlockStop {
                    index: tool_block_index,
                });
            }
            // Emit usage from the chunk if available
            let chunk_usage = chunk.get("usage").map(|u| parse_usage(Some(u)));
            events.push(StreamEvent::MessageDelta {
                delta: MessageDelta {
                    stop_reason: Some(reason),
                    stop_sequence: None,
                },
                usage: chunk_usage,
            });
        }
    }
    events
}