feat(subagent): #130 mailbox abstraction with seq + backpressure
Internal upgrade — public tool surface (agent_spawn, agent_swarm, …) unchanged. The mailbox primitive replaces ad-hoc mpsc plumbing in the runtime so: - progress events have monotonic ordering - subscribers get watch-based backpressure - close-as-cancel propagates through nested children Pairs with #128 (in-transcript cards consume the mailbox stream). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,442 @@
|
||||
//! Mailbox abstraction for sub-agent runtime coordination.
|
||||
//!
|
||||
//! Monotonic sequence numbers give every consumer a consistent ordering even
|
||||
//! when multiple subscribers (e.g. UI card + parent agent) drain
|
||||
//! independently; close-as-cancel lets a single signal both stop new mail and
|
||||
//! propagate cancellation through nested children.
|
||||
|
||||
// Some surface here is producer-only inside this crate today and consumed by
|
||||
// #128's UI cards in a follow-up; suppress the dead-code warnings until then
|
||||
// rather than deleting capabilities the design depends on.
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{Mutex, mpsc, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::SubAgentType;
|
||||
|
||||
/// Stable, structured progress envelope shared across the sub-agent surface.
|
||||
///
|
||||
/// Tracks the lifecycle of a single agent (identified by `agent_id`) end to
|
||||
/// end: spawn, per-step progress, tool execution, completion / failure /
|
||||
/// cancellation, and parent → child topology so consumers can render trees.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum MailboxMessage {
|
||||
/// Agent has been started (background task is running).
|
||||
Started {
|
||||
agent_id: String,
|
||||
agent_type: String,
|
||||
},
|
||||
/// Free-form human-readable progress (mirrors `Event::AgentProgress`).
|
||||
Progress { agent_id: String, status: String },
|
||||
/// A tool call inside the agent has started.
|
||||
ToolCallStarted {
|
||||
agent_id: String,
|
||||
tool_name: String,
|
||||
step: u32,
|
||||
},
|
||||
/// A tool call inside the agent has finished.
|
||||
ToolCallCompleted {
|
||||
agent_id: String,
|
||||
tool_name: String,
|
||||
step: u32,
|
||||
ok: bool,
|
||||
},
|
||||
/// A child agent was spawned by this agent.
|
||||
ChildSpawned { parent_id: String, child_id: String },
|
||||
/// Agent completed successfully (carries the summary line shown in the
|
||||
/// transcript; full result is still available via `agent_result`).
|
||||
Completed { agent_id: String, summary: String },
|
||||
/// Agent failed with the carried error message.
|
||||
Failed { agent_id: String, error: String },
|
||||
/// Cancellation propagated to this agent.
|
||||
Cancelled { agent_id: String },
|
||||
}
|
||||
|
||||
impl MailboxMessage {
|
||||
/// `agent_id` of the message subject (for `ChildSpawned` this is the
|
||||
/// child, since that's the new lifecycle being announced).
|
||||
#[must_use]
|
||||
pub fn agent_id(&self) -> &str {
|
||||
match self {
|
||||
Self::Started { agent_id, .. }
|
||||
| Self::Progress { agent_id, .. }
|
||||
| Self::ToolCallStarted { agent_id, .. }
|
||||
| Self::ToolCallCompleted { agent_id, .. }
|
||||
| Self::Completed { agent_id, .. }
|
||||
| Self::Failed { agent_id, .. }
|
||||
| Self::Cancelled { agent_id } => agent_id,
|
||||
Self::ChildSpawned { child_id, .. } => child_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn started(agent_id: impl Into<String>, agent_type: SubAgentType) -> Self {
|
||||
Self::Started {
|
||||
agent_id: agent_id.into(),
|
||||
agent_type: agent_type.as_str().to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn progress(agent_id: impl Into<String>, status: impl Into<String>) -> Self {
|
||||
Self::Progress {
|
||||
agent_id: agent_id.into(),
|
||||
status: status.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// One delivery: a sequence number plus the message. The sequence is
|
||||
/// monotonic across the entire mailbox (not per-agent) so a single ordering
|
||||
/// is well-defined even when multiple sub-agents share one mailbox.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MailboxEnvelope {
|
||||
pub seq: u64,
|
||||
pub message: MailboxMessage,
|
||||
}
|
||||
|
||||
/// Sender side of the mailbox.
|
||||
///
|
||||
/// Cheaply cloneable (everything inside is `Arc`/atomic). Cloning a
|
||||
/// `Mailbox` shares the same delivery channel, sequence counter, watch
|
||||
/// notifier, and close/cancel state — so a child runtime that clones its
|
||||
/// parent's `Mailbox` participates in the same stream.
|
||||
#[derive(Clone)]
|
||||
pub struct Mailbox {
|
||||
inner: Arc<MailboxInner>,
|
||||
}
|
||||
|
||||
struct MailboxInner {
|
||||
tx: mpsc::UnboundedSender<MailboxEnvelope>,
|
||||
next_seq: AtomicU64,
|
||||
seq_tx: watch::Sender<u64>,
|
||||
closed: AtomicBool,
|
||||
cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
/// Receiver side of the mailbox. Not `Clone` — only the original creator
|
||||
/// can drain. Use `Mailbox::subscribe()` for fanout (UI cards + parent both
|
||||
/// observing the same stream).
|
||||
pub struct MailboxReceiver {
|
||||
rx: mpsc::UnboundedReceiver<MailboxEnvelope>,
|
||||
pending: VecDeque<MailboxEnvelope>,
|
||||
}
|
||||
|
||||
impl Mailbox {
|
||||
/// Create a new mailbox bound to the given cancellation token. Closing
|
||||
/// the mailbox (or dropping the last sender) cancels this token, which
|
||||
/// propagates to children via `child_token()` per `SubAgentRuntime`.
|
||||
#[must_use]
|
||||
pub fn new(cancel_token: CancellationToken) -> (Self, MailboxReceiver) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let (seq_tx, _) = watch::channel(0);
|
||||
let inner = MailboxInner {
|
||||
tx,
|
||||
next_seq: AtomicU64::new(0),
|
||||
seq_tx,
|
||||
closed: AtomicBool::new(false),
|
||||
cancel_token,
|
||||
};
|
||||
(
|
||||
Self {
|
||||
inner: Arc::new(inner),
|
||||
},
|
||||
MailboxReceiver {
|
||||
rx,
|
||||
pending: VecDeque::new(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Subscribe to seq-bump notifications. Each `recv()` returns when the
|
||||
/// sequence counter advances, signaling new mail without copying it —
|
||||
/// the consumer then calls `drain` (or `recv_one` on its own receiver).
|
||||
/// Multiple subscribers may exist; this is the fanout primitive.
|
||||
#[must_use]
|
||||
pub fn subscribe(&self) -> watch::Receiver<u64> {
|
||||
self.inner.seq_tx.subscribe()
|
||||
}
|
||||
|
||||
/// Send a message; returns `Some(seq)` on success, `None` if the
|
||||
/// mailbox is already closed (callers should treat this as "the
|
||||
/// receiver is gone, stop publishing").
|
||||
pub fn send(&self, message: MailboxMessage) -> Option<u64> {
|
||||
if self.inner.closed.load(Ordering::Acquire) {
|
||||
return None;
|
||||
}
|
||||
let seq = self.inner.next_seq.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let envelope = MailboxEnvelope { seq, message };
|
||||
if self.inner.tx.send(envelope).is_err() {
|
||||
return None;
|
||||
}
|
||||
let _ = self.inner.seq_tx.send_replace(seq);
|
||||
Some(seq)
|
||||
}
|
||||
|
||||
/// Whether the mailbox has been closed.
|
||||
#[must_use]
|
||||
pub fn is_closed(&self) -> bool {
|
||||
self.inner.closed.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// Close the mailbox AND cancel the bound cancellation token.
|
||||
///
|
||||
/// "Close-as-cancel": there's no useful state where the consumer is
|
||||
/// gone but children should keep producing. Closing the parent's
|
||||
/// mailbox cascades to every nested child because each child runtime
|
||||
/// derived its `cancel_token` via `child_token()` from the parent's.
|
||||
pub fn close(&self) {
|
||||
if !self.inner.closed.swap(true, Ordering::AcqRel) {
|
||||
self.inner.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MailboxReceiver {
|
||||
fn sync_pending(&mut self) {
|
||||
while let Ok(env) = self.rx.try_recv() {
|
||||
self.pending.push_back(env);
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether any envelopes are buffered (or arrived since last check).
|
||||
pub fn has_pending(&mut self) -> bool {
|
||||
self.sync_pending();
|
||||
!self.pending.is_empty()
|
||||
}
|
||||
|
||||
/// Drain all currently available envelopes, in delivery order.
|
||||
pub fn drain(&mut self) -> Vec<MailboxEnvelope> {
|
||||
self.sync_pending();
|
||||
self.pending.drain(..).collect()
|
||||
}
|
||||
|
||||
/// Await the next envelope, with backpressure-aware blocking. Returns
|
||||
/// `None` when every sender has been dropped and the buffer is drained.
|
||||
pub async fn recv(&mut self) -> Option<MailboxEnvelope> {
|
||||
if let Some(env) = self.pending.pop_front() {
|
||||
return Some(env);
|
||||
}
|
||||
self.rx.recv().await
|
||||
}
|
||||
|
||||
/// Awaits the next envelope with a timeout. Useful in tests.
|
||||
#[allow(dead_code)]
|
||||
pub async fn recv_timeout(&mut self, timeout: Duration) -> Option<MailboxEnvelope> {
|
||||
tokio::time::timeout(timeout, self.recv())
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience handle: a mailbox + the matching cancellation token, ready to
|
||||
/// hand to a runtime. The receiver lives on the spawning side.
|
||||
pub type SharedMailbox = Arc<Mutex<Option<MailboxReceiver>>>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::time::Duration;
|
||||
|
||||
fn open() -> (Mailbox, MailboxReceiver, CancellationToken) {
|
||||
let token = CancellationToken::new();
|
||||
let (mb, rx) = Mailbox::new(token.clone());
|
||||
(mb, rx, token)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mailbox_assigns_monotonic_sequence_numbers() {
|
||||
let (mb, _rx, _tok) = open();
|
||||
let s1 = mb
|
||||
.send(MailboxMessage::progress("a", "one"))
|
||||
.expect("seq 1");
|
||||
let s2 = mb
|
||||
.send(MailboxMessage::progress("a", "two"))
|
||||
.expect("seq 2");
|
||||
let s3 = mb
|
||||
.send(MailboxMessage::progress("b", "three"))
|
||||
.expect("seq 3");
|
||||
assert_eq!(s1, 1);
|
||||
assert_eq!(s2, 2);
|
||||
assert_eq!(s3, 3);
|
||||
assert!(s2 > s1 && s3 > s2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mailbox_drains_in_delivery_order() {
|
||||
let (mb, mut rx, _tok) = open();
|
||||
mb.send(MailboxMessage::progress("a", "first"));
|
||||
mb.send(MailboxMessage::progress("a", "second"));
|
||||
mb.send(MailboxMessage::Completed {
|
||||
agent_id: "a".into(),
|
||||
summary: "done".into(),
|
||||
});
|
||||
let drained = rx.drain();
|
||||
assert_eq!(drained.len(), 3);
|
||||
assert_eq!(drained[0].seq, 1);
|
||||
assert_eq!(drained[1].seq, 2);
|
||||
assert_eq!(drained[2].seq, 3);
|
||||
assert!(matches!(
|
||||
drained[0].message,
|
||||
MailboxMessage::Progress { .. }
|
||||
));
|
||||
assert!(matches!(
|
||||
drained[2].message,
|
||||
MailboxMessage::Completed { .. }
|
||||
));
|
||||
assert!(!rx.has_pending());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribers_receive_seq_bumps_for_backpressure() {
|
||||
let (mb, _rx, _tok) = open();
|
||||
let mut sub_a = mb.subscribe();
|
||||
let mut sub_b = mb.subscribe();
|
||||
// Initial state: both at 0.
|
||||
assert_eq!(*sub_a.borrow(), 0);
|
||||
assert_eq!(*sub_b.borrow(), 0);
|
||||
|
||||
mb.send(MailboxMessage::progress("x", "tick"));
|
||||
sub_a.changed().await.expect("subscriber a sees bump");
|
||||
sub_b.changed().await.expect("subscriber b sees bump");
|
||||
assert_eq!(*sub_a.borrow(), 1);
|
||||
assert_eq!(*sub_b.borrow(), 1);
|
||||
|
||||
// A second send updates both subscribers' watch values too — even
|
||||
// though they share a single watch channel, fanout is N-to-many.
|
||||
mb.send(MailboxMessage::progress("x", "tick2"));
|
||||
sub_a.changed().await.expect("a sees second bump");
|
||||
assert_eq!(*sub_a.borrow(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_cancels_bound_token_and_blocks_further_sends() {
|
||||
let (mb, _rx, token) = open();
|
||||
assert!(!token.is_cancelled());
|
||||
mb.send(MailboxMessage::progress("a", "before close"));
|
||||
mb.close();
|
||||
assert!(token.is_cancelled(), "close-as-cancel: token must fire");
|
||||
assert!(mb.is_closed());
|
||||
// Further sends are no-ops, returning None instead of poisoning seq.
|
||||
assert!(
|
||||
mb.send(MailboxMessage::progress("a", "after close"))
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_propagates_to_child_tokens_across_max_spawn_depth() {
|
||||
// Mirror the runtime: root → child → grandchild (default depth 3).
|
||||
let root = CancellationToken::new();
|
||||
let child = root.child_token();
|
||||
let grandchild = child.child_token();
|
||||
let (mb, _rx) = Mailbox::new(root.clone());
|
||||
|
||||
assert!(!child.is_cancelled());
|
||||
assert!(!grandchild.is_cancelled());
|
||||
mb.close();
|
||||
assert!(child.is_cancelled(), "child inherits root close");
|
||||
assert!(
|
||||
grandchild.is_cancelled(),
|
||||
"grandchild inherits too — covers default max_spawn_depth = 3"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recv_returns_envelope_then_none_after_close_and_drop() {
|
||||
let (mb, mut rx, _tok) = open();
|
||||
mb.send(MailboxMessage::progress("a", "queued"));
|
||||
let env = rx.recv().await.expect("buffered envelope");
|
||||
assert_eq!(env.seq, 1);
|
||||
|
||||
// After closing AND dropping the sender, recv must yield None.
|
||||
mb.close();
|
||||
drop(mb);
|
||||
let next = rx.recv_timeout(Duration::from_millis(100)).await;
|
||||
assert!(next.is_none(), "drained + dropped → recv yields None");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cloned_mailbox_shares_sequence_and_close_state() {
|
||||
let (mb, mut rx, token) = open();
|
||||
let mb_clone = mb.clone();
|
||||
let s1 = mb
|
||||
.send(MailboxMessage::progress("a", "from original"))
|
||||
.unwrap();
|
||||
let s2 = mb_clone
|
||||
.send(MailboxMessage::progress("a", "from clone"))
|
||||
.unwrap();
|
||||
assert_eq!(s1, 1);
|
||||
assert_eq!(s2, 2, "clones share the seq counter");
|
||||
|
||||
let drained = rx.drain();
|
||||
assert_eq!(drained.len(), 2);
|
||||
|
||||
// Closing through one clone closes them all (the AtomicBool is shared).
|
||||
mb_clone.close();
|
||||
assert!(mb.is_closed());
|
||||
assert!(token.is_cancelled());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn agent_id_is_extractable_from_every_variant() {
|
||||
let cases: Vec<(MailboxMessage, &str)> = vec![
|
||||
(MailboxMessage::started("a1", SubAgentType::General), "a1"),
|
||||
(MailboxMessage::progress("a2", "x"), "a2"),
|
||||
(
|
||||
MailboxMessage::ToolCallStarted {
|
||||
agent_id: "a3".into(),
|
||||
tool_name: "read_file".into(),
|
||||
step: 1,
|
||||
},
|
||||
"a3",
|
||||
),
|
||||
(
|
||||
MailboxMessage::ToolCallCompleted {
|
||||
agent_id: "a4".into(),
|
||||
tool_name: "read_file".into(),
|
||||
step: 1,
|
||||
ok: true,
|
||||
},
|
||||
"a4",
|
||||
),
|
||||
(
|
||||
MailboxMessage::ChildSpawned {
|
||||
parent_id: "parent".into(),
|
||||
child_id: "a5".into(),
|
||||
},
|
||||
"a5",
|
||||
),
|
||||
(
|
||||
MailboxMessage::Completed {
|
||||
agent_id: "a6".into(),
|
||||
summary: "done".into(),
|
||||
},
|
||||
"a6",
|
||||
),
|
||||
(
|
||||
MailboxMessage::Failed {
|
||||
agent_id: "a7".into(),
|
||||
error: "boom".into(),
|
||||
},
|
||||
"a7",
|
||||
),
|
||||
(
|
||||
MailboxMessage::Cancelled {
|
||||
agent_id: "a8".into(),
|
||||
},
|
||||
"a8",
|
||||
),
|
||||
];
|
||||
for (msg, expected) in cases {
|
||||
assert_eq!(msg.agent_id(), expected, "extract failed for {msg:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,10 @@ use crate::tools::spec::{
|
||||
};
|
||||
use crate::tools::todo::{SharedTodoList, TodoList};
|
||||
|
||||
pub mod mailbox;
|
||||
#[allow(unused_imports)]
|
||||
pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver};
|
||||
|
||||
// === Constants ===
|
||||
|
||||
const DEFAULT_MAX_STEPS: u32 = 100;
|
||||
@@ -440,6 +444,10 @@ pub struct SubAgentRuntime {
|
||||
/// Cooperative cancellation token. Children derive a child_token() from
|
||||
/// the parent so cancelling the root cascades down.
|
||||
pub cancel_token: CancellationToken,
|
||||
/// Structured progress / lifecycle stream. Cloned across children so the
|
||||
/// whole spawn tree publishes into one ordered, fan-out-able mailbox.
|
||||
/// `None` only when no consumer is wired (legacy entry points / tests).
|
||||
pub mailbox: Option<Mailbox>,
|
||||
}
|
||||
|
||||
impl SubAgentRuntime {
|
||||
@@ -466,9 +474,30 @@ impl SubAgentRuntime {
|
||||
spawn_depth: 0,
|
||||
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
|
||||
cancel_token: CancellationToken::new(),
|
||||
mailbox: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach a `Mailbox` so this runtime (and every descendant — children
|
||||
/// clone it) publishes structured `MailboxMessage` envelopes alongside
|
||||
/// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when
|
||||
/// you want close-as-cancel to propagate the same way.
|
||||
#[must_use]
|
||||
#[allow(dead_code)] // wired by #128 (in-transcript cards) when it lands.
|
||||
pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self {
|
||||
self.mailbox = Some(mailbox);
|
||||
self
|
||||
}
|
||||
|
||||
/// Replace the cancellation token (e.g. when the engine constructs the
|
||||
/// runtime alongside a mailbox bound to the same token).
|
||||
#[must_use]
|
||||
#[allow(dead_code)] // wired by #128 alongside `with_mailbox`.
|
||||
pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
|
||||
self.cancel_token = token;
|
||||
self
|
||||
}
|
||||
|
||||
/// Override the maximum spawn depth (default `DEFAULT_MAX_SPAWN_DEPTH`).
|
||||
/// Used by config wiring (`[runtime] max_spawn_depth = N`) and tests.
|
||||
#[must_use]
|
||||
@@ -501,6 +530,7 @@ impl SubAgentRuntime {
|
||||
spawn_depth: self.spawn_depth + 1,
|
||||
max_spawn_depth: self.max_spawn_depth,
|
||||
cancel_token: self.cancel_token.child_token(),
|
||||
mailbox: self.mailbox.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2477,23 +2507,38 @@ async fn run_subagent_task(task: SubAgentTask) {
|
||||
Err(err) => manager.update_failed(&task.agent_id, err.to_string()),
|
||||
}
|
||||
|
||||
if let Some(event_tx) = task.runtime.event_tx {
|
||||
// Emit BOTH a human-friendly summary (rendered in the parent's
|
||||
// sidebar / cell) AND a structured sentinel the model can recognize
|
||||
// on its next turn. Format: human summary on the first line,
|
||||
// sentinel on the second. The sentinel uses an opaque tag
|
||||
// (`deepseek:subagent.done`) to avoid collision with normal user
|
||||
// text.
|
||||
let (summary, sentinel) = match &result {
|
||||
Ok(res) => (
|
||||
summarize_subagent_result(res),
|
||||
subagent_done_sentinel(&task.agent_id, res),
|
||||
),
|
||||
Err(err) => (
|
||||
format!("Failed: {err}"),
|
||||
subagent_failed_sentinel(&task.agent_id, &err.to_string()),
|
||||
),
|
||||
// Emit BOTH a human-friendly summary (rendered in the parent's
|
||||
// sidebar / cell) AND a structured sentinel the model can recognize
|
||||
// on its next turn. Format: human summary on the first line,
|
||||
// sentinel on the second. The sentinel uses an opaque tag
|
||||
// (`deepseek:subagent.done`) to avoid collision with normal user
|
||||
// text.
|
||||
let (summary, sentinel) = match &result {
|
||||
Ok(res) => (
|
||||
summarize_subagent_result(res),
|
||||
subagent_done_sentinel(&task.agent_id, res),
|
||||
),
|
||||
Err(err) => (
|
||||
format!("Failed: {err}"),
|
||||
subagent_failed_sentinel(&task.agent_id, &err.to_string()),
|
||||
),
|
||||
};
|
||||
|
||||
if let Some(mb) = task.runtime.mailbox.as_ref() {
|
||||
let envelope = match &result {
|
||||
Ok(_) => MailboxMessage::Completed {
|
||||
agent_id: task.agent_id.clone(),
|
||||
summary: summary.clone(),
|
||||
},
|
||||
Err(err) => MailboxMessage::Failed {
|
||||
agent_id: task.agent_id.clone(),
|
||||
error: err.to_string(),
|
||||
},
|
||||
};
|
||||
let _ = mb.send(envelope);
|
||||
}
|
||||
|
||||
if let Some(event_tx) = task.runtime.event_tx {
|
||||
let payload = format!("{summary}\n{sentinel}");
|
||||
let _ = event_tx.try_send(Event::AgentComplete {
|
||||
id: task.agent_id,
|
||||
@@ -2555,8 +2600,12 @@ async fn run_subagent(
|
||||
));
|
||||
}
|
||||
let tools = tool_registry.tools_for_model();
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone()));
|
||||
}
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("started ({})", agent_type.as_str()),
|
||||
);
|
||||
@@ -2580,9 +2629,15 @@ async fn run_subagent(
|
||||
if runtime.cancel_token.is_cancelled() {
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: cancelled"),
|
||||
);
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::Cancelled {
|
||||
agent_id: agent_id.clone(),
|
||||
});
|
||||
}
|
||||
return Ok(SubAgentResult {
|
||||
agent_id: agent_id.clone(),
|
||||
agent_type: agent_type.clone(),
|
||||
@@ -2597,6 +2652,7 @@ async fn run_subagent(
|
||||
steps += 1;
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: requesting model response"),
|
||||
);
|
||||
@@ -2643,9 +2699,15 @@ async fn run_subagent(
|
||||
() = runtime.cancel_token.cancelled() => {
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: cancelled mid-request"),
|
||||
);
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::Cancelled {
|
||||
agent_id: agent_id.clone(),
|
||||
});
|
||||
}
|
||||
return Ok(SubAgentResult {
|
||||
agent_id: agent_id.clone(),
|
||||
agent_type: agent_type.clone(),
|
||||
@@ -2692,6 +2754,7 @@ async fn run_subagent(
|
||||
if pending_inputs.is_empty() {
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: complete"),
|
||||
);
|
||||
@@ -2702,6 +2765,7 @@ async fn run_subagent(
|
||||
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!(
|
||||
"step {steps}/{max_steps}: executing {} tool call(s)",
|
||||
@@ -2712,9 +2776,17 @@ async fn run_subagent(
|
||||
for (tool_id, tool_name, tool_input) in tool_uses {
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: running tool '{tool_name}'"),
|
||||
);
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::ToolCallStarted {
|
||||
agent_id: agent_id.clone(),
|
||||
tool_name: tool_name.clone(),
|
||||
step: steps,
|
||||
});
|
||||
}
|
||||
let result = match tokio::time::timeout(TOOL_TIMEOUT, async {
|
||||
tool_registry
|
||||
.execute(&agent_id, &tool_name, tool_input)
|
||||
@@ -2726,11 +2798,21 @@ async fn run_subagent(
|
||||
Ok(Err(e)) => format!("Error: {e}"),
|
||||
Err(_) => format!("Error: Tool {tool_name} timed out"),
|
||||
};
|
||||
let tool_ok = !result.starts_with("Error:");
|
||||
emit_agent_progress(
|
||||
runtime.event_tx.as_ref(),
|
||||
runtime.mailbox.as_ref(),
|
||||
&agent_id,
|
||||
format!("step {steps}/{max_steps}: finished tool '{tool_name}'"),
|
||||
);
|
||||
if let Some(mb) = runtime.mailbox.as_ref() {
|
||||
let _ = mb.send(MailboxMessage::ToolCallCompleted {
|
||||
agent_id: agent_id.clone(),
|
||||
tool_name: tool_name.clone(),
|
||||
step: steps,
|
||||
ok: tool_ok,
|
||||
});
|
||||
}
|
||||
|
||||
tool_results.push(ContentBlock::ToolResult {
|
||||
tool_use_id: tool_id,
|
||||
@@ -3702,7 +3784,15 @@ fn build_assignment_prompt(
|
||||
)
|
||||
}
|
||||
|
||||
fn emit_agent_progress(event_tx: Option<&mpsc::Sender<Event>>, agent_id: &str, status: String) {
|
||||
fn emit_agent_progress(
|
||||
event_tx: Option<&mpsc::Sender<Event>>,
|
||||
mailbox: Option<&Mailbox>,
|
||||
agent_id: &str,
|
||||
status: String,
|
||||
) {
|
||||
if let Some(mb) = mailbox {
|
||||
let _ = mb.send(MailboxMessage::progress(agent_id, status.clone()));
|
||||
}
|
||||
if let Some(event_tx) = event_tx {
|
||||
let _ = event_tx.try_send(Event::AgentProgress {
|
||||
id: agent_id.to_string(),
|
||||
|
||||
@@ -904,6 +904,103 @@ fn child_cancellation_cascades_from_parent() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mailbox_propagates_through_child_runtime_chain() {
|
||||
use crate::tools::subagent::mailbox::Mailbox;
|
||||
let parent_token = CancellationToken::new();
|
||||
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
|
||||
|
||||
let mut parent = stub_runtime();
|
||||
parent.cancel_token = parent_token;
|
||||
parent.mailbox = Some(mailbox);
|
||||
|
||||
let child = parent.child_runtime();
|
||||
let grandchild = child.child_runtime();
|
||||
assert!(parent.mailbox.is_some());
|
||||
assert!(child.mailbox.is_some(), "child inherits parent mailbox");
|
||||
assert!(
|
||||
grandchild.mailbox.is_some(),
|
||||
"grandchild inherits via the cloned Arc inside Mailbox"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mailbox_close_as_cancel_propagates_to_grandchild_runtime() {
|
||||
use crate::tools::subagent::mailbox::Mailbox;
|
||||
let parent_token = CancellationToken::new();
|
||||
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
|
||||
|
||||
let mut parent = stub_runtime();
|
||||
parent.cancel_token = parent_token;
|
||||
parent.mailbox = Some(mailbox.clone());
|
||||
|
||||
let child = parent.child_runtime();
|
||||
let grandchild = child.child_runtime();
|
||||
assert!(!grandchild.cancel_token.is_cancelled());
|
||||
|
||||
// Close the mailbox via *any* clone — the original or the one stored on
|
||||
// the runtime. Cancellation must reach all the way to the grandchild.
|
||||
mailbox.close();
|
||||
assert!(parent.cancel_token.is_cancelled());
|
||||
assert!(child.cancel_token.is_cancelled());
|
||||
assert!(
|
||||
grandchild.cancel_token.is_cancelled(),
|
||||
"close-as-cancel must propagate across max_spawn_depth=3"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mailbox_orders_messages_from_parent_and_child_runtimes() {
|
||||
use crate::tools::subagent::mailbox::{Mailbox, MailboxMessage};
|
||||
let parent_token = CancellationToken::new();
|
||||
let (mailbox, mut rx) = Mailbox::new(parent_token.clone());
|
||||
|
||||
let mut parent = stub_runtime();
|
||||
parent.cancel_token = parent_token;
|
||||
parent.mailbox = Some(mailbox);
|
||||
let child = parent.child_runtime();
|
||||
|
||||
// Interleave sends from both runtimes; sequence numbers stay monotonic.
|
||||
parent
|
||||
.mailbox
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.send(MailboxMessage::progress("parent_a", "step 1"));
|
||||
child
|
||||
.mailbox
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.send(MailboxMessage::progress("child_b", "step 1"));
|
||||
parent
|
||||
.mailbox
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.send(MailboxMessage::progress("parent_a", "step 2"));
|
||||
|
||||
let drained = rx.drain();
|
||||
assert_eq!(drained.len(), 3);
|
||||
assert_eq!(drained[0].seq, 1);
|
||||
assert_eq!(drained[1].seq, 2);
|
||||
assert_eq!(drained[2].seq, 3);
|
||||
// Verify ordering is preserved across publishers.
|
||||
match (
|
||||
&drained[0].message,
|
||||
&drained[1].message,
|
||||
&drained[2].message,
|
||||
) {
|
||||
(
|
||||
MailboxMessage::Progress { agent_id: a, .. },
|
||||
MailboxMessage::Progress { agent_id: b, .. },
|
||||
MailboxMessage::Progress { agent_id: c, .. },
|
||||
) => {
|
||||
assert_eq!(a, "parent_a");
|
||||
assert_eq!(b, "child_b");
|
||||
assert_eq!(c, "parent_a");
|
||||
}
|
||||
other => panic!("unexpected message order: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persisted_empty_allowed_tools_loads_as_full_inheritance() {
|
||||
// Backward-compat: a v0.6.5 session that persisted with an empty Vec
|
||||
@@ -989,6 +1086,7 @@ fn stub_runtime() -> SubAgentRuntime {
|
||||
spawn_depth: 0,
|
||||
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
|
||||
cancel_token: CancellationToken::new(),
|
||||
mailbox: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user