From 32750cb52d6f1131fb96f41835e6ba3cdc755568 Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Mon, 27 Apr 2026 21:50:14 -0500 Subject: [PATCH] feat(subagent): #130 mailbox abstraction with seq + backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Internal upgrade — public tool surface (agent_spawn, agent_swarm, …) unchanged. The mailbox primitive replaces ad-hoc mpsc plumbing in the runtime so: - progress events have monotonic ordering - subscribers get watch-based backpressure - close-as-cancel propagates through nested children Pairs with #128 (in-transcript cards consume the mailbox stream). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/tui/src/tools/subagent/mailbox.rs | 442 +++++++++++++++++++++++ crates/tui/src/tools/subagent/mod.rs | 124 ++++++- crates/tui/src/tools/subagent/tests.rs | 98 +++++ 3 files changed, 647 insertions(+), 17 deletions(-) create mode 100644 crates/tui/src/tools/subagent/mailbox.rs diff --git a/crates/tui/src/tools/subagent/mailbox.rs b/crates/tui/src/tools/subagent/mailbox.rs new file mode 100644 index 00000000..fce6499b --- /dev/null +++ b/crates/tui/src/tools/subagent/mailbox.rs @@ -0,0 +1,442 @@ +//! Mailbox abstraction for sub-agent runtime coordination. +//! +//! Monotonic sequence numbers give every consumer a consistent ordering even +//! when multiple subscribers (e.g. UI card + parent agent) drain +//! independently; close-as-cancel lets a single signal both stop new mail and +//! propagate cancellation through nested children. + +// Some surface here is producer-only inside this crate today and consumed by +// #128's UI cards in a follow-up; suppress the dead-code warnings until then +// rather than deleting capabilities the design depends on. +#![allow(dead_code)] + +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{Mutex, mpsc, watch}; +use tokio_util::sync::CancellationToken; + +use super::SubAgentType; + +/// Stable, structured progress envelope shared across the sub-agent surface. +/// +/// Tracks the lifecycle of a single agent (identified by `agent_id`) end to +/// end: spawn, per-step progress, tool execution, completion / failure / +/// cancellation, and parent → child topology so consumers can render trees. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum MailboxMessage { + /// Agent has been started (background task is running). + Started { + agent_id: String, + agent_type: String, + }, + /// Free-form human-readable progress (mirrors `Event::AgentProgress`). + Progress { agent_id: String, status: String }, + /// A tool call inside the agent has started. + ToolCallStarted { + agent_id: String, + tool_name: String, + step: u32, + }, + /// A tool call inside the agent has finished. + ToolCallCompleted { + agent_id: String, + tool_name: String, + step: u32, + ok: bool, + }, + /// A child agent was spawned by this agent. + ChildSpawned { parent_id: String, child_id: String }, + /// Agent completed successfully (carries the summary line shown in the + /// transcript; full result is still available via `agent_result`). + Completed { agent_id: String, summary: String }, + /// Agent failed with the carried error message. + Failed { agent_id: String, error: String }, + /// Cancellation propagated to this agent. + Cancelled { agent_id: String }, +} + +impl MailboxMessage { + /// `agent_id` of the message subject (for `ChildSpawned` this is the + /// child, since that's the new lifecycle being announced). + #[must_use] + pub fn agent_id(&self) -> &str { + match self { + Self::Started { agent_id, .. } + | Self::Progress { agent_id, .. } + | Self::ToolCallStarted { agent_id, .. } + | Self::ToolCallCompleted { agent_id, .. } + | Self::Completed { agent_id, .. } + | Self::Failed { agent_id, .. } + | Self::Cancelled { agent_id } => agent_id, + Self::ChildSpawned { child_id, .. } => child_id, + } + } + + pub(crate) fn started(agent_id: impl Into, agent_type: SubAgentType) -> Self { + Self::Started { + agent_id: agent_id.into(), + agent_type: agent_type.as_str().to_string(), + } + } + + pub(crate) fn progress(agent_id: impl Into, status: impl Into) -> Self { + Self::Progress { + agent_id: agent_id.into(), + status: status.into(), + } + } +} + +/// One delivery: a sequence number plus the message. The sequence is +/// monotonic across the entire mailbox (not per-agent) so a single ordering +/// is well-defined even when multiple sub-agents share one mailbox. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MailboxEnvelope { + pub seq: u64, + pub message: MailboxMessage, +} + +/// Sender side of the mailbox. +/// +/// Cheaply cloneable (everything inside is `Arc`/atomic). Cloning a +/// `Mailbox` shares the same delivery channel, sequence counter, watch +/// notifier, and close/cancel state — so a child runtime that clones its +/// parent's `Mailbox` participates in the same stream. +#[derive(Clone)] +pub struct Mailbox { + inner: Arc, +} + +struct MailboxInner { + tx: mpsc::UnboundedSender, + next_seq: AtomicU64, + seq_tx: watch::Sender, + closed: AtomicBool, + cancel_token: CancellationToken, +} + +/// Receiver side of the mailbox. Not `Clone` — only the original creator +/// can drain. Use `Mailbox::subscribe()` for fanout (UI cards + parent both +/// observing the same stream). +pub struct MailboxReceiver { + rx: mpsc::UnboundedReceiver, + pending: VecDeque, +} + +impl Mailbox { + /// Create a new mailbox bound to the given cancellation token. Closing + /// the mailbox (or dropping the last sender) cancels this token, which + /// propagates to children via `child_token()` per `SubAgentRuntime`. + #[must_use] + pub fn new(cancel_token: CancellationToken) -> (Self, MailboxReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let (seq_tx, _) = watch::channel(0); + let inner = MailboxInner { + tx, + next_seq: AtomicU64::new(0), + seq_tx, + closed: AtomicBool::new(false), + cancel_token, + }; + ( + Self { + inner: Arc::new(inner), + }, + MailboxReceiver { + rx, + pending: VecDeque::new(), + }, + ) + } + + /// Subscribe to seq-bump notifications. Each `recv()` returns when the + /// sequence counter advances, signaling new mail without copying it — + /// the consumer then calls `drain` (or `recv_one` on its own receiver). + /// Multiple subscribers may exist; this is the fanout primitive. + #[must_use] + pub fn subscribe(&self) -> watch::Receiver { + self.inner.seq_tx.subscribe() + } + + /// Send a message; returns `Some(seq)` on success, `None` if the + /// mailbox is already closed (callers should treat this as "the + /// receiver is gone, stop publishing"). + pub fn send(&self, message: MailboxMessage) -> Option { + if self.inner.closed.load(Ordering::Acquire) { + return None; + } + let seq = self.inner.next_seq.fetch_add(1, Ordering::Relaxed) + 1; + let envelope = MailboxEnvelope { seq, message }; + if self.inner.tx.send(envelope).is_err() { + return None; + } + let _ = self.inner.seq_tx.send_replace(seq); + Some(seq) + } + + /// Whether the mailbox has been closed. + #[must_use] + pub fn is_closed(&self) -> bool { + self.inner.closed.load(Ordering::Acquire) + } + + /// Close the mailbox AND cancel the bound cancellation token. + /// + /// "Close-as-cancel": there's no useful state where the consumer is + /// gone but children should keep producing. Closing the parent's + /// mailbox cascades to every nested child because each child runtime + /// derived its `cancel_token` via `child_token()` from the parent's. + pub fn close(&self) { + if !self.inner.closed.swap(true, Ordering::AcqRel) { + self.inner.cancel_token.cancel(); + } + } +} + +impl MailboxReceiver { + fn sync_pending(&mut self) { + while let Ok(env) = self.rx.try_recv() { + self.pending.push_back(env); + } + } + + /// Whether any envelopes are buffered (or arrived since last check). + pub fn has_pending(&mut self) -> bool { + self.sync_pending(); + !self.pending.is_empty() + } + + /// Drain all currently available envelopes, in delivery order. + pub fn drain(&mut self) -> Vec { + self.sync_pending(); + self.pending.drain(..).collect() + } + + /// Await the next envelope, with backpressure-aware blocking. Returns + /// `None` when every sender has been dropped and the buffer is drained. + pub async fn recv(&mut self) -> Option { + if let Some(env) = self.pending.pop_front() { + return Some(env); + } + self.rx.recv().await + } + + /// Awaits the next envelope with a timeout. Useful in tests. + #[allow(dead_code)] + pub async fn recv_timeout(&mut self, timeout: Duration) -> Option { + tokio::time::timeout(timeout, self.recv()) + .await + .ok() + .flatten() + } +} + +/// Convenience handle: a mailbox + the matching cancellation token, ready to +/// hand to a runtime. The receiver lives on the spawning side. +pub type SharedMailbox = Arc>>; + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::Duration; + + fn open() -> (Mailbox, MailboxReceiver, CancellationToken) { + let token = CancellationToken::new(); + let (mb, rx) = Mailbox::new(token.clone()); + (mb, rx, token) + } + + #[tokio::test] + async fn mailbox_assigns_monotonic_sequence_numbers() { + let (mb, _rx, _tok) = open(); + let s1 = mb + .send(MailboxMessage::progress("a", "one")) + .expect("seq 1"); + let s2 = mb + .send(MailboxMessage::progress("a", "two")) + .expect("seq 2"); + let s3 = mb + .send(MailboxMessage::progress("b", "three")) + .expect("seq 3"); + assert_eq!(s1, 1); + assert_eq!(s2, 2); + assert_eq!(s3, 3); + assert!(s2 > s1 && s3 > s2); + } + + #[tokio::test] + async fn mailbox_drains_in_delivery_order() { + let (mb, mut rx, _tok) = open(); + mb.send(MailboxMessage::progress("a", "first")); + mb.send(MailboxMessage::progress("a", "second")); + mb.send(MailboxMessage::Completed { + agent_id: "a".into(), + summary: "done".into(), + }); + let drained = rx.drain(); + assert_eq!(drained.len(), 3); + assert_eq!(drained[0].seq, 1); + assert_eq!(drained[1].seq, 2); + assert_eq!(drained[2].seq, 3); + assert!(matches!( + drained[0].message, + MailboxMessage::Progress { .. } + )); + assert!(matches!( + drained[2].message, + MailboxMessage::Completed { .. } + )); + assert!(!rx.has_pending()); + } + + #[tokio::test] + async fn subscribers_receive_seq_bumps_for_backpressure() { + let (mb, _rx, _tok) = open(); + let mut sub_a = mb.subscribe(); + let mut sub_b = mb.subscribe(); + // Initial state: both at 0. + assert_eq!(*sub_a.borrow(), 0); + assert_eq!(*sub_b.borrow(), 0); + + mb.send(MailboxMessage::progress("x", "tick")); + sub_a.changed().await.expect("subscriber a sees bump"); + sub_b.changed().await.expect("subscriber b sees bump"); + assert_eq!(*sub_a.borrow(), 1); + assert_eq!(*sub_b.borrow(), 1); + + // A second send updates both subscribers' watch values too — even + // though they share a single watch channel, fanout is N-to-many. + mb.send(MailboxMessage::progress("x", "tick2")); + sub_a.changed().await.expect("a sees second bump"); + assert_eq!(*sub_a.borrow(), 2); + } + + #[tokio::test] + async fn close_cancels_bound_token_and_blocks_further_sends() { + let (mb, _rx, token) = open(); + assert!(!token.is_cancelled()); + mb.send(MailboxMessage::progress("a", "before close")); + mb.close(); + assert!(token.is_cancelled(), "close-as-cancel: token must fire"); + assert!(mb.is_closed()); + // Further sends are no-ops, returning None instead of poisoning seq. + assert!( + mb.send(MailboxMessage::progress("a", "after close")) + .is_none() + ); + } + + #[tokio::test] + async fn close_propagates_to_child_tokens_across_max_spawn_depth() { + // Mirror the runtime: root → child → grandchild (default depth 3). + let root = CancellationToken::new(); + let child = root.child_token(); + let grandchild = child.child_token(); + let (mb, _rx) = Mailbox::new(root.clone()); + + assert!(!child.is_cancelled()); + assert!(!grandchild.is_cancelled()); + mb.close(); + assert!(child.is_cancelled(), "child inherits root close"); + assert!( + grandchild.is_cancelled(), + "grandchild inherits too — covers default max_spawn_depth = 3" + ); + } + + #[tokio::test] + async fn recv_returns_envelope_then_none_after_close_and_drop() { + let (mb, mut rx, _tok) = open(); + mb.send(MailboxMessage::progress("a", "queued")); + let env = rx.recv().await.expect("buffered envelope"); + assert_eq!(env.seq, 1); + + // After closing AND dropping the sender, recv must yield None. + mb.close(); + drop(mb); + let next = rx.recv_timeout(Duration::from_millis(100)).await; + assert!(next.is_none(), "drained + dropped → recv yields None"); + } + + #[tokio::test] + async fn cloned_mailbox_shares_sequence_and_close_state() { + let (mb, mut rx, token) = open(); + let mb_clone = mb.clone(); + let s1 = mb + .send(MailboxMessage::progress("a", "from original")) + .unwrap(); + let s2 = mb_clone + .send(MailboxMessage::progress("a", "from clone")) + .unwrap(); + assert_eq!(s1, 1); + assert_eq!(s2, 2, "clones share the seq counter"); + + let drained = rx.drain(); + assert_eq!(drained.len(), 2); + + // Closing through one clone closes them all (the AtomicBool is shared). + mb_clone.close(); + assert!(mb.is_closed()); + assert!(token.is_cancelled()); + } + + #[tokio::test] + async fn agent_id_is_extractable_from_every_variant() { + let cases: Vec<(MailboxMessage, &str)> = vec![ + (MailboxMessage::started("a1", SubAgentType::General), "a1"), + (MailboxMessage::progress("a2", "x"), "a2"), + ( + MailboxMessage::ToolCallStarted { + agent_id: "a3".into(), + tool_name: "read_file".into(), + step: 1, + }, + "a3", + ), + ( + MailboxMessage::ToolCallCompleted { + agent_id: "a4".into(), + tool_name: "read_file".into(), + step: 1, + ok: true, + }, + "a4", + ), + ( + MailboxMessage::ChildSpawned { + parent_id: "parent".into(), + child_id: "a5".into(), + }, + "a5", + ), + ( + MailboxMessage::Completed { + agent_id: "a6".into(), + summary: "done".into(), + }, + "a6", + ), + ( + MailboxMessage::Failed { + agent_id: "a7".into(), + error: "boom".into(), + }, + "a7", + ), + ( + MailboxMessage::Cancelled { + agent_id: "a8".into(), + }, + "a8", + ), + ]; + for (msg, expected) in cases { + assert_eq!(msg.agent_id(), expected, "extract failed for {msg:?}"); + } + } +} diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs index 8841b056..3d472b87 100644 --- a/crates/tui/src/tools/subagent/mod.rs +++ b/crates/tui/src/tools/subagent/mod.rs @@ -34,6 +34,10 @@ use crate::tools::spec::{ }; use crate::tools::todo::{SharedTodoList, TodoList}; +pub mod mailbox; +#[allow(unused_imports)] +pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver}; + // === Constants === const DEFAULT_MAX_STEPS: u32 = 100; @@ -440,6 +444,10 @@ pub struct SubAgentRuntime { /// Cooperative cancellation token. Children derive a child_token() from /// the parent so cancelling the root cascades down. pub cancel_token: CancellationToken, + /// Structured progress / lifecycle stream. Cloned across children so the + /// whole spawn tree publishes into one ordered, fan-out-able mailbox. + /// `None` only when no consumer is wired (legacy entry points / tests). + pub mailbox: Option, } impl SubAgentRuntime { @@ -466,9 +474,30 @@ impl SubAgentRuntime { spawn_depth: 0, max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH, cancel_token: CancellationToken::new(), + mailbox: None, } } + /// Attach a `Mailbox` so this runtime (and every descendant — children + /// clone it) publishes structured `MailboxMessage` envelopes alongside + /// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when + /// you want close-as-cancel to propagate the same way. + #[must_use] + #[allow(dead_code)] // wired by #128 (in-transcript cards) when it lands. + pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self { + self.mailbox = Some(mailbox); + self + } + + /// Replace the cancellation token (e.g. when the engine constructs the + /// runtime alongside a mailbox bound to the same token). + #[must_use] + #[allow(dead_code)] // wired by #128 alongside `with_mailbox`. + pub fn with_cancel_token(mut self, token: CancellationToken) -> Self { + self.cancel_token = token; + self + } + /// Override the maximum spawn depth (default `DEFAULT_MAX_SPAWN_DEPTH`). /// Used by config wiring (`[runtime] max_spawn_depth = N`) and tests. #[must_use] @@ -501,6 +530,7 @@ impl SubAgentRuntime { spawn_depth: self.spawn_depth + 1, max_spawn_depth: self.max_spawn_depth, cancel_token: self.cancel_token.child_token(), + mailbox: self.mailbox.clone(), } } @@ -2477,23 +2507,38 @@ async fn run_subagent_task(task: SubAgentTask) { Err(err) => manager.update_failed(&task.agent_id, err.to_string()), } - if let Some(event_tx) = task.runtime.event_tx { - // Emit BOTH a human-friendly summary (rendered in the parent's - // sidebar / cell) AND a structured sentinel the model can recognize - // on its next turn. Format: human summary on the first line, - // sentinel on the second. The sentinel uses an opaque tag - // (`deepseek:subagent.done`) to avoid collision with normal user - // text. - let (summary, sentinel) = match &result { - Ok(res) => ( - summarize_subagent_result(res), - subagent_done_sentinel(&task.agent_id, res), - ), - Err(err) => ( - format!("Failed: {err}"), - subagent_failed_sentinel(&task.agent_id, &err.to_string()), - ), + // Emit BOTH a human-friendly summary (rendered in the parent's + // sidebar / cell) AND a structured sentinel the model can recognize + // on its next turn. Format: human summary on the first line, + // sentinel on the second. The sentinel uses an opaque tag + // (`deepseek:subagent.done`) to avoid collision with normal user + // text. + let (summary, sentinel) = match &result { + Ok(res) => ( + summarize_subagent_result(res), + subagent_done_sentinel(&task.agent_id, res), + ), + Err(err) => ( + format!("Failed: {err}"), + subagent_failed_sentinel(&task.agent_id, &err.to_string()), + ), + }; + + if let Some(mb) = task.runtime.mailbox.as_ref() { + let envelope = match &result { + Ok(_) => MailboxMessage::Completed { + agent_id: task.agent_id.clone(), + summary: summary.clone(), + }, + Err(err) => MailboxMessage::Failed { + agent_id: task.agent_id.clone(), + error: err.to_string(), + }, }; + let _ = mb.send(envelope); + } + + if let Some(event_tx) = task.runtime.event_tx { let payload = format!("{summary}\n{sentinel}"); let _ = event_tx.try_send(Event::AgentComplete { id: task.agent_id, @@ -2555,8 +2600,12 @@ async fn run_subagent( )); } let tools = tool_registry.tools_for_model(); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone())); + } emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("started ({})", agent_type.as_str()), ); @@ -2580,9 +2629,15 @@ async fn run_subagent( if runtime.cancel_token.is_cancelled() { emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: cancelled"), ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Cancelled { + agent_id: agent_id.clone(), + }); + } return Ok(SubAgentResult { agent_id: agent_id.clone(), agent_type: agent_type.clone(), @@ -2597,6 +2652,7 @@ async fn run_subagent( steps += 1; emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: requesting model response"), ); @@ -2643,9 +2699,15 @@ async fn run_subagent( () = runtime.cancel_token.cancelled() => { emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: cancelled mid-request"), ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::Cancelled { + agent_id: agent_id.clone(), + }); + } return Ok(SubAgentResult { agent_id: agent_id.clone(), agent_type: agent_type.clone(), @@ -2692,6 +2754,7 @@ async fn run_subagent( if pending_inputs.is_empty() { emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: complete"), ); @@ -2702,6 +2765,7 @@ async fn run_subagent( emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!( "step {steps}/{max_steps}: executing {} tool call(s)", @@ -2712,9 +2776,17 @@ async fn run_subagent( for (tool_id, tool_name, tool_input) in tool_uses { emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: running tool '{tool_name}'"), ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::ToolCallStarted { + agent_id: agent_id.clone(), + tool_name: tool_name.clone(), + step: steps, + }); + } let result = match tokio::time::timeout(TOOL_TIMEOUT, async { tool_registry .execute(&agent_id, &tool_name, tool_input) @@ -2726,11 +2798,21 @@ async fn run_subagent( Ok(Err(e)) => format!("Error: {e}"), Err(_) => format!("Error: Tool {tool_name} timed out"), }; + let tool_ok = !result.starts_with("Error:"); emit_agent_progress( runtime.event_tx.as_ref(), + runtime.mailbox.as_ref(), &agent_id, format!("step {steps}/{max_steps}: finished tool '{tool_name}'"), ); + if let Some(mb) = runtime.mailbox.as_ref() { + let _ = mb.send(MailboxMessage::ToolCallCompleted { + agent_id: agent_id.clone(), + tool_name: tool_name.clone(), + step: steps, + ok: tool_ok, + }); + } tool_results.push(ContentBlock::ToolResult { tool_use_id: tool_id, @@ -3702,7 +3784,15 @@ fn build_assignment_prompt( ) } -fn emit_agent_progress(event_tx: Option<&mpsc::Sender>, agent_id: &str, status: String) { +fn emit_agent_progress( + event_tx: Option<&mpsc::Sender>, + mailbox: Option<&Mailbox>, + agent_id: &str, + status: String, +) { + if let Some(mb) = mailbox { + let _ = mb.send(MailboxMessage::progress(agent_id, status.clone())); + } if let Some(event_tx) = event_tx { let _ = event_tx.try_send(Event::AgentProgress { id: agent_id.to_string(), diff --git a/crates/tui/src/tools/subagent/tests.rs b/crates/tui/src/tools/subagent/tests.rs index c6df92be..57840982 100644 --- a/crates/tui/src/tools/subagent/tests.rs +++ b/crates/tui/src/tools/subagent/tests.rs @@ -904,6 +904,103 @@ fn child_cancellation_cascades_from_parent() { ); } +#[test] +fn mailbox_propagates_through_child_runtime_chain() { + use crate::tools::subagent::mailbox::Mailbox; + let parent_token = CancellationToken::new(); + let (mailbox, _rx) = Mailbox::new(parent_token.clone()); + + let mut parent = stub_runtime(); + parent.cancel_token = parent_token; + parent.mailbox = Some(mailbox); + + let child = parent.child_runtime(); + let grandchild = child.child_runtime(); + assert!(parent.mailbox.is_some()); + assert!(child.mailbox.is_some(), "child inherits parent mailbox"); + assert!( + grandchild.mailbox.is_some(), + "grandchild inherits via the cloned Arc inside Mailbox" + ); +} + +#[tokio::test] +async fn mailbox_close_as_cancel_propagates_to_grandchild_runtime() { + use crate::tools::subagent::mailbox::Mailbox; + let parent_token = CancellationToken::new(); + let (mailbox, _rx) = Mailbox::new(parent_token.clone()); + + let mut parent = stub_runtime(); + parent.cancel_token = parent_token; + parent.mailbox = Some(mailbox.clone()); + + let child = parent.child_runtime(); + let grandchild = child.child_runtime(); + assert!(!grandchild.cancel_token.is_cancelled()); + + // Close the mailbox via *any* clone — the original or the one stored on + // the runtime. Cancellation must reach all the way to the grandchild. + mailbox.close(); + assert!(parent.cancel_token.is_cancelled()); + assert!(child.cancel_token.is_cancelled()); + assert!( + grandchild.cancel_token.is_cancelled(), + "close-as-cancel must propagate across max_spawn_depth=3" + ); +} + +#[tokio::test] +async fn mailbox_orders_messages_from_parent_and_child_runtimes() { + use crate::tools::subagent::mailbox::{Mailbox, MailboxMessage}; + let parent_token = CancellationToken::new(); + let (mailbox, mut rx) = Mailbox::new(parent_token.clone()); + + let mut parent = stub_runtime(); + parent.cancel_token = parent_token; + parent.mailbox = Some(mailbox); + let child = parent.child_runtime(); + + // Interleave sends from both runtimes; sequence numbers stay monotonic. + parent + .mailbox + .as_ref() + .unwrap() + .send(MailboxMessage::progress("parent_a", "step 1")); + child + .mailbox + .as_ref() + .unwrap() + .send(MailboxMessage::progress("child_b", "step 1")); + parent + .mailbox + .as_ref() + .unwrap() + .send(MailboxMessage::progress("parent_a", "step 2")); + + let drained = rx.drain(); + assert_eq!(drained.len(), 3); + assert_eq!(drained[0].seq, 1); + assert_eq!(drained[1].seq, 2); + assert_eq!(drained[2].seq, 3); + // Verify ordering is preserved across publishers. + match ( + &drained[0].message, + &drained[1].message, + &drained[2].message, + ) { + ( + MailboxMessage::Progress { agent_id: a, .. }, + MailboxMessage::Progress { agent_id: b, .. }, + MailboxMessage::Progress { agent_id: c, .. }, + ) => { + assert_eq!(a, "parent_a"); + assert_eq!(b, "child_b"); + assert_eq!(c, "parent_a"); + } + other => panic!("unexpected message order: {other:?}"), + } +} + #[test] fn persisted_empty_allowed_tools_loads_as_full_inheritance() { // Backward-compat: a v0.6.5 session that persisted with an empty Vec @@ -989,6 +1086,7 @@ fn stub_runtime() -> SubAgentRuntime { spawn_depth: 0, max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH, cancel_token: CancellationToken::new(), + mailbox: None, } }