diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index db240c30..bd32a4df 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -47,8 +47,8 @@ use crate::tools::shell::{SharedShellManager, new_shared_shell_manager}; use crate::tools::spec::RuntimeToolServices; use crate::tools::spec::{ApprovalRequirement, ToolError, ToolResult}; use crate::tools::subagent::{ - Mailbox, SharedSubAgentManager, SubAgentRuntime, SubAgentType, new_shared_subagent_manager, - resolve_subagent_assignment_route, + Mailbox, SharedSubAgentManager, SubAgentCompletion, SubAgentRuntime, SubAgentType, + new_shared_subagent_manager, resolve_subagent_assignment_route, }; use crate::tools::todo::{SharedTodoList, new_shared_todo_list}; use crate::tools::user_input::{UserInputRequest, UserInputResponse}; @@ -303,6 +303,14 @@ pub struct Engine { rx_user_input: mpsc::Receiver, rx_steer: mpsc::Receiver, tx_event: mpsc::Sender, + /// Wakeup channel for the parent turn loop when a direct child sub-agent + /// terminates (issue #756). Cloned into `SubAgentRuntime` so the runtime + /// can fan completion events back into the engine. + tx_subagent_completion: mpsc::UnboundedSender, + /// Receiver paired with `tx_subagent_completion`. Drained at the + /// turn-loop's empty-tool_uses branch to surface `` + /// sentinels into the parent's transcript before deciding to end the turn. + pub(super) rx_subagent_completion: mpsc::UnboundedReceiver, cancel_token: CancellationToken, shared_cancel_token: Arc>, tool_exec_lock: Arc>, @@ -353,6 +361,7 @@ impl Engine { let (tx_approval, rx_approval) = mpsc::channel(64); let (tx_user_input, rx_user_input) = mpsc::channel(32); let (tx_steer, rx_steer) = mpsc::channel(64); + let (tx_subagent_completion, rx_subagent_completion) = mpsc::unbounded_channel(); let cancel_token = CancellationToken::new(); let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone())); let tool_exec_lock = Arc::new(RwLock::new(())); @@ -480,6 +489,8 @@ impl Engine { rx_user_input, rx_steer, tx_event, + tx_subagent_completion, + rx_subagent_completion, cancel_token: cancel_token.clone(), shared_cancel_token: shared_cancel_token.clone(), tool_exec_lock, @@ -937,7 +948,8 @@ impl Engine { self.session.reasoning_effort.clone(), self.session.reasoning_effort_auto, ) - .with_max_spawn_depth(self.config.max_spawn_depth); + .with_max_spawn_depth(self.config.max_spawn_depth) + .with_parent_completion_tx(self.tx_subagent_completion.clone()); if let Some((mailbox, cancel_token)) = mailbox_for_runtime.as_ref() { rt = rt .with_mailbox(mailbox.clone()) diff --git a/crates/tui/src/core/engine/turn_loop.rs b/crates/tui/src/core/engine/turn_loop.rs index 0333ca3e..7f7e51eb 100644 --- a/crates/tui/src/core/engine/turn_loop.rs +++ b/crates/tui/src/core/engine/turn_loop.rs @@ -832,6 +832,100 @@ impl Engine { continue; } + // Sub-agent completion handoff (issue #756). The model finished + // streaming with no tool calls — but if it has direct children + // still running (or completions queued from children that + // finished while we were inferring), surface their + // `` sentinels into the transcript and + // resume instead of ending the turn. This fulfils the contract + // already documented in `prompts/base.md`: the parent is + // promised it'll see the sentinel when a child finishes. + let mut completions: Vec = Vec::new(); + while let Ok(c) = self.rx_subagent_completion.try_recv() { + completions.push(c); + } + if completions.is_empty() { + let running = { + let mgr = self.subagent_manager.read().await; + mgr.running_count() + }; + if running > 0 { + let _ = self + .tx_event + .send(Event::status(format!( + "Waiting on {running} sub-agent(s) to complete..." + ))) + .await; + tokio::select! { + biased; + () = self.cancel_token.cancelled() => { + let _ = self + .tx_event + .send(Event::status( + "Request cancelled while waiting for sub-agents", + )) + .await; + return (TurnOutcomeStatus::Interrupted, None); + } + Some(c) = self.rx_subagent_completion.recv() => { + completions.push(c); + while let Ok(extra) = self.rx_subagent_completion.try_recv() { + completions.push(extra); + } + } + Some(steer) = self.rx_steer.recv() => { + let trimmed = steer.trim().to_string(); + if !trimmed.is_empty() { + self.session + .working_set + .observe_user_message(&trimmed, &self.session.workspace); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: trimmed.clone(), + cache_control: None, + }], + }) + .await; + let _ = self + .tx_event + .send(Event::status(format!( + "Steer input accepted: {}", + summarize_text(&trimmed, 120) + ))) + .await; + } + turn.next_step(); + continue; + } + } + } + } + if !completions.is_empty() { + let count = completions.len(); + for c in completions { + self.session + .working_set + .observe_user_message(&c.payload, &self.session.workspace); + self.add_session_message(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: c.payload, + cache_control: None, + }], + }) + .await; + } + let _ = self + .tx_event + .send(Event::status(format!( + "Resuming turn with {count} sub-agent completion(s)" + ))) + .await; + turn.next_step(); + continue; + } + // Inline ```repl execution — paper-spec RLM integration. if has_sendable_assistant_content && crate::repl::sandbox::has_repl_block(¤t_text_visible) diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs index 0e49385d..993584a7 100644 --- a/crates/tui/src/tools/subagent/mod.rs +++ b/crates/tui/src/tools/subagent/mod.rs @@ -548,6 +548,22 @@ impl Default for PersistedSubAgentState { /// `[runtime] max_spawn_depth = N` in `~/.deepseek/config.toml`. pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = 3; +/// Terminal-state notification emitted to the engine's parent turn loop +/// when one of its direct children finishes (issue #756). Carries the +/// already-rendered `` sentinel that the model +/// expects in the transcript per `prompts/base.md`. +#[derive(Debug, Clone)] +pub struct SubAgentCompletion { + /// The completing child's agent id. Held for routing/logging — the + /// engine's turn loop does not currently key on it (it just injects + /// the payload), but downstream tooling and tests need the field. + #[allow(dead_code)] + pub agent_id: String, + /// Human summary on line 1, sentinel on line 2. Same payload shape as + /// `Event::AgentComplete::result`. + pub payload: String, +} + /// Runtime configuration for spawning sub-agents. /// /// Carries everything a child needs to (a) build its own tool registry — @@ -581,6 +597,12 @@ pub struct SubAgentRuntime { /// whole spawn tree publishes into one ordered, fan-out-able mailbox. /// `None` only when no consumer is wired (legacy entry points / tests). pub mailbox: Option, + /// Wakeup channel for the engine's parent turn loop (issue #756). Only + /// the engine's direct children fire on this — propagated to descendants + /// via clone but gated to `spawn_depth == 1` at the send site so the + /// parent isn't flooded with grandchild completions it didn't directly + /// orchestrate. `None` when no consumer is wired (tests / legacy paths). + pub parent_completion_tx: Option>, } impl SubAgentRuntime { @@ -612,9 +634,23 @@ impl SubAgentRuntime { max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH, cancel_token: CancellationToken::new(), mailbox: None, + parent_completion_tx: None, } } + /// Attach the wakeup channel so the engine's parent turn loop can resume + /// when this runtime's direct children finish (issue #756). The channel + /// is propagated to descendants via clone, but only `spawn_depth == 1` + /// agents fire on it — see `run_subagent_task`. + #[must_use] + pub fn with_parent_completion_tx( + mut self, + tx: mpsc::UnboundedSender, + ) -> Self { + self.parent_completion_tx = Some(tx); + self + } + /// Attach a `Mailbox` so this runtime (and every descendant — children /// clone it) publishes structured `MailboxMessage` envelopes alongside /// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when @@ -714,6 +750,7 @@ impl SubAgentRuntime { max_spawn_depth: self.max_spawn_depth, cancel_token: self.cancel_token.child_token(), mailbox: self.mailbox.clone(), + parent_completion_tx: self.parent_completion_tx.clone(), } } @@ -2629,8 +2666,15 @@ async fn run_subagent_task(task: SubAgentTask) { let _ = mb.send(envelope); } + let payload = format!("{summary}\n{sentinel}"); + + // Wake the engine's parent turn loop if this is one of its direct + // children (issue #756). Gating by `spawn_depth == 1` means the parent + // only sees completions for agents it directly orchestrated, not for + // grandchildren spawned recursively inside its children. + emit_parent_completion(&task.runtime, &task.agent_id, &payload); + if let Some(event_tx) = task.runtime.event_tx { - let payload = format!("{summary}\n{sentinel}"); let _ = event_tx.try_send(Event::AgentComplete { id: task.agent_id, result: payload, @@ -2638,6 +2682,30 @@ async fn run_subagent_task(task: SubAgentTask) { } } +/// Notify the engine's parent turn loop that a direct child finished +/// (issue #756). Returns `true` if a send was attempted, `false` if the +/// notification was skipped because this isn't a direct child or no channel +/// is wired. Skips silently when the channel sender has no receiver — the +/// engine outlives the runtime, so a dropped receiver means we're shutting +/// down anyway. +pub(crate) fn emit_parent_completion( + runtime: &SubAgentRuntime, + agent_id: &str, + payload: &str, +) -> bool { + if runtime.spawn_depth != 1 { + return false; + } + let Some(tx) = runtime.parent_completion_tx.as_ref() else { + return false; + }; + let _ = tx.send(SubAgentCompletion { + agent_id: agent_id.to_string(), + payload: payload.to_string(), + }); + true +} + /// Build a `` JSON sentinel for a successful child. /// Intended to surface in the parent's transcript so the model recognizes /// child completion and can decide whether to read the full result via diff --git a/crates/tui/src/tools/subagent/tests.rs b/crates/tui/src/tools/subagent/tests.rs index 2ad7cb94..71d84f8b 100644 --- a/crates/tui/src/tools/subagent/tests.rs +++ b/crates/tui/src/tools/subagent/tests.rs @@ -1170,6 +1170,7 @@ fn stub_runtime() -> SubAgentRuntime { max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH, cancel_token: CancellationToken::new(), mailbox: None, + parent_completion_tx: None, } } @@ -1366,3 +1367,146 @@ fn persist_round_trip_preserves_session_boot_id() { .unwrap(); assert!(snap.from_prior_session); } + +// === Issue #756: parent-completion wakeup === +// +// When a direct child of the engine finishes, `run_subagent_task` emits +// a `SubAgentCompletion` on the runtime's `parent_completion_tx`. The +// engine's turn loop drains that channel before deciding to end the turn. +// These tests cover the gating logic in `emit_parent_completion` so the +// parent isn't flooded with grandchild completions and so the function +// is safe when no channel is wired. + +fn runtime_with_depth( + spawn_depth: u32, + parent_completion_tx: Option>, +) -> SubAgentRuntime { + let mut rt = stub_runtime(); + rt.spawn_depth = spawn_depth; + rt.parent_completion_tx = parent_completion_tx; + rt +} + +#[test] +fn emit_parent_completion_fires_for_direct_child() { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let runtime = runtime_with_depth(1, Some(tx)); + + let sent = emit_parent_completion(&runtime, "agent_abc", "summary line\n"); + + assert!(sent, "depth=1 with channel wired should send"); + let received = rx.try_recv().expect("channel should have one message"); + assert_eq!(received.agent_id, "agent_abc"); + assert_eq!(received.payload, "summary line\n"); + assert!(rx.try_recv().is_err(), "should be exactly one message"); +} + +#[test] +fn emit_parent_completion_skips_grandchildren() { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let runtime = runtime_with_depth(2, Some(tx)); + + let sent = emit_parent_completion(&runtime, "agent_grandchild", "ignored"); + + assert!( + !sent, + "depth=2 grandchild must not fire on the parent channel" + ); + assert!( + rx.try_recv().is_err(), + "channel should remain empty for grandchildren" + ); +} + +#[test] +fn emit_parent_completion_skips_engine_self() { + // depth 0 is the engine itself — the engine never spawns a task at + // depth 0, but defend against accidental misuse. + let (tx, mut rx) = mpsc::unbounded_channel::(); + let runtime = runtime_with_depth(0, Some(tx)); + + let sent = emit_parent_completion(&runtime, "agent_root", "ignored"); + + assert!( + !sent, + "depth=0 must not fire (only depth=1 direct children)" + ); + assert!(rx.try_recv().is_err()); +} + +#[test] +fn emit_parent_completion_no_channel_is_noop() { + let runtime = runtime_with_depth(1, None); + + let sent = emit_parent_completion(&runtime, "agent_no_chan", "anything"); + + assert!( + !sent, + "missing channel should be a silent no-op, not a panic" + ); +} + +#[test] +fn emit_parent_completion_dropped_receiver_does_not_panic() { + let (tx, rx) = mpsc::unbounded_channel::(); + drop(rx); + let runtime = runtime_with_depth(1, Some(tx)); + + // The send returns an error internally but we discard it — the + // caller's run_subagent_task does not care whether the engine is + // still listening (it might be shutting down). + let sent = emit_parent_completion(&runtime, "agent_orphan", "after-rx-drop"); + + assert!( + sent, + "we still attempt the send; the engine being gone is not our problem" + ); +} + +#[test] +fn child_runtime_propagates_completion_tx_for_gating() { + // The channel is cloned through `child_runtime()` so descendants carry + // it. The gate at the send site (`spawn_depth == 1`) is what limits + // who actually fires — `child_runtime` simply must not strand it. + let (tx, _rx) = mpsc::unbounded_channel::(); + let parent = runtime_with_depth(0, Some(tx)); + + let child = parent.child_runtime(); + + assert_eq!(child.spawn_depth, 1, "child increments depth"); + assert!( + child.parent_completion_tx.is_some(), + "child carries the wakeup channel forward" + ); +} + +#[test] +fn subagent_completion_payload_carries_existing_sentinel_format() { + // The payload format is the same one already documented in + // prompts/base.md: human summary on line 1, `` + // sentinel on line 2. This test pins the format so future refactors + // don't silently break the model's parsing contract. + let mut snap = make_snapshot(SubAgentStatus::Completed); + snap.result = Some("Found three errors.".to_string()); + + let summary = summarize_subagent_result(&snap); + let sentinel = subagent_done_sentinel("agent_test", &snap); + let payload = format!("{summary}\n{sentinel}"); + + let mut lines = payload.lines(); + let first = lines.next().expect("first line is summary"); + let second = lines.next().expect("second line is sentinel"); + assert!( + !first.starts_with(""), + "summary should not be the sentinel itself" + ); + assert!( + second.starts_with(""), + "second line is the sentinel" + ); + assert!(second.ends_with("")); + assert!( + second.contains("\"agent_id\":\"agent_test\""), + "sentinel JSON includes agent_id" + ); +}