Merge pull request #901 from y0sif/feat/parent-resume-on-subagent-complete
fix(engine): wake parent turn loop on sub-agent completion (#756)
This commit is contained in:
@@ -47,8 +47,8 @@ use crate::tools::shell::{SharedShellManager, new_shared_shell_manager};
|
||||
use crate::tools::spec::RuntimeToolServices;
|
||||
use crate::tools::spec::{ApprovalRequirement, ToolError, ToolResult};
|
||||
use crate::tools::subagent::{
|
||||
Mailbox, SharedSubAgentManager, SubAgentRuntime, SubAgentType, new_shared_subagent_manager,
|
||||
resolve_subagent_assignment_route,
|
||||
Mailbox, SharedSubAgentManager, SubAgentCompletion, SubAgentRuntime, SubAgentType,
|
||||
new_shared_subagent_manager, resolve_subagent_assignment_route,
|
||||
};
|
||||
use crate::tools::todo::{SharedTodoList, new_shared_todo_list};
|
||||
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
|
||||
@@ -304,6 +304,14 @@ pub struct Engine {
|
||||
rx_user_input: mpsc::Receiver<UserInputDecision>,
|
||||
rx_steer: mpsc::Receiver<String>,
|
||||
tx_event: mpsc::Sender<Event>,
|
||||
/// Wakeup channel for the parent turn loop when a direct child sub-agent
|
||||
/// terminates (issue #756). Cloned into `SubAgentRuntime` so the runtime
|
||||
/// can fan completion events back into the engine.
|
||||
tx_subagent_completion: mpsc::UnboundedSender<SubAgentCompletion>,
|
||||
/// Receiver paired with `tx_subagent_completion`. Drained at the
|
||||
/// turn-loop's empty-tool_uses branch to surface `<deepseek:subagent.done>`
|
||||
/// sentinels into the parent's transcript before deciding to end the turn.
|
||||
pub(super) rx_subagent_completion: mpsc::UnboundedReceiver<SubAgentCompletion>,
|
||||
cancel_token: CancellationToken,
|
||||
shared_cancel_token: Arc<StdMutex<CancellationToken>>,
|
||||
tool_exec_lock: Arc<RwLock<()>>,
|
||||
@@ -390,6 +398,7 @@ impl Engine {
|
||||
let (tx_approval, rx_approval) = mpsc::channel(64);
|
||||
let (tx_user_input, rx_user_input) = mpsc::channel(32);
|
||||
let (tx_steer, rx_steer) = mpsc::channel(64);
|
||||
let (tx_subagent_completion, rx_subagent_completion) = mpsc::unbounded_channel();
|
||||
let cancel_token = CancellationToken::new();
|
||||
let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone()));
|
||||
let tool_exec_lock = Arc::new(RwLock::new(()));
|
||||
@@ -519,6 +528,8 @@ impl Engine {
|
||||
rx_user_input,
|
||||
rx_steer,
|
||||
tx_event,
|
||||
tx_subagent_completion,
|
||||
rx_subagent_completion,
|
||||
cancel_token: cancel_token.clone(),
|
||||
shared_cancel_token: shared_cancel_token.clone(),
|
||||
tool_exec_lock,
|
||||
@@ -976,7 +987,8 @@ impl Engine {
|
||||
self.session.reasoning_effort.clone(),
|
||||
self.session.reasoning_effort_auto,
|
||||
)
|
||||
.with_max_spawn_depth(self.config.max_spawn_depth);
|
||||
.with_max_spawn_depth(self.config.max_spawn_depth)
|
||||
.with_parent_completion_tx(self.tx_subagent_completion.clone());
|
||||
if let Some((mailbox, cancel_token)) = mailbox_for_runtime.as_ref() {
|
||||
rt = rt
|
||||
.with_mailbox(mailbox.clone())
|
||||
|
||||
@@ -834,6 +834,100 @@ impl Engine {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Sub-agent completion handoff (issue #756). The model finished
|
||||
// streaming with no tool calls — but if it has direct children
|
||||
// still running (or completions queued from children that
|
||||
// finished while we were inferring), surface their
|
||||
// `<deepseek:subagent.done>` sentinels into the transcript and
|
||||
// resume instead of ending the turn. This fulfils the contract
|
||||
// already documented in `prompts/base.md`: the parent is
|
||||
// promised it'll see the sentinel when a child finishes.
|
||||
let mut completions: Vec<crate::tools::subagent::SubAgentCompletion> = Vec::new();
|
||||
while let Ok(c) = self.rx_subagent_completion.try_recv() {
|
||||
completions.push(c);
|
||||
}
|
||||
if completions.is_empty() {
|
||||
let running = {
|
||||
let mgr = self.subagent_manager.read().await;
|
||||
mgr.running_count()
|
||||
};
|
||||
if running > 0 {
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(format!(
|
||||
"Waiting on {running} sub-agent(s) to complete..."
|
||||
)))
|
||||
.await;
|
||||
tokio::select! {
|
||||
biased;
|
||||
() = self.cancel_token.cancelled() => {
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(
|
||||
"Request cancelled while waiting for sub-agents",
|
||||
))
|
||||
.await;
|
||||
return (TurnOutcomeStatus::Interrupted, None);
|
||||
}
|
||||
Some(c) = self.rx_subagent_completion.recv() => {
|
||||
completions.push(c);
|
||||
while let Ok(extra) = self.rx_subagent_completion.try_recv() {
|
||||
completions.push(extra);
|
||||
}
|
||||
}
|
||||
Some(steer) = self.rx_steer.recv() => {
|
||||
let trimmed = steer.trim().to_string();
|
||||
if !trimmed.is_empty() {
|
||||
self.session
|
||||
.working_set
|
||||
.observe_user_message(&trimmed, &self.session.workspace);
|
||||
self.add_session_message(Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentBlock::Text {
|
||||
text: trimmed.clone(),
|
||||
cache_control: None,
|
||||
}],
|
||||
})
|
||||
.await;
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(format!(
|
||||
"Steer input accepted: {}",
|
||||
summarize_text(&trimmed, 120)
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
turn.next_step();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !completions.is_empty() {
|
||||
let count = completions.len();
|
||||
for c in completions {
|
||||
self.session
|
||||
.working_set
|
||||
.observe_user_message(&c.payload, &self.session.workspace);
|
||||
self.add_session_message(Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentBlock::Text {
|
||||
text: c.payload,
|
||||
cache_control: None,
|
||||
}],
|
||||
})
|
||||
.await;
|
||||
}
|
||||
let _ = self
|
||||
.tx_event
|
||||
.send(Event::status(format!(
|
||||
"Resuming turn with {count} sub-agent completion(s)"
|
||||
)))
|
||||
.await;
|
||||
turn.next_step();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Inline ```repl execution — paper-spec RLM integration.
|
||||
if has_sendable_assistant_content
|
||||
&& crate::repl::sandbox::has_repl_block(¤t_text_visible)
|
||||
|
||||
@@ -548,6 +548,22 @@ impl Default for PersistedSubAgentState {
|
||||
/// `[runtime] max_spawn_depth = N` in `~/.deepseek/config.toml`.
|
||||
pub const DEFAULT_MAX_SPAWN_DEPTH: u32 = 3;
|
||||
|
||||
/// Terminal-state notification emitted to the engine's parent turn loop
/// when one of its direct children finishes (issue #756). Carries the
/// already-rendered `<deepseek:subagent.done>` sentinel that the model
/// expects in the transcript per `prompts/base.md`.
///
/// Derives `PartialEq`/`Eq` so whole notifications can be compared directly
/// (e.g. in tests) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubAgentCompletion {
    /// The completing child's agent id. Held for routing/logging — the
    /// engine's turn loop does not currently key on it (it just injects
    /// the payload), but downstream tooling and tests need the field.
    #[allow(dead_code)]
    pub agent_id: String,
    /// Human summary on line 1, sentinel on line 2. Same payload shape as
    /// `Event::AgentComplete::result`.
    pub payload: String,
}
|
||||
|
||||
/// Runtime configuration for spawning sub-agents.
|
||||
///
|
||||
/// Carries everything a child needs to (a) build its own tool registry —
|
||||
@@ -581,6 +597,12 @@ pub struct SubAgentRuntime {
|
||||
/// whole spawn tree publishes into one ordered, fan-out-able mailbox.
|
||||
/// `None` only when no consumer is wired (legacy entry points / tests).
|
||||
pub mailbox: Option<Mailbox>,
|
||||
/// Wakeup channel for the engine's parent turn loop (issue #756). Only
|
||||
/// the engine's direct children fire on this — propagated to descendants
|
||||
/// via clone but gated to `spawn_depth == 1` at the send site so the
|
||||
/// parent isn't flooded with grandchild completions it didn't directly
|
||||
/// orchestrate. `None` when no consumer is wired (tests / legacy paths).
|
||||
pub parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
|
||||
}
|
||||
|
||||
impl SubAgentRuntime {
|
||||
@@ -612,9 +634,23 @@ impl SubAgentRuntime {
|
||||
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
|
||||
cancel_token: CancellationToken::new(),
|
||||
mailbox: None,
|
||||
parent_completion_tx: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach the wakeup channel so the engine's parent turn loop can resume
|
||||
/// when this runtime's direct children finish (issue #756). The channel
|
||||
/// is propagated to descendants via clone, but only `spawn_depth == 1`
|
||||
/// agents fire on it — see `run_subagent_task`.
|
||||
#[must_use]
|
||||
pub fn with_parent_completion_tx(
|
||||
mut self,
|
||||
tx: mpsc::UnboundedSender<SubAgentCompletion>,
|
||||
) -> Self {
|
||||
self.parent_completion_tx = Some(tx);
|
||||
self
|
||||
}
|
||||
|
||||
/// Attach a `Mailbox` so this runtime (and every descendant — children
|
||||
/// clone it) publishes structured `MailboxMessage` envelopes alongside
|
||||
/// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when
|
||||
@@ -714,6 +750,7 @@ impl SubAgentRuntime {
|
||||
max_spawn_depth: self.max_spawn_depth,
|
||||
cancel_token: self.cancel_token.child_token(),
|
||||
mailbox: self.mailbox.clone(),
|
||||
parent_completion_tx: self.parent_completion_tx.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2629,8 +2666,15 @@ async fn run_subagent_task(task: SubAgentTask) {
|
||||
let _ = mb.send(envelope);
|
||||
}
|
||||
|
||||
let payload = format!("{summary}\n{sentinel}");
|
||||
|
||||
// Wake the engine's parent turn loop if this is one of its direct
|
||||
// children (issue #756). Gating by `spawn_depth == 1` means the parent
|
||||
// only sees completions for agents it directly orchestrated, not for
|
||||
// grandchildren spawned recursively inside its children.
|
||||
emit_parent_completion(&task.runtime, &task.agent_id, &payload);
|
||||
|
||||
if let Some(event_tx) = task.runtime.event_tx {
|
||||
let payload = format!("{summary}\n{sentinel}");
|
||||
let _ = event_tx.try_send(Event::AgentComplete {
|
||||
id: task.agent_id,
|
||||
result: payload,
|
||||
@@ -2638,6 +2682,30 @@ async fn run_subagent_task(task: SubAgentTask) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify the engine's parent turn loop that a direct child finished
|
||||
/// (issue #756). Returns `true` if a send was attempted, `false` if the
|
||||
/// notification was skipped because this isn't a direct child or no channel
|
||||
/// is wired. Skips silently when the channel sender has no receiver — the
|
||||
/// engine outlives the runtime, so a dropped receiver means we're shutting
|
||||
/// down anyway.
|
||||
pub(crate) fn emit_parent_completion(
|
||||
runtime: &SubAgentRuntime,
|
||||
agent_id: &str,
|
||||
payload: &str,
|
||||
) -> bool {
|
||||
if runtime.spawn_depth != 1 {
|
||||
return false;
|
||||
}
|
||||
let Some(tx) = runtime.parent_completion_tx.as_ref() else {
|
||||
return false;
|
||||
};
|
||||
let _ = tx.send(SubAgentCompletion {
|
||||
agent_id: agent_id.to_string(),
|
||||
payload: payload.to_string(),
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
/// Build a `<deepseek:subagent.done>` JSON sentinel for a successful child.
|
||||
/// Intended to surface in the parent's transcript so the model recognizes
|
||||
/// child completion and can decide whether to read the full result via
|
||||
|
||||
@@ -1170,6 +1170,7 @@ fn stub_runtime() -> SubAgentRuntime {
|
||||
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
|
||||
cancel_token: CancellationToken::new(),
|
||||
mailbox: None,
|
||||
parent_completion_tx: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1366,3 +1367,146 @@ fn persist_round_trip_preserves_session_boot_id() {
|
||||
.unwrap();
|
||||
assert!(snap.from_prior_session);
|
||||
}
|
||||
|
||||
// === Issue #756: parent-completion wakeup ===
|
||||
//
|
||||
// When a direct child of the engine finishes, `run_subagent_task` emits
|
||||
// a `SubAgentCompletion` on the runtime's `parent_completion_tx`. The
|
||||
// engine's turn loop drains that channel before deciding to end the turn.
|
||||
// These tests cover the gating logic in `emit_parent_completion` so the
|
||||
// parent isn't flooded with grandchild completions and so the function
|
||||
// is safe when no channel is wired.
|
||||
|
||||
fn runtime_with_depth(
|
||||
spawn_depth: u32,
|
||||
parent_completion_tx: Option<mpsc::UnboundedSender<SubAgentCompletion>>,
|
||||
) -> SubAgentRuntime {
|
||||
let mut rt = stub_runtime();
|
||||
rt.spawn_depth = spawn_depth;
|
||||
rt.parent_completion_tx = parent_completion_tx;
|
||||
rt
|
||||
}
|
||||
|
||||
/// A depth-1 runtime with a wired channel delivers exactly one completion
/// carrying the given agent id and payload.
#[test]
fn emit_parent_completion_fires_for_direct_child() {
    let (wakeup_tx, mut wakeup_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    let direct_child = runtime_with_depth(1, Some(wakeup_tx));
    let expected_payload = "summary line\n<sentinel/>";

    let attempted = emit_parent_completion(&direct_child, "agent_abc", expected_payload);
    assert!(attempted, "depth=1 with channel wired should send");

    let SubAgentCompletion { agent_id, payload } = wakeup_rx
        .try_recv()
        .expect("channel should have one message");
    assert_eq!(agent_id, "agent_abc");
    assert_eq!(payload, expected_payload);

    // Nothing else may be queued behind the single notification.
    assert!(
        wakeup_rx.try_recv().is_err(),
        "should be exactly one message"
    );
}
|
||||
|
||||
/// A depth-2 runtime holds the sender but must not use it.
#[test]
fn emit_parent_completion_skips_grandchildren() {
    let (wakeup_tx, mut wakeup_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    // Keep the runtime (and thus the sender) alive until after the receive
    // check so the channel is merely empty, not closed.
    let grandchild = runtime_with_depth(2, Some(wakeup_tx));

    let attempted = emit_parent_completion(&grandchild, "agent_grandchild", "ignored");

    assert!(
        !attempted,
        "depth=2 grandchild must not fire on the parent channel"
    );
    let nothing_queued = wakeup_rx.try_recv().is_err();
    assert!(
        nothing_queued,
        "channel should remain empty for grandchildren"
    );
}
|
||||
|
||||
/// Depth 0 represents the engine itself. No task is expected to run at that
/// depth, but the gate must still refuse to send if one ever does.
#[test]
fn emit_parent_completion_skips_engine_self() {
    let (wakeup_tx, mut wakeup_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    let root = runtime_with_depth(0, Some(wakeup_tx));

    let attempted = emit_parent_completion(&root, "agent_root", "ignored");

    assert!(
        !attempted,
        "depth=0 must not fire (only depth=1 direct children)"
    );
    let nothing_queued = wakeup_rx.try_recv().is_err();
    assert!(nothing_queued);
}
|
||||
|
||||
/// With no sender attached, a depth-1 runtime reports "not sent" and does
/// not panic.
#[test]
fn emit_parent_completion_no_channel_is_noop() {
    let unwired = runtime_with_depth(1, None);

    let attempted = emit_parent_completion(&unwired, "agent_no_chan", "anything");

    assert!(
        !attempted,
        "missing channel should be a silent no-op, not a panic"
    );
}
|
||||
|
||||
/// Sending into a channel whose receiver is already gone must neither panic
/// nor change the "send attempted" result.
#[test]
fn emit_parent_completion_dropped_receiver_does_not_panic() {
    let (wakeup_tx, wakeup_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    // Close the receiving side before the runtime ever tries to send.
    drop(wakeup_rx);
    let orphan = runtime_with_depth(1, Some(wakeup_tx));

    // The underlying send fails, but `emit_parent_completion` discards that
    // error: the child task has no use for knowing whether the engine is
    // still listening.
    let attempted = emit_parent_completion(&orphan, "agent_orphan", "after-rx-drop");

    assert!(
        attempted,
        "we still attempt the send; the engine being gone is not our problem"
    );
}
|
||||
|
||||
/// `child_runtime()` must hand the wakeup sender on to the child. Limiting
/// who actually fires is the job of the `spawn_depth == 1` gate at the send
/// site, not of the propagation step.
#[test]
fn child_runtime_propagates_completion_tx_for_gating() {
    // `_wakeup_rx` is bound only to keep the channel open for the test.
    let (wakeup_tx, _wakeup_rx) = mpsc::unbounded_channel::<SubAgentCompletion>();
    let engine_level = runtime_with_depth(0, Some(wakeup_tx));

    let spawned = engine_level.child_runtime();

    assert_eq!(spawned.spawn_depth, 1, "child increments depth");
    let has_wakeup = spawned.parent_completion_tx.is_some();
    assert!(has_wakeup, "child carries the wakeup channel forward");
}
|
||||
|
||||
/// Pins the two-line payload layout: a human-readable summary first, then
/// the `<deepseek:subagent.done>` sentinel. `prompts/base.md` documents
/// this layout for the model, so a refactor that changes it should fail here.
#[test]
fn subagent_completion_payload_carries_existing_sentinel_format() {
    let snapshot = {
        let mut completed = make_snapshot(SubAgentStatus::Completed);
        completed.result = Some("Found three errors.".to_string());
        completed
    };

    // Same composition as the production send site: summary, newline, sentinel.
    let payload = format!(
        "{}\n{}",
        summarize_subagent_result(&snapshot),
        subagent_done_sentinel("agent_test", &snapshot),
    );

    let mut payload_lines = payload.lines();
    let summary_line = payload_lines.next().expect("first line is summary");
    let sentinel_line = payload_lines.next().expect("second line is sentinel");

    assert!(
        !summary_line.starts_with("<deepseek:subagent.done>"),
        "summary should not be the sentinel itself"
    );
    assert!(
        sentinel_line.starts_with("<deepseek:subagent.done>"),
        "second line is the sentinel"
    );
    assert!(sentinel_line.ends_with("</deepseek:subagent.done>"));
    assert!(
        sentinel_line.contains("\"agent_id\":\"agent_test\""),
        "sentinel JSON includes agent_id"
    );
}
|
||||
|
||||
Reference in New Issue
Block a user