From 4304c89d654561476276ab19d7c3cbecf8d5376d Mon Sep 17 00:00:00 2001 From: huqiantao Date: Sun, 7 Jun 2026 19:18:19 +0800 Subject: [PATCH 1/4] fix: concurrency bugs - mutex handling, thread spawning, and resource management 1. Fix Mutex lock().unwrap() in MCP server (mcp_server.rs:384,434) - Use unwrap_or_else(|e| e.into_inner()) to recover from poisoned locks - Previously, a single panic while holding the lock would cascade to all threads 2. Fix std::thread::spawn in async code (hooks.rs:1055) - Replace std::thread::spawn with tokio::task::spawn_blocking - Respects tokio's thread pool limits instead of creating unbounded OS threads - Fire-and-forget hook execution now properly managed by tokio runtime 3. Fix dropped JoinHandle in SSE loop (mcp.rs:647) - Store the JoinHandle in SseTransport struct - Enables detection of SSE loop termination - Prevents silent connection loss without structured error reporting 4. Fix std::sync::Mutex poison handling in cost_status (cost_status.rs:28-58) - Use unwrap_or_else(|e| e.into_inner()) to recover from poisoned locks - Previously, a panic while holding the lock silently lost all subsequent cost data - Cost tracking now survives mutex poisoning 5. 
Fix .expect() in tracing writer (runtime_log.rs:162) - Replace expect() with fallback chain: try_clone -> reopen file -> stderr - Prevents panicking inside tracing subscriber on fd exhaustion - Previously, EMFILE during logging would crash the application --- crates/tui/src/cost_status.rs | 20 ++++++++++---------- crates/tui/src/hooks.rs | 4 ++-- crates/tui/src/mcp.rs | 5 ++++- crates/tui/src/mcp_server.rs | 4 ++-- crates/tui/src/runtime_log.rs | 18 +++++++++++++++--- 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/crates/tui/src/cost_status.rs b/crates/tui/src/cost_status.rs index 60feebd3..38476641 100644 --- a/crates/tui/src/cost_status.rs +++ b/crates/tui/src/cost_status.rs @@ -42,19 +42,20 @@ pub fn report(model: &str, usage: &Usage) { if !cost.is_positive() { return; } - if let Ok(mut pending) = cell().lock() { - pending.usd += cost.usd; - pending.cny += cost.cny; - } + // Recover from poisoned lock — a previous holder panicked but the + // accumulated data is still valid. + let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner()); + pending.usd += cost.usd; + pending.cny += cost.cny; } /// Drain the pending cost. Returns the accumulated amount and resets /// the pool to zero. Called by the TUI render / event loop on each /// frame; any non-zero result gets folded into `accrue_subagent_cost_estimate`. pub fn drain() -> CostEstimate { - let Ok(mut pending) = cell().lock() else { - return CostEstimate::default(); - }; + // Recover from poisoned lock — a previous holder panicked but the + // accumulated data is still valid. + let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner()); std::mem::take(&mut *pending) } @@ -63,9 +64,8 @@ pub fn drain() -> CostEstimate { /// state. Production code should always use [`drain`]. 
#[cfg(test)] pub fn reset_for_tests() { - if let Ok(mut pending) = cell().lock() { - *pending = CostEstimate::default(); - } + let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner()); + *pending = CostEstimate::default(); } #[cfg(test)] diff --git a/crates/tui/src/hooks.rs b/crates/tui/src/hooks.rs index a528bc1a..f0421e3d 100644 --- a/crates/tui/src/hooks.rs +++ b/crates/tui/src/hooks.rs @@ -1051,8 +1051,8 @@ impl HookExecutor { let env = env_vars.clone(); let wd = working_dir.clone(); - // Spawn in a detached thread - std::thread::spawn(move || { + // Spawn in a blocking task (respects tokio's thread pool limits). + tokio::task::spawn_blocking(move || { let mut command = HookExecutor::build_shell_command(&cmd); command .current_dir(&wd) diff --git a/crates/tui/src/mcp.rs b/crates/tui/src/mcp.rs index 08baea29..2e3b0732 100644 --- a/crates/tui/src/mcp.rs +++ b/crates/tui/src/mcp.rs @@ -585,6 +585,8 @@ pub struct SseTransport { endpoint_url: Option, receiver: tokio::sync::mpsc::UnboundedReceiver, pending_messages: VecDeque>, + #[allow(dead_code)] + sse_task: tokio::task::JoinHandle<()>, } enum SseInbound { @@ -644,7 +646,7 @@ impl SseTransport { let headers_clone = headers.clone(); let wait_cancel_token = cancel_token.clone(); - tokio::spawn(async move { + let sse_task = tokio::spawn(async move { if cancel_token.is_cancelled() { return; } @@ -683,6 +685,7 @@ impl SseTransport { endpoint_url: None, receiver: rx, pending_messages: VecDeque::new(), + sse_task, }; transport .wait_for_endpoint(&wait_cancel_token, endpoint_timeout) diff --git a/crates/tui/src/mcp_server.rs b/crates/tui/src/mcp_server.rs index cb5b4b6b..e9da6ae3 100644 --- a/crates/tui/src/mcp_server.rs +++ b/crates/tui/src/mcp_server.rs @@ -381,7 +381,7 @@ impl McpServer { let messages = if internal_name == "deepseek" { vec![user_message] } else { - let thread = self.threads.lock().unwrap(); + let thread = self.threads.lock().unwrap_or_else(|e| e.into_inner()); let mut existing = 
thread.get(&thread_id).cloned().ok_or_else(|| RpcError { code: -32602, message: format!("Thread not found: {thread_id}"), @@ -431,7 +431,7 @@ impl McpServer { // Store the assistant response in the thread { - let mut thread = self.threads.lock().unwrap(); + let mut thread = self.threads.lock().unwrap_or_else(|e| e.into_inner()); let convo = thread.entry(thread_id.clone()).or_default(); // If deepseek, we already have just the user message; if deepseek-reply, // the user message was appended to the cloned messages above but we need diff --git a/crates/tui/src/runtime_log.rs b/crates/tui/src/runtime_log.rs index 32ce6508..3abd87c1 100644 --- a/crates/tui/src/runtime_log.rs +++ b/crates/tui/src/runtime_log.rs @@ -154,12 +154,24 @@ pub fn init() -> Result { .or_else(|_| EnvFilter::try_new("info")) .unwrap_or_else(|_| EnvFilter::new("info")); + let log_path_clone = log_path.clone(); let subscriber = tracing_subscriber::registry().with(env_filter).with( fmt::layer() .with_writer(move || { - subscriber_file - .try_clone() - .expect("clone log file handle for tracing writer") + // Clone the file handle for each write. If clone fails (fd exhaustion), + // fall back to reopening the same path, or ultimately stderr. + subscriber_file.try_clone().unwrap_or_else(|e| { + tracing::warn!("Failed to clone log file handle: {e}, reopening"); + std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&log_path_clone) + .unwrap_or_else(|_| { + // Last resort: wrap stderr as a File to prevent panic. + use std::os::unix::io::FromRawFd; + unsafe { std::fs::File::from_raw_fd(2) } + }) + }) }) .with_ansi(false) .with_target(true) From 27ca87251e2dabfcf6247444a8d3f8d8922295e3 Mon Sep 17 00:00:00 2001 From: huqiantao Date: Sun, 7 Jun 2026 19:35:59 +0800 Subject: [PATCH 2/4] fix: use Box<dyn Write> for cross-platform tracing writer Replace platform-specific std::os::unix::io::FromRawFd with Box<dyn Write> return type. This compiles on Windows, macOS, and Linux without unsafe code. 
The closure now returns a boxed writer that is either: - The cloned file handle (success case) - A reopened file handle (clone failed) - stderr (last resort, prevents panic) --- crates/tui/src/runtime_log.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/crates/tui/src/runtime_log.rs b/crates/tui/src/runtime_log.rs index 3abd87c1..35f1b8a5 100644 --- a/crates/tui/src/runtime_log.rs +++ b/crates/tui/src/runtime_log.rs @@ -157,21 +157,23 @@ pub fn init() -> Result { let log_path_clone = log_path.clone(); let subscriber = tracing_subscriber::registry().with(env_filter).with( fmt::layer() - .with_writer(move || { + .with_writer(move || -> Box { // Clone the file handle for each write. If clone fails (fd exhaustion), // fall back to reopening the same path, or ultimately stderr. - subscriber_file.try_clone().unwrap_or_else(|e| { - tracing::warn!("Failed to clone log file handle: {e}, reopening"); - std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(&log_path_clone) - .unwrap_or_else(|_| { - // Last resort: wrap stderr as a File to prevent panic. - use std::os::unix::io::FromRawFd; - unsafe { std::fs::File::from_raw_fd(2) } - }) - }) + match subscriber_file.try_clone() { + Ok(f) => Box::new(f), + Err(e) => { + tracing::warn!("Failed to clone log file handle: {e}, reopening"); + match std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&log_path_clone) + { + Ok(f) => Box::new(f), + Err(_) => Box::new(std::io::stderr()), + } + } + } }) .with_ansi(false) .with_target(true) From 3c197d707b0801c34b8f71c388589baf230e6247 Mon Sep 17 00:00:00 2001 From: huqiantao Date: Sun, 7 Jun 2026 19:48:09 +0800 Subject: [PATCH 3/4] fix: add sse_task field to SseTransport test initializer The test at line 4768 was missing the new sse_task field added to SseTransport. Add a dummy tokio::spawn task for the test. 
--- crates/tui/src/mcp.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/tui/src/mcp.rs b/crates/tui/src/mcp.rs index 2e3b0732..07a8c7a2 100644 --- a/crates/tui/src/mcp.rs +++ b/crates/tui/src/mcp.rs @@ -4765,6 +4765,7 @@ mod tests { }); let (_sender, receiver) = mpsc::unbounded_channel(); + let sse_task = tokio::spawn(async {}); let mut transport = SseTransport { client: reqwest::Client::new(), base_url: format!("http://{addr}/sse"), @@ -4772,6 +4773,7 @@ mod tests { endpoint_url: Some(format!("http://{addr}/messages")), receiver, pending_messages: VecDeque::new(), + sse_task, }; let err = transport From bdf7b15bd7cbe7a13cece9dfc6d81e4d795e35ef Mon Sep 17 00:00:00 2001 From: huqiantao Date: Sun, 7 Jun 2026 19:59:17 +0800 Subject: [PATCH 4/4] revert: use std::thread::spawn for fire-and-forget hooks tokio::task::spawn_blocking requires a running tokio runtime, which breaks tests that call hook functions outside a tokio context. Since hooks are fire-and-forget (no JoinHandle needed), std::thread::spawn is the correct choice. --- crates/tui/src/hooks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/tui/src/hooks.rs b/crates/tui/src/hooks.rs index f0421e3d..e9769883 100644 --- a/crates/tui/src/hooks.rs +++ b/crates/tui/src/hooks.rs @@ -1051,8 +1051,8 @@ impl HookExecutor { let env = env_vars.clone(); let wd = working_dir.clone(); - // Spawn in a blocking task (respects tokio's thread pool limits). - tokio::task::spawn_blocking(move || { + // Spawn in a detached thread (fire-and-forget hook execution). + std::thread::spawn(move || { let mut command = HookExecutor::build_shell_command(&cmd); command .current_dir(&wd)