Merge PR #2883 from HUQIANTAO: concurrency hardening (mutex recovery, join handles)
This commit is contained in:
@@ -42,19 +42,20 @@ pub fn report(model: &str, usage: &Usage) {
|
||||
if !cost.is_positive() {
|
||||
return;
|
||||
}
|
||||
if let Ok(mut pending) = cell().lock() {
|
||||
pending.usd += cost.usd;
|
||||
pending.cny += cost.cny;
|
||||
}
|
||||
// Recover from poisoned lock — a previous holder panicked but the
|
||||
// accumulated data is still valid.
|
||||
let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner());
|
||||
pending.usd += cost.usd;
|
||||
pending.cny += cost.cny;
|
||||
}
|
||||
|
||||
/// Drain the pending cost. Returns the accumulated amount and resets
|
||||
/// the pool to zero. Called by the TUI render / event loop on each
|
||||
/// frame; any non-zero result gets folded into `accrue_subagent_cost_estimate`.
|
||||
pub fn drain() -> CostEstimate {
|
||||
let Ok(mut pending) = cell().lock() else {
|
||||
return CostEstimate::default();
|
||||
};
|
||||
// Recover from poisoned lock — a previous holder panicked but the
|
||||
// accumulated data is still valid.
|
||||
let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner());
|
||||
std::mem::take(&mut *pending)
|
||||
}
|
||||
|
||||
@@ -63,9 +64,8 @@ pub fn drain() -> CostEstimate {
|
||||
/// state. Production code should always use [`drain`].
|
||||
#[cfg(test)]
|
||||
pub fn reset_for_tests() {
|
||||
if let Ok(mut pending) = cell().lock() {
|
||||
*pending = CostEstimate::default();
|
||||
}
|
||||
let mut pending = cell().lock().unwrap_or_else(|e| e.into_inner());
|
||||
*pending = CostEstimate::default();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1077,7 +1077,7 @@ impl HookExecutor {
|
||||
let env = env_vars.clone();
|
||||
let wd = working_dir.clone();
|
||||
|
||||
// Spawn in a detached thread
|
||||
// Spawn in a detached thread (fire-and-forget hook execution).
|
||||
std::thread::spawn(move || {
|
||||
let mut command = HookExecutor::build_shell_command(&cmd);
|
||||
command
|
||||
|
||||
@@ -588,6 +588,8 @@ pub struct SseTransport {
|
||||
endpoint_url: Option<String>,
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<SseInbound>,
|
||||
pending_messages: VecDeque<Vec<u8>>,
|
||||
#[allow(dead_code)]
|
||||
sse_task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
enum SseInbound {
|
||||
@@ -647,7 +649,7 @@ impl SseTransport {
|
||||
let headers_clone = headers.clone();
|
||||
let wait_cancel_token = cancel_token.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let sse_task = tokio::spawn(async move {
|
||||
if cancel_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
@@ -686,6 +688,7 @@ impl SseTransport {
|
||||
endpoint_url: None,
|
||||
receiver: rx,
|
||||
pending_messages: VecDeque::new(),
|
||||
sse_task,
|
||||
};
|
||||
transport
|
||||
.wait_for_endpoint(&wait_cancel_token, endpoint_timeout)
|
||||
@@ -5491,6 +5494,7 @@ mod tests {
|
||||
});
|
||||
|
||||
let (_sender, receiver) = mpsc::unbounded_channel();
|
||||
let sse_task = tokio::spawn(async {});
|
||||
let mut transport = SseTransport {
|
||||
client: test_http_client(),
|
||||
base_url: format!("http://{addr}/sse"),
|
||||
@@ -5498,6 +5502,7 @@ mod tests {
|
||||
endpoint_url: Some(format!("http://{addr}/messages")),
|
||||
receiver,
|
||||
pending_messages: VecDeque::new(),
|
||||
sse_task,
|
||||
};
|
||||
|
||||
let err = transport
|
||||
|
||||
@@ -381,7 +381,7 @@ impl McpServer {
|
||||
let messages = if internal_name == "deepseek" {
|
||||
vec![user_message]
|
||||
} else {
|
||||
let thread = self.threads.lock().unwrap();
|
||||
let thread = self.threads.lock().unwrap_or_else(|e| e.into_inner());
|
||||
let mut existing = thread.get(&thread_id).cloned().ok_or_else(|| RpcError {
|
||||
code: -32602,
|
||||
message: format!("Thread not found: {thread_id}"),
|
||||
@@ -431,7 +431,7 @@ impl McpServer {
|
||||
|
||||
// Store the assistant response in the thread
|
||||
{
|
||||
let mut thread = self.threads.lock().unwrap();
|
||||
let mut thread = self.threads.lock().unwrap_or_else(|e| e.into_inner());
|
||||
let convo = thread.entry(thread_id.clone()).or_default();
|
||||
// If deepseek, we already have just the user message; if deepseek-reply,
|
||||
// the user message was appended to the cloned messages above but we need
|
||||
|
||||
@@ -154,12 +154,26 @@ pub fn init() -> Result<TuiLogGuard> {
|
||||
.or_else(|_| EnvFilter::try_new("info"))
|
||||
.unwrap_or_else(|_| EnvFilter::new("info"));
|
||||
|
||||
let log_path_clone = log_path.clone();
|
||||
let subscriber = tracing_subscriber::registry().with(env_filter).with(
|
||||
fmt::layer()
|
||||
.with_writer(move || {
|
||||
subscriber_file
|
||||
.try_clone()
|
||||
.expect("clone log file handle for tracing writer")
|
||||
.with_writer(move || -> Box<dyn std::io::Write + Send> {
|
||||
// Clone the file handle for each write. If clone fails (fd exhaustion),
|
||||
// fall back to reopening the same path, or ultimately stderr.
|
||||
match subscriber_file.try_clone() {
|
||||
Ok(f) => Box::new(f),
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to clone log file handle: {e}, reopening");
|
||||
match std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path_clone)
|
||||
{
|
||||
Ok(f) => Box::new(f),
|
||||
Err(_) => Box::new(std::io::stderr()),
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.with_ansi(false)
|
||||
.with_target(true)
|
||||
|
||||
Reference in New Issue
Block a user