Merge branch 'feat/v067-mailbox' (#130 sub-agent mailbox port)

This commit is contained in:
Hunter Bown
2026-04-27 22:17:00 -05:00
3 changed files with 647 additions and 17 deletions
+442
View File
@@ -0,0 +1,442 @@
//! Mailbox abstraction for sub-agent runtime coordination.
//!
//! Monotonic sequence numbers give every consumer a consistent ordering even
//! when multiple subscribers (e.g. UI card + parent agent) drain
//! independently; close-as-cancel lets a single signal both stop new mail and
//! propagate cancellation through nested children.
// Some surface here is producer-only inside this crate today and consumed by
// #128's UI cards in a follow-up; suppress the dead-code warnings until then
// rather than deleting capabilities the design depends on.
#![allow(dead_code)]
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, mpsc, watch};
use tokio_util::sync::CancellationToken;
use super::SubAgentType;
/// Stable, structured progress envelope shared across the sub-agent surface.
///
/// Tracks the lifecycle of a single agent (identified by `agent_id`) end to
/// end: spawn, per-step progress, tool execution, completion / failure /
/// cancellation, and parent → child topology so consumers can render trees.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum MailboxMessage {
/// Agent has been started (background task is running).
Started {
agent_id: String,
agent_type: String,
},
/// Free-form human-readable progress (mirrors `Event::AgentProgress`).
Progress { agent_id: String, status: String },
/// A tool call inside the agent has started.
ToolCallStarted {
agent_id: String,
tool_name: String,
step: u32,
},
/// A tool call inside the agent has finished.
ToolCallCompleted {
agent_id: String,
tool_name: String,
step: u32,
ok: bool,
},
/// A child agent was spawned by this agent.
ChildSpawned { parent_id: String, child_id: String },
/// Agent completed successfully (carries the summary line shown in the
/// transcript; full result is still available via `agent_result`).
Completed { agent_id: String, summary: String },
/// Agent failed with the carried error message.
Failed { agent_id: String, error: String },
/// Cancellation propagated to this agent.
Cancelled { agent_id: String },
}
impl MailboxMessage {
/// `agent_id` of the message subject (for `ChildSpawned` this is the
/// child, since that's the new lifecycle being announced).
#[must_use]
pub fn agent_id(&self) -> &str {
match self {
Self::Started { agent_id, .. }
| Self::Progress { agent_id, .. }
| Self::ToolCallStarted { agent_id, .. }
| Self::ToolCallCompleted { agent_id, .. }
| Self::Completed { agent_id, .. }
| Self::Failed { agent_id, .. }
| Self::Cancelled { agent_id } => agent_id,
Self::ChildSpawned { child_id, .. } => child_id,
}
}
pub(crate) fn started(agent_id: impl Into<String>, agent_type: SubAgentType) -> Self {
Self::Started {
agent_id: agent_id.into(),
agent_type: agent_type.as_str().to_string(),
}
}
pub(crate) fn progress(agent_id: impl Into<String>, status: impl Into<String>) -> Self {
Self::Progress {
agent_id: agent_id.into(),
status: status.into(),
}
}
}
/// One delivery: a sequence number plus the message. The sequence is
/// monotonic across the entire mailbox (not per-agent) so a single ordering
/// is well-defined even when multiple sub-agents share one mailbox.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MailboxEnvelope {
pub seq: u64,
pub message: MailboxMessage,
}
/// Sender side of the mailbox.
///
/// Cheaply cloneable (everything inside is `Arc`/atomic). Cloning a
/// `Mailbox` shares the same delivery channel, sequence counter, watch
/// notifier, and close/cancel state — so a child runtime that clones its
/// parent's `Mailbox` participates in the same stream.
#[derive(Clone)]
pub struct Mailbox {
inner: Arc<MailboxInner>,
}
struct MailboxInner {
tx: mpsc::UnboundedSender<MailboxEnvelope>,
next_seq: AtomicU64,
seq_tx: watch::Sender<u64>,
closed: AtomicBool,
cancel_token: CancellationToken,
}
/// Receiver side of the mailbox. Not `Clone` — only the original creator
/// can drain. Use `Mailbox::subscribe()` for fanout (UI cards + parent both
/// observing the same stream).
pub struct MailboxReceiver {
rx: mpsc::UnboundedReceiver<MailboxEnvelope>,
pending: VecDeque<MailboxEnvelope>,
}
impl Mailbox {
/// Create a new mailbox bound to the given cancellation token. Closing
/// the mailbox (or dropping the last sender) cancels this token, which
/// propagates to children via `child_token()` per `SubAgentRuntime`.
#[must_use]
pub fn new(cancel_token: CancellationToken) -> (Self, MailboxReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
let (seq_tx, _) = watch::channel(0);
let inner = MailboxInner {
tx,
next_seq: AtomicU64::new(0),
seq_tx,
closed: AtomicBool::new(false),
cancel_token,
};
(
Self {
inner: Arc::new(inner),
},
MailboxReceiver {
rx,
pending: VecDeque::new(),
},
)
}
/// Subscribe to seq-bump notifications. Each `recv()` returns when the
/// sequence counter advances, signaling new mail without copying it —
/// the consumer then calls `drain` (or `recv_one` on its own receiver).
/// Multiple subscribers may exist; this is the fanout primitive.
#[must_use]
pub fn subscribe(&self) -> watch::Receiver<u64> {
self.inner.seq_tx.subscribe()
}
/// Send a message; returns `Some(seq)` on success, `None` if the
/// mailbox is already closed (callers should treat this as "the
/// receiver is gone, stop publishing").
pub fn send(&self, message: MailboxMessage) -> Option<u64> {
if self.inner.closed.load(Ordering::Acquire) {
return None;
}
let seq = self.inner.next_seq.fetch_add(1, Ordering::Relaxed) + 1;
let envelope = MailboxEnvelope { seq, message };
if self.inner.tx.send(envelope).is_err() {
return None;
}
let _ = self.inner.seq_tx.send_replace(seq);
Some(seq)
}
/// Whether the mailbox has been closed.
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.closed.load(Ordering::Acquire)
}
/// Close the mailbox AND cancel the bound cancellation token.
///
/// "Close-as-cancel": there's no useful state where the consumer is
/// gone but children should keep producing. Closing the parent's
/// mailbox cascades to every nested child because each child runtime
/// derived its `cancel_token` via `child_token()` from the parent's.
pub fn close(&self) {
if !self.inner.closed.swap(true, Ordering::AcqRel) {
self.inner.cancel_token.cancel();
}
}
}
impl MailboxReceiver {
fn sync_pending(&mut self) {
while let Ok(env) = self.rx.try_recv() {
self.pending.push_back(env);
}
}
/// Whether any envelopes are buffered (or arrived since last check).
pub fn has_pending(&mut self) -> bool {
self.sync_pending();
!self.pending.is_empty()
}
/// Drain all currently available envelopes, in delivery order.
pub fn drain(&mut self) -> Vec<MailboxEnvelope> {
self.sync_pending();
self.pending.drain(..).collect()
}
/// Await the next envelope, with backpressure-aware blocking. Returns
/// `None` when every sender has been dropped and the buffer is drained.
pub async fn recv(&mut self) -> Option<MailboxEnvelope> {
if let Some(env) = self.pending.pop_front() {
return Some(env);
}
self.rx.recv().await
}
/// Awaits the next envelope with a timeout. Useful in tests.
#[allow(dead_code)]
pub async fn recv_timeout(&mut self, timeout: Duration) -> Option<MailboxEnvelope> {
tokio::time::timeout(timeout, self.recv())
.await
.ok()
.flatten()
}
}
/// Convenience handle: a mailbox + the matching cancellation token, ready to
/// hand to a runtime. The receiver lives on the spawning side.
pub type SharedMailbox = Arc<Mutex<Option<MailboxReceiver>>>;
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::Duration;
fn open() -> (Mailbox, MailboxReceiver, CancellationToken) {
let token = CancellationToken::new();
let (mb, rx) = Mailbox::new(token.clone());
(mb, rx, token)
}
#[tokio::test]
async fn mailbox_assigns_monotonic_sequence_numbers() {
let (mb, _rx, _tok) = open();
let s1 = mb
.send(MailboxMessage::progress("a", "one"))
.expect("seq 1");
let s2 = mb
.send(MailboxMessage::progress("a", "two"))
.expect("seq 2");
let s3 = mb
.send(MailboxMessage::progress("b", "three"))
.expect("seq 3");
assert_eq!(s1, 1);
assert_eq!(s2, 2);
assert_eq!(s3, 3);
assert!(s2 > s1 && s3 > s2);
}
#[tokio::test]
async fn mailbox_drains_in_delivery_order() {
let (mb, mut rx, _tok) = open();
mb.send(MailboxMessage::progress("a", "first"));
mb.send(MailboxMessage::progress("a", "second"));
mb.send(MailboxMessage::Completed {
agent_id: "a".into(),
summary: "done".into(),
});
let drained = rx.drain();
assert_eq!(drained.len(), 3);
assert_eq!(drained[0].seq, 1);
assert_eq!(drained[1].seq, 2);
assert_eq!(drained[2].seq, 3);
assert!(matches!(
drained[0].message,
MailboxMessage::Progress { .. }
));
assert!(matches!(
drained[2].message,
MailboxMessage::Completed { .. }
));
assert!(!rx.has_pending());
}
#[tokio::test]
async fn subscribers_receive_seq_bumps_for_backpressure() {
let (mb, _rx, _tok) = open();
let mut sub_a = mb.subscribe();
let mut sub_b = mb.subscribe();
// Initial state: both at 0.
assert_eq!(*sub_a.borrow(), 0);
assert_eq!(*sub_b.borrow(), 0);
mb.send(MailboxMessage::progress("x", "tick"));
sub_a.changed().await.expect("subscriber a sees bump");
sub_b.changed().await.expect("subscriber b sees bump");
assert_eq!(*sub_a.borrow(), 1);
assert_eq!(*sub_b.borrow(), 1);
// A second send updates both subscribers' watch values too — even
// though they share a single watch channel, fanout is N-to-many.
mb.send(MailboxMessage::progress("x", "tick2"));
sub_a.changed().await.expect("a sees second bump");
assert_eq!(*sub_a.borrow(), 2);
}
#[tokio::test]
async fn close_cancels_bound_token_and_blocks_further_sends() {
let (mb, _rx, token) = open();
assert!(!token.is_cancelled());
mb.send(MailboxMessage::progress("a", "before close"));
mb.close();
assert!(token.is_cancelled(), "close-as-cancel: token must fire");
assert!(mb.is_closed());
// Further sends are no-ops, returning None instead of poisoning seq.
assert!(
mb.send(MailboxMessage::progress("a", "after close"))
.is_none()
);
}
#[tokio::test]
async fn close_propagates_to_child_tokens_across_max_spawn_depth() {
// Mirror the runtime: root → child → grandchild (default depth 3).
let root = CancellationToken::new();
let child = root.child_token();
let grandchild = child.child_token();
let (mb, _rx) = Mailbox::new(root.clone());
assert!(!child.is_cancelled());
assert!(!grandchild.is_cancelled());
mb.close();
assert!(child.is_cancelled(), "child inherits root close");
assert!(
grandchild.is_cancelled(),
"grandchild inherits too — covers default max_spawn_depth = 3"
);
}
#[tokio::test]
async fn recv_returns_envelope_then_none_after_close_and_drop() {
let (mb, mut rx, _tok) = open();
mb.send(MailboxMessage::progress("a", "queued"));
let env = rx.recv().await.expect("buffered envelope");
assert_eq!(env.seq, 1);
// After closing AND dropping the sender, recv must yield None.
mb.close();
drop(mb);
let next = rx.recv_timeout(Duration::from_millis(100)).await;
assert!(next.is_none(), "drained + dropped → recv yields None");
}
#[tokio::test]
async fn cloned_mailbox_shares_sequence_and_close_state() {
let (mb, mut rx, token) = open();
let mb_clone = mb.clone();
let s1 = mb
.send(MailboxMessage::progress("a", "from original"))
.unwrap();
let s2 = mb_clone
.send(MailboxMessage::progress("a", "from clone"))
.unwrap();
assert_eq!(s1, 1);
assert_eq!(s2, 2, "clones share the seq counter");
let drained = rx.drain();
assert_eq!(drained.len(), 2);
// Closing through one clone closes them all (the AtomicBool is shared).
mb_clone.close();
assert!(mb.is_closed());
assert!(token.is_cancelled());
}
#[tokio::test]
async fn agent_id_is_extractable_from_every_variant() {
let cases: Vec<(MailboxMessage, &str)> = vec![
(MailboxMessage::started("a1", SubAgentType::General), "a1"),
(MailboxMessage::progress("a2", "x"), "a2"),
(
MailboxMessage::ToolCallStarted {
agent_id: "a3".into(),
tool_name: "read_file".into(),
step: 1,
},
"a3",
),
(
MailboxMessage::ToolCallCompleted {
agent_id: "a4".into(),
tool_name: "read_file".into(),
step: 1,
ok: true,
},
"a4",
),
(
MailboxMessage::ChildSpawned {
parent_id: "parent".into(),
child_id: "a5".into(),
},
"a5",
),
(
MailboxMessage::Completed {
agent_id: "a6".into(),
summary: "done".into(),
},
"a6",
),
(
MailboxMessage::Failed {
agent_id: "a7".into(),
error: "boom".into(),
},
"a7",
),
(
MailboxMessage::Cancelled {
agent_id: "a8".into(),
},
"a8",
),
];
for (msg, expected) in cases {
assert_eq!(msg.agent_id(), expected, "extract failed for {msg:?}");
}
}
}
+107 -17
View File
@@ -34,6 +34,10 @@ use crate::tools::spec::{
};
use crate::tools::todo::{SharedTodoList, TodoList};
pub mod mailbox;
#[allow(unused_imports)]
pub use mailbox::{Mailbox, MailboxEnvelope, MailboxMessage, MailboxReceiver};
// === Constants ===
const DEFAULT_MAX_STEPS: u32 = 100;
@@ -440,6 +444,10 @@ pub struct SubAgentRuntime {
/// Cooperative cancellation token. Children derive a child_token() from
/// the parent so cancelling the root cascades down.
pub cancel_token: CancellationToken,
/// Structured progress / lifecycle stream. Cloned across children so the
/// whole spawn tree publishes into one ordered, fan-out-able mailbox.
/// `None` only when no consumer is wired (legacy entry points / tests).
pub mailbox: Option<Mailbox>,
}
impl SubAgentRuntime {
@@ -466,9 +474,30 @@ impl SubAgentRuntime {
spawn_depth: 0,
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
}
}
/// Attach a `Mailbox` so this runtime (and every descendant — children
/// clone it) publishes structured `MailboxMessage` envelopes alongside
/// the legacy `Event` stream. Pair with [`Self::with_cancel_token`] when
/// you want close-as-cancel to propagate the same way.
#[must_use]
#[allow(dead_code)] // wired by #128 (in-transcript cards) when it lands.
pub fn with_mailbox(mut self, mailbox: Mailbox) -> Self {
self.mailbox = Some(mailbox);
self
}
/// Replace the cancellation token (e.g. when the engine constructs the
/// runtime alongside a mailbox bound to the same token).
#[must_use]
#[allow(dead_code)] // wired by #128 alongside `with_mailbox`.
pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
self.cancel_token = token;
self
}
/// Override the maximum spawn depth (default `DEFAULT_MAX_SPAWN_DEPTH`).
/// Used by config wiring (`[runtime] max_spawn_depth = N`) and tests.
#[must_use]
@@ -501,6 +530,7 @@ impl SubAgentRuntime {
spawn_depth: self.spawn_depth + 1,
max_spawn_depth: self.max_spawn_depth,
cancel_token: self.cancel_token.child_token(),
mailbox: self.mailbox.clone(),
}
}
@@ -2477,23 +2507,38 @@ async fn run_subagent_task(task: SubAgentTask) {
Err(err) => manager.update_failed(&task.agent_id, err.to_string()),
}
if let Some(event_tx) = task.runtime.event_tx {
// Emit BOTH a human-friendly summary (rendered in the parent's
// sidebar / cell) AND a structured sentinel the model can recognize
// on its next turn. Format: human summary on the first line,
// sentinel on the second. The sentinel uses an opaque tag
// (`deepseek:subagent.done`) to avoid collision with normal user
// text.
let (summary, sentinel) = match &result {
Ok(res) => (
summarize_subagent_result(res),
subagent_done_sentinel(&task.agent_id, res),
),
Err(err) => (
format!("Failed: {err}"),
subagent_failed_sentinel(&task.agent_id, &err.to_string()),
),
// Emit BOTH a human-friendly summary (rendered in the parent's
// sidebar / cell) AND a structured sentinel the model can recognize
// on its next turn. Format: human summary on the first line,
// sentinel on the second. The sentinel uses an opaque tag
// (`deepseek:subagent.done`) to avoid collision with normal user
// text.
let (summary, sentinel) = match &result {
Ok(res) => (
summarize_subagent_result(res),
subagent_done_sentinel(&task.agent_id, res),
),
Err(err) => (
format!("Failed: {err}"),
subagent_failed_sentinel(&task.agent_id, &err.to_string()),
),
};
if let Some(mb) = task.runtime.mailbox.as_ref() {
let envelope = match &result {
Ok(_) => MailboxMessage::Completed {
agent_id: task.agent_id.clone(),
summary: summary.clone(),
},
Err(err) => MailboxMessage::Failed {
agent_id: task.agent_id.clone(),
error: err.to_string(),
},
};
let _ = mb.send(envelope);
}
if let Some(event_tx) = task.runtime.event_tx {
let payload = format!("{summary}\n{sentinel}");
let _ = event_tx.try_send(Event::AgentComplete {
id: task.agent_id,
@@ -2555,8 +2600,12 @@ async fn run_subagent(
));
}
let tools = tool_registry.tools_for_model();
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::started(&agent_id, agent_type.clone()));
}
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("started ({})", agent_type.as_str()),
);
@@ -2580,9 +2629,15 @@ async fn run_subagent(
if runtime.cancel_token.is_cancelled() {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: cancelled"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
return Ok(SubAgentResult {
agent_id: agent_id.clone(),
agent_type: agent_type.clone(),
@@ -2597,6 +2652,7 @@ async fn run_subagent(
steps += 1;
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: requesting model response"),
);
@@ -2643,9 +2699,15 @@ async fn run_subagent(
() = runtime.cancel_token.cancelled() => {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: cancelled mid-request"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::Cancelled {
agent_id: agent_id.clone(),
});
}
return Ok(SubAgentResult {
agent_id: agent_id.clone(),
agent_type: agent_type.clone(),
@@ -2692,6 +2754,7 @@ async fn run_subagent(
if pending_inputs.is_empty() {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: complete"),
);
@@ -2702,6 +2765,7 @@ async fn run_subagent(
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!(
"step {steps}/{max_steps}: executing {} tool call(s)",
@@ -2712,9 +2776,17 @@ async fn run_subagent(
for (tool_id, tool_name, tool_input) in tool_uses {
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: running tool '{tool_name}'"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallStarted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
});
}
let result = match tokio::time::timeout(TOOL_TIMEOUT, async {
tool_registry
.execute(&agent_id, &tool_name, tool_input)
@@ -2726,11 +2798,21 @@ async fn run_subagent(
Ok(Err(e)) => format!("Error: {e}"),
Err(_) => format!("Error: Tool {tool_name} timed out"),
};
let tool_ok = !result.starts_with("Error:");
emit_agent_progress(
runtime.event_tx.as_ref(),
runtime.mailbox.as_ref(),
&agent_id,
format!("step {steps}/{max_steps}: finished tool '{tool_name}'"),
);
if let Some(mb) = runtime.mailbox.as_ref() {
let _ = mb.send(MailboxMessage::ToolCallCompleted {
agent_id: agent_id.clone(),
tool_name: tool_name.clone(),
step: steps,
ok: tool_ok,
});
}
tool_results.push(ContentBlock::ToolResult {
tool_use_id: tool_id,
@@ -3702,7 +3784,15 @@ fn build_assignment_prompt(
)
}
fn emit_agent_progress(event_tx: Option<&mpsc::Sender<Event>>, agent_id: &str, status: String) {
fn emit_agent_progress(
event_tx: Option<&mpsc::Sender<Event>>,
mailbox: Option<&Mailbox>,
agent_id: &str,
status: String,
) {
if let Some(mb) = mailbox {
let _ = mb.send(MailboxMessage::progress(agent_id, status.clone()));
}
if let Some(event_tx) = event_tx {
let _ = event_tx.try_send(Event::AgentProgress {
id: agent_id.to_string(),
+98
View File
@@ -904,6 +904,103 @@ fn child_cancellation_cascades_from_parent() {
);
}
#[test]
fn mailbox_propagates_through_child_runtime_chain() {
use crate::tools::subagent::mailbox::Mailbox;
let parent_token = CancellationToken::new();
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox);
let child = parent.child_runtime();
let grandchild = child.child_runtime();
assert!(parent.mailbox.is_some());
assert!(child.mailbox.is_some(), "child inherits parent mailbox");
assert!(
grandchild.mailbox.is_some(),
"grandchild inherits via the cloned Arc inside Mailbox"
);
}
#[tokio::test]
async fn mailbox_close_as_cancel_propagates_to_grandchild_runtime() {
use crate::tools::subagent::mailbox::Mailbox;
let parent_token = CancellationToken::new();
let (mailbox, _rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox.clone());
let child = parent.child_runtime();
let grandchild = child.child_runtime();
assert!(!grandchild.cancel_token.is_cancelled());
// Close the mailbox via *any* clone — the original or the one stored on
// the runtime. Cancellation must reach all the way to the grandchild.
mailbox.close();
assert!(parent.cancel_token.is_cancelled());
assert!(child.cancel_token.is_cancelled());
assert!(
grandchild.cancel_token.is_cancelled(),
"close-as-cancel must propagate across max_spawn_depth=3"
);
}
#[tokio::test]
async fn mailbox_orders_messages_from_parent_and_child_runtimes() {
use crate::tools::subagent::mailbox::{Mailbox, MailboxMessage};
let parent_token = CancellationToken::new();
let (mailbox, mut rx) = Mailbox::new(parent_token.clone());
let mut parent = stub_runtime();
parent.cancel_token = parent_token;
parent.mailbox = Some(mailbox);
let child = parent.child_runtime();
// Interleave sends from both runtimes; sequence numbers stay monotonic.
parent
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("parent_a", "step 1"));
child
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("child_b", "step 1"));
parent
.mailbox
.as_ref()
.unwrap()
.send(MailboxMessage::progress("parent_a", "step 2"));
let drained = rx.drain();
assert_eq!(drained.len(), 3);
assert_eq!(drained[0].seq, 1);
assert_eq!(drained[1].seq, 2);
assert_eq!(drained[2].seq, 3);
// Verify ordering is preserved across publishers.
match (
&drained[0].message,
&drained[1].message,
&drained[2].message,
) {
(
MailboxMessage::Progress { agent_id: a, .. },
MailboxMessage::Progress { agent_id: b, .. },
MailboxMessage::Progress { agent_id: c, .. },
) => {
assert_eq!(a, "parent_a");
assert_eq!(b, "child_b");
assert_eq!(c, "parent_a");
}
other => panic!("unexpected message order: {other:?}"),
}
}
#[test]
fn persisted_empty_allowed_tools_loads_as_full_inheritance() {
// Backward-compat: a v0.6.5 session that persisted with an empty Vec
@@ -989,6 +1086,7 @@ fn stub_runtime() -> SubAgentRuntime {
spawn_depth: 0,
max_spawn_depth: DEFAULT_MAX_SPAWN_DEPTH,
cancel_token: CancellationToken::new(),
mailbox: None,
}
}