fix(tui): convert remaining tokio::spawn sites to spawn_supervised + restore terminal on panic (#346)
Completes the panic-safety work for #346 that was started in a8be33b3.
Converts every trivially convertible production tokio::spawn site to
spawn_supervised, so that a panicking task writes a crash dump to
~/.deepseek/crashes/ and the parent process stays alive.
Sites converted:
- tools/rlm.rs:190 — RLM progress drain
- tools/subagent/mod.rs:888 — run_subagent_task spawn
- tools/subagent/mod.rs:988 — run_subagent_task resume
- core/engine.rs:744 — sub-agent mailbox drainer
- core/engine.rs:1601 — engine event-loop spawn
- lsp/client.rs:127 — LSP writer
- lsp/client.rs:129 — LSP reader
- lsp/client.rs:135 — LSP dispatcher
- rlm/bridge.rs:188 — bridge progress drain
- task_manager.rs:790 — task worker loop
- automation_manager.rs:822 — automation scheduler
Sites left as-is (already panic-safe with their own catch_unwind):
- runtime_threads.rs:1242, 1462 — custom AssertUnwindSafe + catch_unwind
- mcp.rs:322 — MCP SSE loop with custom catch_unwind
Sites that don't need conversion:
- runtime_api.rs:287 — axum::serve runs in the parent task, not spawned
- runtime_api.rs:1583+ — test-helper spawn_test_server inside #[cfg(test)]
- All other spawn calls are in #[cfg(test)] modules where panics are
expected to propagate.
Also:
- main.rs panic hook now restores the terminal (LeaveAlternateScreen +
disable_raw_mode) before invoking the original hook, so a panicked TUI
doesn't leave the user's shell stuck in alt-screen mode.
- Adds spawn_supervised_tests::panicking_task_writes_crash_dump_and_does_not_kill_parent,
which proves that a panicking task produces a dated crash log at
~/.deepseek/crashes/<task>.log and that the parent task completes Ok.
Closes #346.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::task_manager::{NewTaskRequest, SharedTaskManager, TaskStatus};
|
use crate::task_manager::{NewTaskRequest, SharedTaskManager, TaskStatus};
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
const CURRENT_AUTOMATION_SCHEMA_VERSION: u32 = 1;
|
const CURRENT_AUTOMATION_SCHEMA_VERSION: u32 = 1;
|
||||||
const CURRENT_RUN_SCHEMA_VERSION: u32 = 1;
|
const CURRENT_RUN_SCHEMA_VERSION: u32 = 1;
|
||||||
@@ -819,7 +820,10 @@ pub fn spawn_scheduler(
|
|||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
config: AutomationSchedulerConfig,
|
config: AutomationSchedulerConfig,
|
||||||
) -> tokio::task::JoinHandle<()> {
|
) -> tokio::task::JoinHandle<()> {
|
||||||
tokio::spawn(async move {
|
spawn_supervised(
|
||||||
|
"automation-scheduler",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
async move {
|
||||||
let interval = config.tick_interval_secs.max(5);
|
let interval = config.tick_interval_secs.max(5);
|
||||||
loop {
|
loop {
|
||||||
if cancel.is_cancelled() {
|
if cancel.is_cancelled() {
|
||||||
@@ -841,7 +845,8 @@ pub fn spawn_scheduler(
|
|||||||
_ = sleep(std::time::Duration::from_secs(interval)) => {}
|
_ = sleep(std::time::Duration::from_secs(interval)) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ use crate::tools::todo::{SharedTodoList, new_shared_todo_list};
|
|||||||
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
|
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
|
||||||
use crate::tools::{ToolContext, ToolRegistryBuilder};
|
use crate::tools::{ToolContext, ToolRegistryBuilder};
|
||||||
use crate::tui::app::AppMode;
|
use crate::tui::app::AppMode;
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
use super::capacity::{
|
use super::capacity::{
|
||||||
CapacityController, CapacityControllerConfig, CapacityDecision, CapacityObservationInput,
|
CapacityController, CapacityControllerConfig, CapacityDecision, CapacityObservationInput,
|
||||||
@@ -741,20 +742,24 @@ impl Engine {
|
|||||||
let cancel_token = self.cancel_token.child_token();
|
let cancel_token = self.cancel_token.child_token();
|
||||||
let (mailbox, mut receiver) = Mailbox::new(cancel_token.clone());
|
let (mailbox, mut receiver) = Mailbox::new(cancel_token.clone());
|
||||||
let tx_event_clone = self.tx_event.clone();
|
let tx_event_clone = self.tx_event.clone();
|
||||||
tokio::spawn(async move {
|
spawn_supervised(
|
||||||
while let Some(envelope) = receiver.recv().await {
|
"subagent-mailbox-drainer",
|
||||||
if tx_event_clone
|
std::panic::Location::caller(),
|
||||||
.send(Event::SubAgentMailbox {
|
async move {
|
||||||
seq: envelope.seq,
|
while let Some(envelope) = receiver.recv().await {
|
||||||
message: envelope.message,
|
if tx_event_clone
|
||||||
})
|
.send(Event::SubAgentMailbox {
|
||||||
.await
|
seq: envelope.seq,
|
||||||
.is_err()
|
message: envelope.message,
|
||||||
{
|
})
|
||||||
break;
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
});
|
);
|
||||||
Some((mailbox, cancel_token))
|
Some((mailbox, cancel_token))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -1598,9 +1603,13 @@ impl Engine {
|
|||||||
pub fn spawn_engine(config: EngineConfig, api_config: &Config) -> EngineHandle {
|
pub fn spawn_engine(config: EngineConfig, api_config: &Config) -> EngineHandle {
|
||||||
let (engine, handle) = Engine::new(config, api_config);
|
let (engine, handle) = Engine::new(config, api_config);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
spawn_supervised(
|
||||||
engine.run().await;
|
"engine-event-loop",
|
||||||
});
|
std::panic::Location::caller(),
|
||||||
|
async move {
|
||||||
|
engine.run().await;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ use tokio::time::timeout;
|
|||||||
|
|
||||||
use super::diagnostics::{Diagnostic, Severity};
|
use super::diagnostics::{Diagnostic, Severity};
|
||||||
use super::registry::Language;
|
use super::registry::Language;
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
/// Trait the LSP manager talks to. A real LSP server speaks this via stdio;
|
/// Trait the LSP manager talks to. A real LSP server speaks this via stdio;
|
||||||
/// tests use an in-process fake.
|
/// tests use an in-process fake.
|
||||||
@@ -124,15 +125,27 @@ impl StdioLspTransport {
|
|||||||
let (tx_diag, rx_diag) = mpsc::channel::<(PathBuf, Vec<Diagnostic>)>(64);
|
let (tx_diag, rx_diag) = mpsc::channel::<(PathBuf, Vec<Diagnostic>)>(64);
|
||||||
|
|
||||||
// Writer task: drain outbound channel, frame with Content-Length, write to stdin.
|
// Writer task: drain outbound channel, frame with Content-Length, write to stdin.
|
||||||
tokio::spawn(writer_task(stdin, rx_outbound));
|
spawn_supervised(
|
||||||
|
"lsp-writer",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
writer_task(stdin, rx_outbound),
|
||||||
|
);
|
||||||
// Reader task: parse Content-Length frames from stdout, push to inbound queue.
|
// Reader task: parse Content-Length frames from stdout, push to inbound queue.
|
||||||
tokio::spawn(reader_task(stdout, tx_inbound));
|
spawn_supervised(
|
||||||
|
"lsp-reader",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
reader_task(stdout, tx_inbound),
|
||||||
|
);
|
||||||
// Inbound dispatcher: routes notifications to `tx_diag`, replies to a
|
// Inbound dispatcher: routes notifications to `tx_diag`, replies to a
|
||||||
// pending map. We keep the pending map for completeness even though
|
// pending map. We keep the pending map for completeness even though
|
||||||
// diagnostics polling itself does not reuse it.
|
// diagnostics polling itself does not reuse it.
|
||||||
let pending: Arc<AsyncMutex<HashMap<i64, oneshot::Sender<Value>>>> =
|
let pending: Arc<AsyncMutex<HashMap<i64, oneshot::Sender<Value>>>> =
|
||||||
Arc::new(AsyncMutex::new(HashMap::new()));
|
Arc::new(AsyncMutex::new(HashMap::new()));
|
||||||
tokio::spawn(dispatcher_task(rx_inbound, tx_diag, pending.clone()));
|
spawn_supervised(
|
||||||
|
"lsp-dispatcher",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
dispatcher_task(rx_inbound, tx_diag, pending.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
// Send `initialize` and wait for `initialized`. We synthesize id=1.
|
// Send `initialize` and wait for `initialized`. We synthesize id=1.
|
||||||
let init_payload = json!({
|
let init_payload = json!({
|
||||||
|
|||||||
+10
-1
@@ -506,9 +506,18 @@ enum SandboxCommand {
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
// Set up process panic hook before anything else — writes crash dumps
|
// Set up process panic hook before anything else — writes crash dumps
|
||||||
// to ~/.deepseek/crashes/ even if the panic happens before tokio is up.
|
// to ~/.deepseek/crashes/ even if the panic happens before tokio is up,
|
||||||
|
// and restores the terminal so a panicked TUI doesn't leave the user's
|
||||||
|
// shell stuck in alt-screen mode.
|
||||||
let orig_hook = std::panic::take_hook();
|
let orig_hook = std::panic::take_hook();
|
||||||
std::panic::set_hook(Box::new(move |panic_info| {
|
std::panic::set_hook(Box::new(move |panic_info| {
|
||||||
|
// Restore the terminal first so the panic message itself, plus the
|
||||||
|
// user's shell after exit, are visible. Best-effort — we may not be
|
||||||
|
// in raw / alt-screen mode if the panic happens pre-TUI.
|
||||||
|
use crossterm::terminal::{LeaveAlternateScreen, disable_raw_mode};
|
||||||
|
let _ = disable_raw_mode();
|
||||||
|
let _ = crossterm::execute!(std::io::stdout(), LeaveAlternateScreen);
|
||||||
|
|
||||||
let msg = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
|
let msg = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
|
||||||
s.to_string()
|
s.to_string()
|
||||||
} else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
|
} else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ use tokio::sync::Mutex;
|
|||||||
use crate::llm_client::LlmClient;
|
use crate::llm_client::LlmClient;
|
||||||
use crate::models::{ContentBlock, Message, MessageRequest, MessageResponse, SystemPrompt, Usage};
|
use crate::models::{ContentBlock, Message, MessageRequest, MessageResponse, SystemPrompt, Usage};
|
||||||
use crate::repl::runtime::{BatchResp, RpcDispatcher, RpcRequest, RpcResponse, SingleResp};
|
use crate::repl::runtime::{BatchResp, RpcDispatcher, RpcRequest, RpcResponse, SingleResp};
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
/// Per-child completion timeout — same as the previous sidecar default.
|
/// Per-child completion timeout — same as the previous sidecar default.
|
||||||
const CHILD_TIMEOUT_SECS: u64 = 120;
|
const CHILD_TIMEOUT_SECS: u64 = 120;
|
||||||
@@ -185,7 +186,11 @@ impl RlmBridge {
|
|||||||
// turn (we don't surface them; this dispatch is invisible to the
|
// turn (we don't surface them; this dispatch is invisible to the
|
||||||
// outer agent stream).
|
// outer agent stream).
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
||||||
let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
|
let drain = spawn_supervised(
|
||||||
|
"rlm-bridge-drain",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
async move { while rx.recv().await.is_some() {} },
|
||||||
|
);
|
||||||
|
|
||||||
let child_model = model
|
let child_model = model
|
||||||
.filter(|m| !m.is_empty())
|
.filter(|m| !m.is_empty())
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ use crate::runtime_threads::{
|
|||||||
CreateThreadRequest, RuntimeThreadManager, RuntimeThreadManagerConfig, RuntimeTurnStatus,
|
CreateThreadRequest, RuntimeThreadManager, RuntimeThreadManagerConfig, RuntimeTurnStatus,
|
||||||
SharedRuntimeThreadManager, StartTurnRequest,
|
SharedRuntimeThreadManager, StartTurnRequest,
|
||||||
};
|
};
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
const DEFAULT_WORKERS: usize = 2;
|
const DEFAULT_WORKERS: usize = 2;
|
||||||
const MAX_WORKERS: usize = 8;
|
const MAX_WORKERS: usize = 8;
|
||||||
@@ -787,9 +788,13 @@ impl TaskManager {
|
|||||||
|
|
||||||
for _ in 0..workers {
|
for _ in 0..workers {
|
||||||
let manager_clone = Arc::clone(&manager);
|
let manager_clone = Arc::clone(&manager);
|
||||||
tokio::spawn(async move {
|
spawn_supervised(
|
||||||
manager_clone.worker_loop().await;
|
"task-manager-worker",
|
||||||
});
|
std::panic::Location::caller(),
|
||||||
|
async move {
|
||||||
|
manager_clone.worker_loop().await;
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(manager)
|
Ok(manager)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ use crate::rlm::turn::{RlmTermination, run_rlm_turn_with_root};
|
|||||||
use crate::tools::spec::{
|
use crate::tools::spec::{
|
||||||
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
|
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
|
||||||
};
|
};
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
/// Default child model — cheap and fast.
|
/// Default child model — cheap and fast.
|
||||||
const DEFAULT_CHILD_MODEL: &str = "deepseek-v4-flash";
|
const DEFAULT_CHILD_MODEL: &str = "deepseek-v4-flash";
|
||||||
@@ -187,7 +188,11 @@ impl ToolSpec for RlmTool {
|
|||||||
// we don't want RLM's progress events to interleave with the
|
// we don't want RLM's progress events to interleave with the
|
||||||
// parent agent's stream. Drain into a no-op channel.
|
// parent agent's stream. Drain into a no-op channel.
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
||||||
let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
|
let drain = spawn_supervised(
|
||||||
|
"rlm-progress-drain",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
async move { while rx.recv().await.is_some() {} },
|
||||||
|
);
|
||||||
|
|
||||||
// The big body lives only in the REPL as `context`. The small
|
// The big body lives only in the REPL as `context`. The small
|
||||||
// `task` rides along as `root_prompt` and is shown to the root
|
// `task` rides along as `root_prompt` and is shown to the root
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ use crate::tools::spec::{
|
|||||||
optional_bool, optional_u64, required_str,
|
optional_bool, optional_u64, required_str,
|
||||||
};
|
};
|
||||||
use crate::tools::todo::{SharedTodoList, TodoList};
|
use crate::tools::todo::{SharedTodoList, TodoList};
|
||||||
|
use crate::utils::spawn_supervised;
|
||||||
|
|
||||||
pub mod mailbox;
|
pub mod mailbox;
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
@@ -885,7 +886,11 @@ impl SubAgentManager {
|
|||||||
max_steps,
|
max_steps,
|
||||||
input_rx,
|
input_rx,
|
||||||
};
|
};
|
||||||
let handle = tokio::spawn(run_subagent_task(task));
|
let handle = spawn_supervised(
|
||||||
|
"subagent-task",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
run_subagent_task(task),
|
||||||
|
);
|
||||||
agent.task_handle = Some(handle);
|
agent.task_handle = Some(handle);
|
||||||
self.agents.insert(agent_id.clone(), agent);
|
self.agents.insert(agent_id.clone(), agent);
|
||||||
self.persist_state_best_effort();
|
self.persist_state_best_effort();
|
||||||
@@ -985,7 +990,11 @@ impl SubAgentManager {
|
|||||||
max_steps: self.max_steps,
|
max_steps: self.max_steps,
|
||||||
input_rx,
|
input_rx,
|
||||||
};
|
};
|
||||||
let handle = tokio::spawn(run_subagent_task(task));
|
let handle = spawn_supervised(
|
||||||
|
"subagent-task-resume",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
run_subagent_task(task),
|
||||||
|
);
|
||||||
|
|
||||||
agent.status = SubAgentStatus::Running;
|
agent.status = SubAgentStatus::Running;
|
||||||
agent.result = None;
|
agent.result = None;
|
||||||
|
|||||||
@@ -530,6 +530,76 @@ mod atomic_write_tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod spawn_supervised_tests {
|
||||||
|
use super::*;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
||||||
|
/// A spawned task that panics produces a crash dump in
|
||||||
|
/// `~/.deepseek/crashes/` and the panic does not propagate to the
|
||||||
|
/// parent task — `spawn_supervised` catches it.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn panicking_task_writes_crash_dump_and_does_not_kill_parent() {
|
||||||
|
// Redirect HOME so we don't pollute the real ~/.deepseek/crashes/.
|
||||||
|
let tmp = tempfile::tempdir().expect("tempdir");
|
||||||
|
let prev_home = std::env::var_os("HOME");
|
||||||
|
// SAFETY: tests in this crate run with single-threaded env mutation
|
||||||
|
// by harness convention; we restore on exit.
|
||||||
|
unsafe { std::env::set_var("HOME", tmp.path()) };
|
||||||
|
|
||||||
|
// Spawn a task that immediately panics.
|
||||||
|
let parent_alive = Arc::new(AtomicBool::new(false));
|
||||||
|
let parent_alive_clone = parent_alive.clone();
|
||||||
|
|
||||||
|
let handle = spawn_supervised(
|
||||||
|
"panic-test-fixture",
|
||||||
|
std::panic::Location::caller(),
|
||||||
|
async move {
|
||||||
|
parent_alive_clone.store(true, Ordering::SeqCst);
|
||||||
|
panic!("deliberate panic for crash-dump test");
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// The handle resolves to () because spawn_supervised swallows the
|
||||||
|
// panic. Awaiting must not return Err — the caller must not see
|
||||||
|
// the panic.
|
||||||
|
let result = handle.await;
|
||||||
|
|
||||||
|
// Restore HOME before any assertions can panic.
|
||||||
|
match prev_home {
|
||||||
|
Some(v) => unsafe { std::env::set_var("HOME", v) },
|
||||||
|
None => unsafe { std::env::remove_var("HOME") },
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.is_ok(),
|
||||||
|
"spawn_supervised must convert panic to a normal completion"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
parent_alive.load(Ordering::SeqCst),
|
||||||
|
"fixture task must have run before panicking"
|
||||||
|
);
|
||||||
|
|
||||||
|
// A crash dump file must exist under <HOME>/.deepseek/crashes/.
|
||||||
|
let crash_dir = tmp.path().join(".deepseek").join("crashes");
|
||||||
|
let entries: Vec<_> = std::fs::read_dir(&crash_dir)
|
||||||
|
.expect("crashes dir exists")
|
||||||
|
.flatten()
|
||||||
|
.collect();
|
||||||
|
assert_eq!(entries.len(), 1, "exactly one crash dump expected");
|
||||||
|
let dump = std::fs::read_to_string(entries[0].path()).expect("read dump");
|
||||||
|
assert!(
|
||||||
|
dump.contains("panic-test-fixture"),
|
||||||
|
"dump must include the task name; got: {dump}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
dump.contains("deliberate panic for crash-dump test"),
|
||||||
|
"dump must include the panic message; got: {dump}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod project_mapping_tests {
|
mod project_mapping_tests {
|
||||||
use super::{project_tree, summarize_project};
|
use super::{project_tree, summarize_project};
|
||||||
|
|||||||
Reference in New Issue
Block a user