fix(runtime): batch thread detail item reads

Avoid an N+1 item-directory scan when loading runtime thread details by grouping persisted items for all turns in one pass.

Harvested from PR #3141.

Co-authored-by: Hmbown <101357273+Hmbown@users.noreply.github.com>

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
This commit is contained in:
Hunter B
2026-06-12 02:35:39 -07:00
parent 2f717d3345
commit 119285f056
3 changed files with 142 additions and 6 deletions
+4
View File
@@ -53,6 +53,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE
frames, matching the spec and preventing providers that omit the optional frames, matching the spec and preventing providers that omit the optional
space from streaming empty output. Thanks @wgeeker for the PR. space from streaming empty output. Thanks @wgeeker for the PR.
- **Runtime thread detail N+1 reads (#3141).** `get_thread_detail` now scans
persisted turn items once and groups them by turn instead of reading the
items directory once per turn, preserving item order while keeping large
thread detail loads responsive.
- **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN` - **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN`
now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]` now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]`
table and falls back to `[providers.siliconflow]` only for unset table and falls back to `[providers.siliconflow]` only for unset
+4
View File
@@ -53,6 +53,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE
frames, matching the spec and preventing providers that omit the optional frames, matching the spec and preventing providers that omit the optional
space from streaming empty output. Thanks @wgeeker for the PR. space from streaming empty output. Thanks @wgeeker for the PR.
- **Runtime thread detail N+1 reads (#3141).** `get_thread_detail` now scans
persisted turn items once and groups them by turn instead of reading the
items directory once per turn, preserving item order while keeping large
thread detail loads responsive.
- **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN` - **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN`
now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]` now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]`
table and falls back to `[providers.siliconflow]` only for unset table and falls back to `[providers.siliconflow]` only for unset
+134 -6
View File
@@ -61,6 +61,15 @@ fn validated_record_id<'a>(id: &'a str, label: &str) -> Result<&'a str> {
Ok(trimmed) Ok(trimmed)
} }
fn sort_turn_items_by_start(items: &mut [TurnItemRecord]) {
let fallback = Utc::now();
items.sort_by(|a, b| {
let left = a.started_at.unwrap_or(fallback);
let right = b.started_at.unwrap_or(fallback);
left.cmp(&right)
});
}
/// Bumped to 2 for v0.6.6 after live engine semantics changed. The persisted /// Bumped to 2 for v0.6.6 after live engine semantics changed. The persisted
/// thread/turn/item records did not change shape, but a v1 reader on a v2 /// thread/turn/item records did not change shape, but a v1 reader on a v2
/// session should still fail closed rather than silently mis-replay. /// session should still fail closed rather than silently mis-replay.
@@ -432,11 +441,51 @@ impl RuntimeThreadStore {
out.push(item); out.push(item);
} }
} }
out.sort_by(|a, b| { sort_turn_items_by_start(&mut out);
let left = a.started_at.unwrap_or_else(Utc::now); Ok(out)
let right = b.started_at.unwrap_or_else(Utc::now); }
left.cmp(&right)
}); pub fn list_items_for_turns_map(
&self,
turn_ids: &[String],
) -> Result<HashMap<String, Vec<TurnItemRecord>>> {
if turn_ids.is_empty() {
return Ok(HashMap::new());
}
for turn_id in turn_ids {
validated_record_id(turn_id, "turn id")?;
}
let wanted: HashSet<&str> = turn_ids.iter().map(String::as_str).collect();
let mut out: HashMap<String, Vec<TurnItemRecord>> = HashMap::new();
for entry in fs::read_dir(&self.items_dir)
.with_context(|| format!("Failed to read {}", self.items_dir.display()))?
{
let entry = entry?;
let path = entry.path();
if path.extension().is_none_or(|ext| ext != "json") {
continue;
}
let raw = fs::read_to_string(&path)
.with_context(|| format!("Failed to read {}", path.display()))?;
let item: TurnItemRecord = serde_json::from_str(&raw)
.with_context(|| format!("Failed to parse {}", path.display()))?;
if item.schema_version > CURRENT_RUNTIME_SCHEMA_VERSION {
bail!(
"Item schema v{} is newer than supported v{}",
item.schema_version,
CURRENT_RUNTIME_SCHEMA_VERSION
);
}
if wanted.contains(item.turn_id.as_str()) {
out.entry(item.turn_id.clone()).or_default().push(item);
}
}
for items in out.values_mut() {
sort_turn_items_by_start(items);
}
Ok(out) Ok(out)
} }
@@ -1237,9 +1286,13 @@ impl RuntimeThreadManager {
pub async fn get_thread_detail(&self, id: &str) -> Result<ThreadDetail> { pub async fn get_thread_detail(&self, id: &str) -> Result<ThreadDetail> {
let thread = self.get_thread(id).await?; let thread = self.get_thread(id).await?;
let turns = self.store.list_turns_for_thread(id)?; let turns = self.store.list_turns_for_thread(id)?;
let turn_ids: Vec<String> = turns.iter().map(|turn| turn.id.clone()).collect();
let mut items_by_turn = self.store.list_items_for_turns_map(&turn_ids)?;
let mut items = Vec::new(); let mut items = Vec::new();
for turn in &turns { for turn in &turns {
items.extend(self.store.list_items_for_turn(&turn.id)?); if let Some(mut turn_items) = items_by_turn.remove(&turn.id) {
items.append(&mut turn_items);
}
} }
let latest_seq = self.store.current_seq().await; let latest_seq = self.store.current_seq().await;
Ok(ThreadDetail { Ok(ThreadDetail {
@@ -4291,6 +4344,81 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test]
async fn get_thread_detail_batches_items_by_turn_without_losing_order() -> Result<()> {
let manager = test_manager(test_runtime_dir())?;
let thread = manager
.create_thread(CreateThreadRequest {
model: None,
workspace: None,
mode: None,
allow_shell: None,
trust_mode: None,
auto_approve: None,
archived: false,
system_prompt: None,
task_id: None,
})
.await?;
let base = Utc::now();
let mut first_turn = sample_turn(
&thread.id,
"turn_detail_batch_first",
RuntimeTurnStatus::Completed,
);
first_turn.created_at = base;
let mut second_turn = sample_turn(
&thread.id,
"turn_detail_batch_second",
RuntimeTurnStatus::Completed,
);
second_turn.created_at = base + chrono::Duration::seconds(1);
manager.store.save_turn(&first_turn)?;
manager.store.save_turn(&second_turn)?;
let mut first_late = sample_item(
&first_turn.id,
"item_detail_first_late",
TurnItemLifecycleStatus::Completed,
);
first_late.started_at = Some(base + chrono::Duration::seconds(5));
let mut first_early = sample_item(
&first_turn.id,
"item_detail_first_early",
TurnItemLifecycleStatus::Completed,
);
first_early.started_at = Some(base + chrono::Duration::seconds(1));
let mut second_item = sample_item(
&second_turn.id,
"item_detail_second",
TurnItemLifecycleStatus::Completed,
);
second_item.started_at = Some(base + chrono::Duration::seconds(2));
let unrelated = sample_item(
"turn_detail_batch_unrelated",
"item_detail_unrelated",
TurnItemLifecycleStatus::Completed,
);
manager.store.save_item(&first_late)?;
manager.store.save_item(&second_item)?;
manager.store.save_item(&unrelated)?;
manager.store.save_item(&first_early)?;
let detail = manager.get_thread_detail(&thread.id).await?;
let item_ids: Vec<&str> = detail.items.iter().map(|item| item.id.as_str()).collect();
assert_eq!(
item_ids,
vec![
"item_detail_first_early",
"item_detail_first_late",
"item_detail_second"
]
);
Ok(())
}
#[tokio::test] #[tokio::test]
async fn interrupt_turn_marks_interrupted_after_cleanup() -> Result<()> { async fn interrupt_turn_marks_interrupted_after_cleanup() -> Result<()> {
let manager = test_manager(test_runtime_dir())?; let manager = test_manager(test_runtime_dir())?;