fix(streaming): preserve all tool_calls in OpenAI batch responses (#1686)
When an OpenAI-compatible backend (vLLM, Ollama, LM Studio, Together AI,
self-hosted vLLM/SGLang, etc.) streams an assistant message containing
multiple tool_calls in a single round, only the **last** tool's
`Event::ToolCallStarted` was firing. The preceding N-1 tool calls
executed and produced tool_result events, but never announced their
start to consumers (TUI / runtime API / embedder bridges), leaving them
with N orphan tool_result blocks and no matching tool_use blocks in the
assistant history.
## Reproduction
```text
backend dispatches: 7 × write_file + 1 × exec_shell
log shows: 7 × ApprovalRequired events ✓
listeners receive: 1 × chat:tool_start, 7 × chat:tool_end
session history: 1 tool_use + 7 tool_result (6 orphans)
```
Tested against vLLM 0.7 + Qwen3.6-35B-A3B with a "scaffold 7-file Tauri
template" prompt. Any model+backend combo that emits batch tool_calls
trips this — typical when a single LLM round asks for multiple parallel
file writes or edits.
## Root cause
`run_turn` tracked the currently-streaming tool block with a single
`current_tool_index: Option<usize>`. The Anthropic-style adapter
(non-streaming response → events at `chat.rs::L1807`) emits
Start/Stop pairs in lockstep so the slot never overlaps. But the
OpenAI streaming parser (`chat.rs::L1954-2064`) emits every
`ContentBlockStart::ToolUse` as soon as a tool_call delta lands, then
batches every `ContentBlockStop` at `finish_reason`:
```text
Start { index: 0 } // tool #1
Delta { index: 0, .. }
Start { index: 1 } // tool #2 — overwrites current_tool_index
Delta { index: 1, .. }
…
Start { index: 6 } // current_tool_index = Some(6)
Delta { index: 6, .. }
Stop { index: 0 } // take() returns Some(6) ← wrong tool!
Stop { index: 1 } // take() returns None
Stop { index: 2 } // take() returns None
…
```
The first `Stop` consumes the last index and emits `ToolCallStarted`
for the wrong `tool_uses` entry; every subsequent `Stop` finds the
slot already `None` and skips the entire `if let Some(index) = …`
branch, dropping the announcement.
## Fix
Replace the single slot with `HashMap<u32 block_index, usize
tool_uses_idx>`:
- `ContentBlockStart::ToolUse` and `::ServerToolUse` insert the
`(event.index → tool_uses.len())` mapping.
- `InputJsonDelta` looks up by the `ContentBlockDelta` outer index.
- `ContentBlockStop` removes by the stop's index, so each Stop routes
to its own `tool_uses` entry regardless of arrival order.
Routing no longer depends on `current_block_kind` (which has the same
single-slot overwrite problem); `current_tool_indices.remove(&index)`
returning `Some(_)` already proves the Stop belongs to a tool block.
## Tests
Added `batch_tool_calls_preserve_all_tool_use_indices` in
`core/engine/turn_loop.rs::tests` — feeds 7 Starts and 7 Stops through
the same `HashMap` API used by `run_turn`, asserts every index round-trips.
Manual end-to-end verification: vLLM + Qwen3.6-35B + 7-file Tauri
template prompt → frontend `messages` history now contains all 7
`write_file` tool_use blocks paired with their tool_result blocks.
Co-authored-by: hexin <he.xin@h3c.com>
This commit is contained in:
@@ -355,7 +355,17 @@ impl Engine {
|
||||
..Usage::default()
|
||||
};
|
||||
let mut current_block_kind: Option<ContentBlockKind> = None;
|
||||
let mut current_tool_index: Option<usize> = None;
|
||||
// Map block_index → tool_uses position. Required because the
|
||||
// OpenAI-compatible streaming parser emits multiple
|
||||
// ContentBlockStart::ToolUse events back-to-back (one per
|
||||
// tool_call in a batch) before any ContentBlockStop arrives —
|
||||
// all Stops are flushed together at `finish_reason`. A single
|
||||
// Option<usize> gets overwritten by each new Start; the first
|
||||
// Stop then takes the last index, and every subsequent Stop
|
||||
// takes `None`, dropping ToolCallStarted events for every
|
||||
// tool call except the last one in the batch.
|
||||
let mut current_tool_indices: std::collections::HashMap<u32, usize> =
|
||||
std::collections::HashMap::new();
|
||||
let mut in_tool_call_block = false;
|
||||
let mut fake_wrapper_notice_emitted = false;
|
||||
let mut pending_message_complete = false;
|
||||
@@ -566,7 +576,7 @@ impl Engine {
|
||||
name, input
|
||||
));
|
||||
current_block_kind = Some(ContentBlockKind::ToolUse);
|
||||
current_tool_index = Some(tool_uses.len());
|
||||
current_tool_indices.insert(index, tool_uses.len());
|
||||
// ToolCallStarted is deferred to ContentBlockStop —
|
||||
// see `final_tool_input`. Emitting here would ship
|
||||
// the placeholder `{}` and the cell would render
|
||||
@@ -585,7 +595,7 @@ impl Engine {
|
||||
name, input
|
||||
));
|
||||
current_block_kind = Some(ContentBlockKind::ToolUse);
|
||||
current_tool_index = Some(tool_uses.len());
|
||||
current_tool_indices.insert(index, tool_uses.len());
|
||||
tool_uses.push(ToolUseState {
|
||||
id,
|
||||
name,
|
||||
@@ -634,8 +644,8 @@ impl Engine {
|
||||
}
|
||||
}
|
||||
Delta::InputJsonDelta { partial_json } => {
|
||||
if let Some(index) = current_tool_index
|
||||
&& let Some(tool_state) = tool_uses.get_mut(index)
|
||||
if let Some(&tool_idx) = current_tool_indices.get(&index)
|
||||
&& let Some(tool_state) = tool_uses.get_mut(tool_idx)
|
||||
{
|
||||
tool_state.input_buffer.push_str(&partial_json);
|
||||
crate::logging::info(format!(
|
||||
@@ -669,9 +679,15 @@ impl Engine {
|
||||
}
|
||||
Some(ContentBlockKind::ToolUse) | None => {}
|
||||
}
|
||||
if matches!(stopped_kind, Some(ContentBlockKind::ToolUse))
|
||||
&& let Some(index) = current_tool_index.take()
|
||||
&& let Some(tool_state) = tool_uses.get_mut(index)
|
||||
// Route the Stop using event.index (via
|
||||
// `current_tool_indices`) rather than the single
|
||||
// `current_block_kind` slot. In an OpenAI batch
|
||||
// tool-call stream every Stop after the first sees
|
||||
// `stopped_kind = None` because `take()` cleared the
|
||||
// slot, so the original `matches!(stopped_kind, …)`
|
||||
// check would skip every tool except the last.
|
||||
if let Some(tool_idx) = current_tool_indices.remove(&index)
|
||||
&& let Some(tool_state) = tool_uses.get_mut(tool_idx)
|
||||
{
|
||||
crate::logging::info(format!(
|
||||
"Tool '{}' block stop. Buffer: '{}', Current input: {:?}",
|
||||
@@ -2024,6 +2040,65 @@ mod tests {
|
||||
assert!(!should_hold_turn_for_subagents(0, 0));
|
||||
}
|
||||
|
||||
/// Regression test for the OpenAI streaming batch tool_calls bug.
|
||||
///
|
||||
/// Background: when an OpenAI-compatible backend (vLLM, Ollama, LM Studio,
|
||||
/// etc.) streams a response containing multiple `tool_calls` in the same
|
||||
/// assistant message, the streaming parser emits the events in this order:
|
||||
///
|
||||
/// ```text
|
||||
/// ContentBlockStart::ToolUse { index: 0, .. } // tool #1
|
||||
/// ContentBlockDelta { index: 0, .. } // its arguments
|
||||
/// ContentBlockStart::ToolUse { index: 1, .. } // tool #2
|
||||
/// ContentBlockDelta { index: 1, .. }
|
||||
/// …
|
||||
/// ContentBlockStart::ToolUse { index: N-1, .. }
|
||||
/// ContentBlockDelta { index: N-1, .. }
|
||||
/// ContentBlockStop { index: 0 } // ── only flushed at
|
||||
/// ContentBlockStop { index: 1 } // finish_reason
|
||||
/// … // (see chat.rs
|
||||
/// ContentBlockStop { index: N-1 } // L2050-L2064)
|
||||
/// ```
|
||||
///
|
||||
/// All Starts arrive before any Stop. The fix replaces the single
|
||||
/// `current_tool_index: Option<usize>` slot (overwritten by each Start)
|
||||
/// with a `HashMap<u32 block_index, usize tool_uses_idx>` that survives
|
||||
/// every Start and routes each Stop to the right `tool_uses` entry.
|
||||
///
|
||||
/// This test confirms the invariant: feed 7 Starts then 7 Stops, expect
|
||||
/// all 7 indices to come back out in order.
|
||||
#[test]
|
||||
fn batch_tool_calls_preserve_all_tool_use_indices() {
|
||||
let mut current_tool_indices: std::collections::HashMap<u32, usize> =
|
||||
std::collections::HashMap::new();
|
||||
|
||||
// Simulate `ContentBlockStart::ToolUse { index: i }` for 7 tools.
|
||||
for block_index in 0..7u32 {
|
||||
current_tool_indices.insert(block_index, block_index as usize);
|
||||
}
|
||||
assert_eq!(current_tool_indices.len(), 7);
|
||||
|
||||
// Now drain via `ContentBlockStop { index: i }` in the same order.
|
||||
let mut recovered: Vec<(u32, usize)> = (0..7u32)
|
||||
.map(|block_index| {
|
||||
let tool_idx = current_tool_indices
|
||||
.remove(&block_index)
|
||||
.expect("each block_index must route to a tool_uses entry");
|
||||
(block_index, tool_idx)
|
||||
})
|
||||
.collect();
|
||||
recovered.sort_by_key(|(block_index, _)| *block_index);
|
||||
let expected: Vec<(u32, usize)> = (0..7u32).map(|i| (i, i as usize)).collect();
|
||||
assert_eq!(
|
||||
recovered, expected,
|
||||
"every Stop must recover the tool_uses index pushed by its matching Start"
|
||||
);
|
||||
assert!(
|
||||
current_tool_indices.is_empty(),
|
||||
"all entries must drain after their Stops"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn loop_guard_block_tool_result_counts_as_failure() {
|
||||
let result = loop_guard_block_tool_result("Blocked: repeated call".to_string());
|
||||
|
||||
Reference in New Issue
Block a user