fix(engine): wake parent turn loop on sub-agent completion (#756)

The parent agent's turn ends right after `agent_spawn` returns
`status: Running`, leaving children to finish in the background with no
path back into the parent's inference loop. The model has to be poked
by a human before it resumes the plan.

Wire a wakeup channel from `run_subagent_task` into the engine's turn
loop. When the model produces no more tool calls but direct children
are still running, the loop now blocks on the next completion (with
cancel and steer escape hatches), drains all pending sentinels, and
re-enters inference with the existing `<deepseek:subagent.done>`
elements injected as user messages. This fulfils the contract already
documented in `prompts/base.md` (lines 189-205): the parent is promised
it'll see the sentinel when a child finishes.

The async `agent_spawn` semantics from #239 are preserved — only direct
children fire on the channel (gated by `spawn_depth == 1`), so
grandchildren spawned recursively don't flood the parent.
This commit is contained in:
y0sif
2026-05-06 19:14:27 +03:00
parent b60568dfc0
commit cae4ca3c0f
4 changed files with 322 additions and 4 deletions
+15 -3
View File
@@ -47,8 +47,8 @@ use crate::tools::shell::{SharedShellManager, new_shared_shell_manager};
use crate::tools::spec::RuntimeToolServices;
use crate::tools::spec::{ApprovalRequirement, ToolError, ToolResult};
use crate::tools::subagent::{
Mailbox, SharedSubAgentManager, SubAgentRuntime, SubAgentType, new_shared_subagent_manager,
resolve_subagent_assignment_route,
Mailbox, SharedSubAgentManager, SubAgentCompletion, SubAgentRuntime, SubAgentType,
new_shared_subagent_manager, resolve_subagent_assignment_route,
};
use crate::tools::todo::{SharedTodoList, new_shared_todo_list};
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
@@ -303,6 +303,14 @@ pub struct Engine {
rx_user_input: mpsc::Receiver<UserInputDecision>,
rx_steer: mpsc::Receiver<String>,
tx_event: mpsc::Sender<Event>,
/// Wakeup channel for the parent turn loop when a direct child sub-agent
/// terminates (issue #756). Cloned into `SubAgentRuntime` so the runtime
/// can fan completion events back into the engine.
tx_subagent_completion: mpsc::UnboundedSender<SubAgentCompletion>,
/// Receiver paired with `tx_subagent_completion`. Drained at the
/// turn-loop's empty-tool_uses branch to surface `<deepseek:subagent.done>`
/// sentinels into the parent's transcript before deciding to end the turn.
pub(super) rx_subagent_completion: mpsc::UnboundedReceiver<SubAgentCompletion>,
cancel_token: CancellationToken,
shared_cancel_token: Arc<StdMutex<CancellationToken>>,
tool_exec_lock: Arc<RwLock<()>>,
@@ -353,6 +361,7 @@ impl Engine {
let (tx_approval, rx_approval) = mpsc::channel(64);
let (tx_user_input, rx_user_input) = mpsc::channel(32);
let (tx_steer, rx_steer) = mpsc::channel(64);
let (tx_subagent_completion, rx_subagent_completion) = mpsc::unbounded_channel();
let cancel_token = CancellationToken::new();
let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone()));
let tool_exec_lock = Arc::new(RwLock::new(()));
@@ -480,6 +489,8 @@ impl Engine {
rx_user_input,
rx_steer,
tx_event,
tx_subagent_completion,
rx_subagent_completion,
cancel_token: cancel_token.clone(),
shared_cancel_token: shared_cancel_token.clone(),
tool_exec_lock,
@@ -937,7 +948,8 @@ impl Engine {
self.session.reasoning_effort.clone(),
self.session.reasoning_effort_auto,
)
.with_max_spawn_depth(self.config.max_spawn_depth);
.with_max_spawn_depth(self.config.max_spawn_depth)
.with_parent_completion_tx(self.tx_subagent_completion.clone());
if let Some((mailbox, cancel_token)) = mailbox_for_runtime.as_ref() {
rt = rt
.with_mailbox(mailbox.clone())
+94
View File
@@ -832,6 +832,100 @@ impl Engine {
continue;
}
// Sub-agent completion handoff (issue #756). The model finished
// streaming with no tool calls — but if it has direct children
// still running (or completions queued from children that
// finished while we were inferring), surface their
// `<deepseek:subagent.done>` sentinels into the transcript and
// resume instead of ending the turn. This fulfils the contract
// already documented in `prompts/base.md`: the parent is
// promised it'll see the sentinel when a child finishes.
let mut completions: Vec<crate::tools::subagent::SubAgentCompletion> = Vec::new();
while let Ok(c) = self.rx_subagent_completion.try_recv() {
completions.push(c);
}
if completions.is_empty() {
let running = {
let mgr = self.subagent_manager.read().await;
mgr.running_count()
};
if running > 0 {
let _ = self
.tx_event
.send(Event::status(format!(
"Waiting on {running} sub-agent(s) to complete..."
)))
.await;
tokio::select! {
biased;
() = self.cancel_token.cancelled() => {
let _ = self
.tx_event
.send(Event::status(
"Request cancelled while waiting for sub-agents",
))
.await;
return (TurnOutcomeStatus::Interrupted, None);
}
Some(c) = self.rx_subagent_completion.recv() => {
completions.push(c);
while let Ok(extra) = self.rx_subagent_completion.try_recv() {
completions.push(extra);
}
}
Some(steer) = self.rx_steer.recv() => {
let trimmed = steer.trim().to_string();
if !trimmed.is_empty() {
self.session
.working_set
.observe_user_message(&trimmed, &self.session.workspace);
self.add_session_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: trimmed.clone(),
cache_control: None,
}],
})
.await;
let _ = self
.tx_event
.send(Event::status(format!(
"Steer input accepted: {}",
summarize_text(&trimmed, 120)
)))
.await;
}
turn.next_step();
continue;
}
}
}
}
if !completions.is_empty() {
let count = completions.len();
for c in completions {
self.session
.working_set
.observe_user_message(&c.payload, &self.session.workspace);
self.add_session_message(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: c.payload,
cache_control: None,
}],
})
.await;
}
let _ = self
.tx_event
.send(Event::status(format!(
"Resuming turn with {count} sub-agent completion(s)"
)))
.await;
turn.next_step();
continue;
}
// Inline ```repl execution — paper-spec RLM integration.
if has_sendable_assistant_content
&& crate::repl::sandbox::has_repl_block(&current_text_visible)
+69 -1
View File
@@ -548,6 +548,22 @@ impl Default for PersistedSubAgentState {
/// `[runtime] max_spawn_depth = N` in `~/.deepseek/config.toml`.
pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = 3;
/// Terminal-state notification emitted to the engine's parent turn loop
/// when one of its direct children finishes (issue #756). Carries the
/// already-rendered `<deepseek:subagent.done>` sentinel that the model
/// expects in the transcript per `prompts/base.md`.
#[derive(Debug, Clone)]
pub struct SubAgentCompletion {
/// The completing child's agent id. Held for routing/logging — the
/// engine's turn loop does not currently key on it (it just injects
/// the payload), but downstream tooling and tests need the field.
#[allow(dead_code)]
pub agent_id: String,
/// Human summary on line 1, sentinel on line 2. Same payload shape as
/// `Event::AgentComplete::result`.
pub payload: String,
}
/// Runtime configuration for spawning sub-agents.
///
/// Carries everything a child needs to (a) build its own tool registry —
@@ -581,6 +597,12 @@ pub struct SubAgentRuntime {
/// whole spawn tree publishes into one ordered, fan-out-able mailbox.
/// `None` only when no consumer is wired (legacy entry points / tests).
pub mailbox: Option<Mailbox>,
/// Wakeup channel for the engine's parent turn loop (issue #756). Only
/// the engine's direct children fire on this — propagated to descendants
/// via clone but gated to `spawn_depth == 1` at the send site so the
/// parent isn't flooded with grandchild completions it didn't directly
/// orchestrate. `None` when no consumer is wired (tests / legacy paths).
pub parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
}
impl SubAgentRuntime {
@@ -612,9 +634,23 @@ impl SubAgentRuntime {
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
parent_completion_tx: None,
}
}
/// Attach the wakeup channel so the engine's parent turn loop can resume
/// when this runtime's direct children finish (issue #756). The channel
/// is propagated to descendants via clone, but only `spawn_depth == 1`
/// agents fire on it — see `run_subagent_task`.
#[must_use]
pub fn with_parent_completion_tx(
mut self,
tx: mpsc::UnboundedSender<SubAgentCompletion>,
) -> Self {
self.parent_completion_tx = Some(tx);
self
}
/// Attach a `Mailbox` so this runtime (and every descendant — children
/// clone it) publishes structured `MailboxMessage` envelopes alongside
/// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when
@@ -714,6 +750,7 @@ impl SubAgentRuntime {
max_spawn_depth: self.max_spawn_depth,
cancel_token: self.cancel_token.child_token(),
mailbox: self.mailbox.clone(),
parent_completion_tx: self.parent_completion_tx.clone(),
}
}
@@ -2629,8 +2666,15 @@ async fn run_subagent_task(task: SubAgentTask) {
let _ = mb.send(envelope);
}
let payload = format!("{summary}\n{sentinel}");
// Wake the engine's parent turn loop if this is one of its direct
// children (issue #756). Gating by `spawn_depth == 1` means the parent
// only sees completions for agents it directly orchestrated, not for
// grandchildren spawned recursively inside its children.
emit_parent_completion(&task.runtime, &task.agent_id, &payload);
if let Some(event_tx) = task.runtime.event_tx {
let payload = format!("{summary}\n{sentinel}");
let _ = event_tx.try_send(Event::AgentComplete {
id: task.agent_id,
result: payload,
@@ -2638,6 +2682,30 @@ async fn run_subagent_task(task: SubAgentTask) {
}
}
/// Notify the engine's parent turn loop that a direct child finished
/// (issue #756). Returns `true` if a send was attempted, `false` if the
/// notification was skipped because this isn't a direct child or no channel
/// is wired. Skips silently when the channel sender has no receiver — the
/// engine outlives the runtime, so a dropped receiver means we're shutting
/// down anyway.
pub(crate) fn emit_parent_completion(
runtime: &SubAgentRuntime,
agent_id: &str,
payload: &str,
) -> bool {
if runtime.spawn_depth != 1 {
return false;
}
let Some(tx) = runtime.parent_completion_tx.as_ref() else {
return false;
};
let _ = tx.send(SubAgentCompletion {
agent_id: agent_id.to_string(),
payload: payload.to_string(),
});
true
}
/// Build a `<deepseek:subagent.done>` JSON sentinel for a successful child.
/// Intended to surface in the parent's transcript so the model recognizes
/// child completion and can decide whether to read the full result via
+144
View File
@@ -1170,6 +1170,7 @@ fn stub_runtime() -> SubAgentRuntime {
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
parent_completion_tx: None,
}
}
@@ -1366,3 +1367,146 @@ fn persist_round_trip_preserves_session_boot_id() {
.unwrap();
assert!(snap.from_prior_session);
}
// === Issue #756: parent-completion wakeup ===
//
// When a direct child of the engine finishes, `run_subagent_task` emits
// a `SubAgentCompletion` on the runtime's `parent_completion_tx`. The
// engine's turn loop drains that channel before deciding to end the turn.
// These tests cover the gating logic in `emit_parent_completion` so the
// parent isn't flooded with grandchild completions and so the function
// is safe when no channel is wired.
fn runtime_with_depth(
spawn_depth: u32,
parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
) -> SubAgentRuntime {
let mut rt = stub_runtime();
rt.spawn_depth = spawn_depth;
rt.parent_completion_tx = parent_completion_tx;
rt
}
#[test]
fn emit_parent_completion_fires_for_direct_child() {
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(1, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_abc", "summary line\n<sentinel/>");
assert!(sent, "depth=1 with channel wired should send");
let received = rx.try_recv().expect("channel should have one message");
assert_eq!(received.agent_id, "agent_abc");
assert_eq!(received.payload, "summary line\n<sentinel/>");
assert!(rx.try_recv().is_err(), "should be exactly one message");
}
#[test]
fn emit_parent_completion_skips_grandchildren() {
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(2, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_grandchild", "ignored");
assert!(
!sent,
"depth=2 grandchild must not fire on the parent channel"
);
assert!(
rx.try_recv().is_err(),
"channel should remain empty for grandchildren"
);
}
#[test]
fn emit_parent_completion_skips_engine_self() {
// depth 0 is the engine itself — the engine never spawns a task at
// depth 0, but defend against accidental misuse.
let (tx, mut rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let runtime = runtime_with_depth(0, Some(tx));
let sent = emit_parent_completion(&runtime, "agent_root", "ignored");
assert!(
!sent,
"depth=0 must not fire (only depth=1 direct children)"
);
assert!(rx.try_recv().is_err());
}
#[test]
fn emit_parent_completion_no_channel_is_noop() {
let runtime = runtime_with_depth(1, None);
let sent = emit_parent_completion(&runtime, "agent_no_chan", "anything");
assert!(
!sent,
"missing channel should be a silent no-op, not a panic"
);
}
#[test]
fn emit_parent_completion_dropped_receiver_does_not_panic() {
let (tx, rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
drop(rx);
let runtime = runtime_with_depth(1, Some(tx));
// The send returns an error internally but we discard it — the
// caller's run_subagent_task does not care whether the engine is
// still listening (it might be shutting down).
let sent = emit_parent_completion(&runtime, "agent_orphan", "after-rx-drop");
assert!(
sent,
"we still attempt the send; the engine being gone is not our problem"
);
}
#[test]
fn child_runtime_propagates_completion_tx_for_gating() {
// The channel is cloned through `child_runtime()` so descendants carry
// it. The gate at the send site (`spawn_depth == 1`) is what limits
// who actually fires — `child_runtime` simply must not strand it.
let (tx, _rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
let parent = runtime_with_depth(0, Some(tx));
let child = parent.child_runtime();
assert_eq!(child.spawn_depth, 1, "child increments depth");
assert!(
child.parent_completion_tx.is_some(),
"child carries the wakeup channel forward"
);
}
#[test]
fn subagent_completion_payload_carries_existing_sentinel_format() {
// The payload format is the same one already documented in
// prompts/base.md: human summary on line 1, `<deepseek:subagent.done>`
// sentinel on line 2. This test pins the format so future refactors
// don't silently break the model's parsing contract.
let mut snap = make_snapshot(SubAgentStatus::Completed);
snap.result = Some("Found three errors.".to_string());
let summary = summarize_subagent_result(&snap);
let sentinel = subagent_done_sentinel("agent_test", &snap);
let payload = format!("{summary}\n{sentinel}");
let mut lines = payload.lines();
let first = lines.next().expect("first line is summary");
let second = lines.next().expect("second line is sentinel");
assert!(
!first.starts_with("<deepseek:subagent.done>"),
"summary should not be the sentinel itself"
);
assert!(
second.starts_with("<deepseek:subagent.done>"),
"second line is the sentinel"
);
assert!(second.ends_with("</deepseek:subagent.done>"));
assert!(
second.contains("\"agent_id\":\"agent_test\""),
"sentinel JSON includes agent_id"
);
}