From 119285f0565c2a5b839df7552b65d3d86956e4cb Mon Sep 17 00:00:00 2001 From: Hunter B Date: Fri, 12 Jun 2026 02:35:39 -0700 Subject: [PATCH] fix(runtime): batch thread detail item reads Avoid an N+1 item-directory scan when loading runtime thread details by grouping persisted items for all turns in one pass. Harvested from PR #3141. Co-authored-by: Hmbown <101357273+Hmbown@users.noreply.github.com> Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> --- CHANGELOG.md | 4 + crates/tui/CHANGELOG.md | 4 + crates/tui/src/runtime_threads.rs | 140 ++++++++++++++++++++++++++++-- 3 files changed, 142 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de6db201..8f86f169 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE frames, matching the spec and preventing providers that omit the optional space from streaming empty output. Thanks @wgeeker for the PR. +- **Runtime thread detail N+1 reads (#3141).** `get_thread_detail` now scans + persisted turn items once and groups them by turn instead of reading the + items directory once per turn, preserving item order while keeping large + thread detail loads responsive. - **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN` now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]` table and falls back to `[providers.siliconflow]` only for unset diff --git a/crates/tui/CHANGELOG.md b/crates/tui/CHANGELOG.md index 702f8feb..990c1c8d 100644 --- a/crates/tui/CHANGELOG.md +++ b/crates/tui/CHANGELOG.md @@ -53,6 +53,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Anthropic stream readers now accept both `data: {...}` and `data:{...}` SSE frames, matching the spec and preventing providers that omit the optional space from streaming empty output. Thanks @wgeeker for the PR. +- **Runtime thread detail N+1 reads (#3141).** `get_thread_detail` now scans + persisted turn items once and groups them by turn instead of reading the + items directory once per turn, preserving item order while keeping large + thread detail loads responsive. - **SiliconFlow China provider config (#2893/#2895).** `siliconflow-CN` now reads its own `[providers.siliconflow_cn]` / `[providers.siliconflow-CN]` table and falls back to `[providers.siliconflow]` only for unset diff --git a/crates/tui/src/runtime_threads.rs b/crates/tui/src/runtime_threads.rs index 64859221..3039ad18 100644 --- a/crates/tui/src/runtime_threads.rs +++ b/crates/tui/src/runtime_threads.rs @@ -61,6 +61,15 @@ fn validated_record_id<'a>(id: &'a str, label: &str) -> Result<&'a str> { Ok(trimmed) } +fn sort_turn_items_by_start(items: &mut [TurnItemRecord]) { + let fallback = Utc::now(); + items.sort_by(|a, b| { + let left = a.started_at.unwrap_or(fallback); + let right = b.started_at.unwrap_or(fallback); + left.cmp(&right) + }); +} + /// Bumped to 2 for v0.6.6 after live engine semantics changed. The persisted /// thread/turn/item records did not change shape, but a v1 reader on a v2 /// session should still fail closed rather than silently mis-replay. @@ -432,11 +441,51 @@ impl RuntimeThreadStore { out.push(item); } } - out.sort_by(|a, b| { - let left = a.started_at.unwrap_or_else(Utc::now); - let right = b.started_at.unwrap_or_else(Utc::now); - left.cmp(&right) - }); + sort_turn_items_by_start(&mut out); + Ok(out) + } + + pub fn list_items_for_turns_map( + &self, + turn_ids: &[String], + ) -> Result>> { + if turn_ids.is_empty() { + return Ok(HashMap::new()); + } + + for turn_id in turn_ids { + validated_record_id(turn_id, "turn id")?; + } + + let wanted: HashSet<&str> = turn_ids.iter().map(String::as_str).collect(); + let mut out: HashMap> = HashMap::new(); + for entry in fs::read_dir(&self.items_dir) + .with_context(|| format!("Failed to read {}", self.items_dir.display()))? + { + let entry = entry?; + let path = entry.path(); + if path.extension().is_none_or(|ext| ext != "json") { + continue; + } + let raw = fs::read_to_string(&path) + .with_context(|| format!("Failed to read {}", path.display()))?; + let item: TurnItemRecord = serde_json::from_str(&raw) + .with_context(|| format!("Failed to parse {}", path.display()))?; + if item.schema_version > CURRENT_RUNTIME_SCHEMA_VERSION { + bail!( + "Item schema v{} is newer than supported v{}", + item.schema_version, + CURRENT_RUNTIME_SCHEMA_VERSION + ); + } + if wanted.contains(item.turn_id.as_str()) { + out.entry(item.turn_id.clone()).or_default().push(item); + } + } + + for items in out.values_mut() { + sort_turn_items_by_start(items); + } Ok(out) } @@ -1237,9 +1286,13 @@ impl RuntimeThreadManager { pub async fn get_thread_detail(&self, id: &str) -> Result { let thread = self.get_thread(id).await?; let turns = self.store.list_turns_for_thread(id)?; + let turn_ids: Vec = turns.iter().map(|turn| turn.id.clone()).collect(); + let mut items_by_turn = self.store.list_items_for_turns_map(&turn_ids)?; let mut items = Vec::new(); for turn in &turns { - items.extend(self.store.list_items_for_turn(&turn.id)?); + if let Some(mut turn_items) = items_by_turn.remove(&turn.id) { + items.append(&mut turn_items); + } } let latest_seq = self.store.current_seq().await; Ok(ThreadDetail { @@ -4291,6 +4344,81 @@ mod tests { Ok(()) } + #[tokio::test] + async fn get_thread_detail_batches_items_by_turn_without_losing_order() -> Result<()> { + let manager = test_manager(test_runtime_dir())?; + let thread = manager + .create_thread(CreateThreadRequest { + model: None, + workspace: None, + mode: None, + allow_shell: None, + trust_mode: None, + auto_approve: None, + archived: false, + system_prompt: None, + task_id: None, + }) + .await?; + + let base = Utc::now(); + let mut first_turn = sample_turn( + &thread.id, + "turn_detail_batch_first", + RuntimeTurnStatus::Completed, + ); + first_turn.created_at = base; + let mut second_turn = sample_turn( + &thread.id, + "turn_detail_batch_second", + RuntimeTurnStatus::Completed, + ); + second_turn.created_at = base + chrono::Duration::seconds(1); + manager.store.save_turn(&first_turn)?; + manager.store.save_turn(&second_turn)?; + + let mut first_late = sample_item( + &first_turn.id, + "item_detail_first_late", + TurnItemLifecycleStatus::Completed, + ); + first_late.started_at = Some(base + chrono::Duration::seconds(5)); + let mut first_early = sample_item( + &first_turn.id, + "item_detail_first_early", + TurnItemLifecycleStatus::Completed, + ); + first_early.started_at = Some(base + chrono::Duration::seconds(1)); + let mut second_item = sample_item( + &second_turn.id, + "item_detail_second", + TurnItemLifecycleStatus::Completed, + ); + second_item.started_at = Some(base + chrono::Duration::seconds(2)); + let unrelated = sample_item( + "turn_detail_batch_unrelated", + "item_detail_unrelated", + TurnItemLifecycleStatus::Completed, + ); + + manager.store.save_item(&first_late)?; + manager.store.save_item(&second_item)?; + manager.store.save_item(&unrelated)?; + manager.store.save_item(&first_early)?; + + let detail = manager.get_thread_detail(&thread.id).await?; + let item_ids: Vec<&str> = detail.items.iter().map(|item| item.id.as_str()).collect(); + assert_eq!( + item_ids, + vec![ + "item_detail_first_early", + "item_detail_first_late", + "item_detail_second" + ] + ); + Ok(()) + } + #[tokio::test] async fn interrupt_turn_marks_interrupted_after_cleanup() -> Result<()> { let manager = test_manager(test_runtime_dir())?;