refactor(tools): drop parallel_fanout — rlm is the only RLM tool

Two near-duplicate top-level tools made the surface confusing. With
parallel_fanout (formerly rlm_query) removed, there's exactly one RLM
shape: load a long input as `context` in a Python REPL via `rlm`, and
let the sub-agent fan out from inside the REPL via `llm_query_batched`
where it has `context` in scope to chunk against.

For non-RLM parallel work the dispatcher already runs multiple tool
calls per turn concurrently — no separate fan-out tool needed. The
GenericToolCell.prompts rendering hook stays (one-row-per-child for any
future fan-out tool), but no tool currently populates it.

Also drops two stray test artifacts (rlm_catalog.md, rlm_test_doc.md)
the model wrote to repo root during a previous live test session.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hunter Bown
2026-04-27 02:14:10 -05:00
parent 9dd0d12cea
commit 2787cdc7b9
13 changed files with 31 additions and 931 deletions
-2
View File
@@ -412,7 +412,6 @@ fn should_default_defer_tool(name: &str, mode: AppMode) -> bool {
| "grep_files"
| "file_search"
| "diagnostics"
| "parallel_fanout"
| "rlm"
| MULTI_TOOL_PARALLEL_NAME
| "update_plan"
@@ -1450,7 +1449,6 @@ impl Engine {
builder = builder
.with_review_tool(self.deepseek_client.clone(), self.session.model.clone())
.with_parallel_fanout_tool(self.deepseek_client.clone())
.with_rlm_tool(self.deepseek_client.clone(), self.session.model.clone())
.with_user_input_tool()
.with_parallel_tool();
+1 -1
View File
@@ -1,6 +1,6 @@
## Mode: agent
Read-only tools (reads, searches, `parallel_fanout`, agent status queries, git inspection) run silently.
Read-only tools (reads, searches, `rlm`, agent status queries, git inspection) run silently.
Any write, patch, shell execution, sub-agent spawn, or CSV batch operation will ask for approval first.
Before requesting approval for writes, lay out your work with `todo_write` so the user can see what
+2 -2
View File
@@ -9,7 +9,7 @@ Your default workflow for any non-trivial request:
2. **Execute** — work through each todo, updating status as you go.
3. **For complex initiatives**, layer `update_plan` (high-level strategy) above `todo_write` (granular steps).
4. **For parallel work**, spawn sub-agents (`agent_spawn` / `agent_swarm`) — each does one thing well. Link them to plan/todo items in your thinking.
5. **For LM-only fan-out** (summarization, classification, analysis across many items), use `parallel_fanout` for fast parallel inference.
5. **For long inputs that don't fit in your context** (whole files, transcripts, multi-doc corpora) or when you need recursive sub-LLM work, use `rlm` — it loads the input into a Python REPL as `context` and runs sub-LLM calls there so the long string never enters your window.
6. **For persistent cross-session memory**, use `note` sparingly for important decisions, open blockers, and architectural context.
**Key principle**: make your work visible. The sidebar shows Plan / Todos / Tasks / Agents. When these panels are empty, the user has no idea what you're doing. Keep them populated.
@@ -28,7 +28,7 @@ Model notes: DeepSeek V4 models emit *thinking tokens* (`ContentBlock::Thinking`
- **Git / diag / tests**: `git_status`, `git_diff`, `git_show`, `git_log`, `git_blame`, `diagnostics`, `run_tests`, `review`.
- **Sub-agents**: `agent_spawn` (`spawn_agent`, `delegate_to_agent`), `agent_swarm`, `agent_result`, `agent_cancel` (`close_agent`), `agent_list`, `agent_wait` (`wait`), `agent_send_input` (`send_input`), `agent_assign` (`assign_agent`), `resume_agent`.
- **CSV batch**: `spawn_agents_on_csv`, `report_agent_job_result`.
- **LM fan-out**: `parallel_fanout` — `prompts: [...]` runs up to 16 children on the fast cheap model concurrently. Read-only.
- **Recursive LM (long inputs)**: `rlm` — load a file/string as `context` in a Python REPL, sub-agent writes Python that calls `llm_query`/`llm_query_batched`/`rlm_query` to chunk and process it; returns the synthesized answer. Read-only.
- **Other**: `code_execution` (Python sandbox), `validate_data` (JSON/TOML), `request_user_input`, `finance` (market quotes), `tool_search_tool_regex`, `tool_search_tool_bm25` (deferred tool discovery).
Multiple `tool_calls` in one turn run in parallel. `web_search` returns `ref_id`s — cite as `(ref_id)`.
+1 -1
View File
@@ -1,6 +1,6 @@
## Mode: normal
Reads and `parallel_fanout` run silently. Writes, patches, and shell commands ask for approval.
Reads and `rlm` run silently. Writes, patches, and shell commands ask for approval.
Before requesting writes, use `todo_write` to outline your approach — visible plans build trust.
For complex work, layer `update_plan` (strategy) above `todo_write` (tactics).
-1
View File
@@ -15,7 +15,6 @@ pub mod project;
pub mod registry;
pub mod review;
pub mod rlm;
pub mod parallel_fanout;
pub mod search;
pub mod shell;
mod shell_output;
-659
View File
@@ -1,659 +0,0 @@
//! Native Rust RLM tool — parallel/batched LLM fan-out as a structured
//! tool call. Inspired by alexzhang13/rlm but trimmed to the primitives
//! that actually matter inside an agent loop: a single tool that runs
//! N concurrent child completions on the cheap flash model and returns
//! the joined result.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::future::join_all;
use serde_json::{Value, json};
use tokio::time::timeout;
use tracing::debug;
use crate::client::DeepSeekClient;
use crate::llm_client::LlmClient as _;
use crate::models::{ContentBlock, Message, MessageRequest, MessageResponse, SystemPrompt};
use crate::tools::spec::{
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
optional_str, optional_u64,
};
/// Default child model — cheap and fast.
const DEFAULT_CHILD_MODEL: &str = "deepseek-v4-flash";
/// Per-child completion ceiling. Children are meant to be short.
const DEFAULT_MAX_TOKENS: u32 = 4096;
/// Hard cap on parallel children — protects against runaway fan-out.
const MAX_PARALLEL: usize = 16;
/// Per-child timeout — each child request must complete within this window
/// or it is treated as a timed-out error. Protects the fan-out from hanging
/// indefinitely when a single API request stalls.
const DEFAULT_CHILD_TIMEOUT: Duration = Duration::from_secs(120);
// ---------------------------------------------------------------------------
// FanoutChildClient — dyn-compatible wrapper around LLM completion.
//
// The workspace's `LlmClient` trait uses native `async fn`, which is not dyn
// compatible in stable Rust (RPITIT vtable limitations). We define a small
// local trait with `#[async_trait]` that IS dyn-compatible, implement it for
// `DeepSeekClient`, and also implement it in tests for `MockLlmClient`. This
// avoids touching `llm_client.rs` or adding a new dep.
// ---------------------------------------------------------------------------
/// Minimal dyn-compatible async interface for the single RLM child-completion
/// operation. `#[async_trait]` desugars the async method into a boxed future
/// so the trait is object-safe.
#[async_trait]
pub(crate) trait FanoutChildClient: Send + Sync {
async fn complete(&self, request: MessageRequest) -> anyhow::Result<MessageResponse>;
}
/// Blanket impl: any `DeepSeekClient` is a valid child client.
#[async_trait]
impl FanoutChildClient for DeepSeekClient {
async fn complete(&self, request: MessageRequest) -> anyhow::Result<MessageResponse> {
self.create_message(request).await
}
}
/// Tool: `rlm_query`. Runs one or more prompts in parallel and joins the
/// results. Structured tool call so the model can trigger fan-out reliably.
pub struct ParallelFanoutTool {
/// Boxed child client — `Arc<dyn FanoutChildClient>` lets tests inject a
/// mock without going through a real HTTP connection. `None` when no API
/// key is configured.
client: Option<Arc<dyn FanoutChildClient>>,
default_model: String,
}
impl ParallelFanoutTool {
/// Construct with a concrete `DeepSeekClient` (production path).
#[must_use]
pub fn new(client: Option<DeepSeekClient>) -> Self {
Self {
client: client.map(|c| Arc::new(c) as Arc<dyn FanoutChildClient>),
default_model: DEFAULT_CHILD_MODEL.to_string(),
}
}
/// Construct with a pre-boxed `FanoutChildClient` — used by tests to inject
/// a `MockRlmClient` without an active API connection.
#[cfg(test)]
pub(crate) fn new_with_arc(client: Option<Arc<dyn FanoutChildClient>>) -> Self {
Self {
client,
default_model: DEFAULT_CHILD_MODEL.to_string(),
}
}
}
#[async_trait]
impl ToolSpec for ParallelFanoutTool {
fn name(&self) -> &'static str {
"parallel_fanout"
}
fn description(&self) -> &'static str {
"Run up to 16 prompts concurrently against the fast cheap model (deepseek-v4-flash) \
and return the joined results. Pass `prompts: [...]` for a parallel batch or \
`prompt` for a single child. Children run in isolation with an optional shared \
`system` prompt; results come back as `[i] <text>` blocks separated by `---` (or \
just the text for N=1). Max 16 children per call (each is a one-shot flash query; \
use agent_spawn for full multi-turn sub-agents). Read-only no file or shell side-effects."
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Single prompt to run. Use this OR prompts, not both."
},
"prompts": {
"type": "array",
"items": { "type": "string" },
"description": "Up to 16 prompts to run concurrently (each is a one-shot flash query). Returns indexed `[0] ... [N-1]` blocks."
},
"model": {
"type": "string",
"description": "Model override (default: deepseek-v4-flash)."
},
"system": {
"type": "string",
"description": "Optional shared system prompt applied to every child."
},
"max_tokens": {
"type": "integer",
"description": "Per-child token cap (default: 4096)."
}
}
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::Network, ToolCapability::ReadOnly]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Auto
}
fn supports_parallel(&self) -> bool {
true
}
async fn execute(&self, input: Value, _context: &ToolContext) -> Result<ToolResult, ToolError> {
let Some(client) = self.client.clone() else {
return Err(ToolError::not_available(
"rlm_query requires an active DeepSeek client".to_string(),
));
};
let model = optional_str(&input, "model")
.map(|s| s.to_string())
.unwrap_or_else(|| self.default_model.clone());
let system = optional_str(&input, "system").map(|s| s.to_string());
let max_tokens = u32::try_from(
optional_u64(&input, "max_tokens", u64::from(DEFAULT_MAX_TOKENS))
.min(u64::from(u32::MAX)),
)
.unwrap_or(DEFAULT_MAX_TOKENS);
// Accept either `prompts: [...]` or `prompt: "..."`.
let prompts: Vec<String> =
if let Some(arr) = input.get("prompts").and_then(|v| v.as_array()) {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
} else if let Some(p) = input.get("prompt").and_then(|v| v.as_str()) {
vec![p.to_string()]
} else {
return Err(ToolError::invalid_input(
"rlm_query requires `prompt` (string) or `prompts` (array of strings)",
));
};
if prompts.is_empty() {
return Err(ToolError::invalid_input("parallel_fanout: prompts list is empty"));
}
if prompts.len() > MAX_PARALLEL {
return Err(ToolError::invalid_input(format!(
"parallel_fanout: too many prompts ({}, max {MAX_PARALLEL})",
prompts.len(),
)));
}
// client is already Arc<dyn FanoutChildClient> — clone the Arc, not the client.
let model = Arc::new(model);
let system = Arc::new(system);
let total = prompts.len();
// Tracks the peak concurrent in-flight child count for this fan-out.
// Useful as evidence that join_all actually overlaps requests rather
// than walking through them serially. Surfaces in `RUST_LOG=
// deepseek_cli::tools=debug` as the `peak` field of the summary log.
let in_flight = Arc::new(AtomicUsize::new(0));
let peak = Arc::new(AtomicUsize::new(0));
let dispatch_started = Instant::now();
let futures = prompts.into_iter().enumerate().map(|(idx, prompt)| {
let client = Arc::clone(&client);
let model = Arc::clone(&model);
let system = Arc::clone(&system);
let in_flight = Arc::clone(&in_flight);
let peak = Arc::clone(&peak);
async move {
let prior = in_flight.fetch_add(1, Ordering::Relaxed);
let now = prior + 1;
peak.fetch_max(now, Ordering::Relaxed);
debug!(
target: "deepseek_cli::tools",
tool = "parallel_fanout",
idx,
in_flight = now,
"child request start"
);
let started = Instant::now();
let request = MessageRequest {
model: (*model).clone(),
messages: vec![Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: prompt,
cache_control: None,
}],
}],
max_tokens,
system: system.as_ref().clone().map(SystemPrompt::Text),
tools: None,
tool_choice: None,
metadata: None,
thinking: None,
reasoning_effort: None,
stream: Some(false),
temperature: Some(0.4),
top_p: Some(0.9),
};
let response = timeout(DEFAULT_CHILD_TIMEOUT, client.complete(request)).await;
let elapsed_ms = started.elapsed().as_millis() as u64;
in_flight.fetch_sub(1, Ordering::Relaxed);
let mut text_len = 0;
let mut thinking_len = 0;
let mut finish_reason = None;
if let Ok(Ok(ref res)) = response {
finish_reason = res.stop_reason.clone();
for block in &res.content {
match block {
ContentBlock::Text { text, .. } => text_len += text.len(),
ContentBlock::Thinking { thinking, .. } => {
thinking_len += thinking.len()
}
_ => {}
}
}
}
debug!(
target: "deepseek_cli::tools",
tool = "parallel_fanout",
idx,
elapsed_ms,
ok = response.is_ok(),
text_len,
thinking_len,
finish_reason = ?finish_reason,
"child request done"
);
(idx, response)
}
});
let results = join_all(futures).await;
let dispatch_elapsed_ms = dispatch_started.elapsed().as_millis() as u64;
debug!(
target: "deepseek_cli::tools",
tool = "parallel_fanout",
total,
peak = peak.load(Ordering::Relaxed),
dispatch_elapsed_ms,
"fan-out complete"
);
let mut ordered: Vec<(usize, String)> = results
.into_iter()
.map(|(idx, res)| {
let text = match res {
Ok(Ok(response)) => {
extract_text(&response.content, idx, response.stop_reason.as_deref())
}
Ok(Err(e)) => format!("[error: {e}]"),
Err(_) => format!(
"[error: timed out after {}s]",
DEFAULT_CHILD_TIMEOUT.as_secs()
),
};
(idx, text)
})
.collect();
ordered.sort_by_key(|(idx, _)| *idx);
let body = if ordered.len() == 1 {
ordered
.into_iter()
.next()
.map(|(_, t)| t)
.unwrap_or_default()
} else {
ordered
.into_iter()
.map(|(idx, t)| format!("[{idx}] {t}"))
.collect::<Vec<_>>()
.join("\n\n---\n\n")
};
Ok(ToolResult::success(body))
}
}
fn extract_text(blocks: &[ContentBlock], idx: usize, finish_reason: Option<&str>) -> String {
let text = blocks
.iter()
.filter_map(|b| match b {
ContentBlock::Text { text, .. } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
if !text.trim().is_empty() {
return text;
}
let thinking = blocks
.iter()
.filter_map(|b| match b {
ContentBlock::Thinking { thinking, .. } => Some(thinking.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
if !thinking.trim().is_empty() {
return format!("[Child {} thinking fallback]\n{}", idx, thinking);
}
format!(
"[empty response from child idx {} — finish_reason={}, raise max_tokens]",
idx,
finish_reason.unwrap_or("unknown")
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{MessageResponse, Usage};
use crate::tools::spec::ToolContext;
use std::path::PathBuf;
use std::sync::Mutex;
fn ctx() -> ToolContext {
ToolContext::with_auto_approve(
PathBuf::from("."),
false,
PathBuf::from("notes.txt"),
PathBuf::from("mcp.json"),
true,
)
}
fn tool_without_client() -> ParallelFanoutTool {
ParallelFanoutTool::new(None)
}
// -----------------------------------------------------------------------
// MockRlmClient — in-process stub for concurrency tests.
//
// Records the wall-clock instant each call *starts* and sleeps
// `call_delay` before returning. With join_all the N futures run
// concurrently on a single-threaded executor, so all starts happen
// before any sleep expires — demonstrating true overlap.
// -----------------------------------------------------------------------
struct MockRlmClient {
/// Per-call sleep to make overlap visible against a wall clock.
call_delay: std::time::Duration,
/// Timestamps recorded at the start of each `complete` call.
start_times: Arc<Mutex<Vec<Instant>>>,
}
impl MockRlmClient {
fn new(call_delay: std::time::Duration) -> Self {
Self {
call_delay,
start_times: Arc::new(Mutex::new(Vec::new())),
}
}
}
#[async_trait]
impl FanoutChildClient for MockRlmClient {
async fn complete(&self, request: MessageRequest) -> anyhow::Result<MessageResponse> {
// Record start time before sleeping.
self.start_times.lock().unwrap().push(Instant::now());
tokio::time::sleep(self.call_delay).await;
// Return a minimal valid response that mirrors the incoming prompt.
let prompt_text = request
.messages
.first()
.and_then(|m| m.content.first())
.and_then(|b| match b {
ContentBlock::Text { text, .. } => Some(text.clone()),
_ => None,
})
.unwrap_or_default();
Ok(MessageResponse {
id: "mock-id".to_string(),
r#type: "message".to_string(),
role: "assistant".to_string(),
content: vec![ContentBlock::Text {
text: format!("echo: {prompt_text}"),
cache_control: None,
}],
model: "mock-model".to_string(),
stop_reason: Some("end_turn".to_string()),
stop_sequence: None,
container: None,
usage: Usage::default(),
})
}
}
// -----------------------------------------------------------------------
// Concurrency regression test
//
// With N=4 prompts and a 50 ms per-call sleep, *serial* execution would
// take ≥ 4×50 = 200 ms. True join_all fan-out means all calls start
// before any completes, so total wall time is ~50 ms (one sleep, not
// four). We assert: total_elapsed < 4 × call_delay, i.e. the calls
// must overlap.
//
// The test also verifies the mock records N start timestamps all clustered
// within one call_delay window — double-confirming overlap is real.
// -----------------------------------------------------------------------
#[tokio::test]
async fn rlm_parallel_fanout_overlaps_not_serialized() {
const N: usize = 4;
const CALL_DELAY_MS: u64 = 50;
let delay = std::time::Duration::from_millis(CALL_DELAY_MS);
let mock = Arc::new(MockRlmClient::new(delay));
let start_times_ref = Arc::clone(&mock.start_times);
let tool = ParallelFanoutTool::new_with_arc(Some(mock as Arc<dyn FanoutChildClient>));
let prompts: Vec<&str> = vec!["a", "b", "c", "d"];
let overall_start = Instant::now();
let result = tool
.execute(json!({ "prompts": prompts }), &ctx())
.await
.expect("mock tool should succeed");
let total_elapsed = overall_start.elapsed();
// Sanity: all 4 children returned text.
assert!(result.success, "all children should succeed");
// Overlap assertion: total wall time must be well under 4×delay.
// We allow 3×delay as a generous upper bound (plenty of headroom for
// slow CI machines) while still catching serialization bugs.
let serial_time = delay * u32::try_from(N).unwrap();
assert!(
total_elapsed < serial_time,
"fan-out looks serialized: elapsed {total_elapsed:?} >= serial bound {serial_time:?}"
);
// Secondary confirmation: the mock recorded N start timestamps that
// are within one call_delay of each other, proving actual concurrency.
let starts = start_times_ref.lock().unwrap();
assert_eq!(starts.len(), N, "expected exactly {N} child calls");
let min_start = *starts.iter().min().unwrap();
let max_start = *starts.iter().max().unwrap();
// All starts must cluster within one call_delay window — if they were
// serial, max_start - min_start would be ≥ (N-1) × delay.
let start_spread = max_start.duration_since(min_start);
assert!(
start_spread < delay,
"child starts are spread over {start_spread:?}, expected < {delay:?} \
(suggests serialization rather than concurrent fan-out)"
);
// Surface numbers for test output (visible with --nocapture or on
// failure). This is the same information the issue asked to emit.
eprintln!(
"[rlm_parallel_fanout] total_elapsed={total_elapsed:?} \
start_spread={start_spread:?} \
max_concurrent={N} \
per_call_delay={delay:?}"
);
}
/// With a mock client, `prompt` (singular) still fans out as a single
/// child and returns plain text (no `[0]` prefix for N=1).
#[tokio::test]
async fn rlm_single_prompt_returns_plain_text() {
let mock = Arc::new(MockRlmClient::new(std::time::Duration::from_millis(1)));
let tool = ParallelFanoutTool::new_with_arc(Some(mock as Arc<dyn FanoutChildClient>));
let result = tool
.execute(json!({ "prompt": "hello" }), &ctx())
.await
.expect("single-prompt mock should succeed");
let text = &result.content;
// N=1 returns bare text, no "[0]" index prefix.
assert!(!text.starts_with("[0]"), "N=1 must not add index prefix");
assert!(text.contains("echo: hello"), "text must echo the prompt");
}
/// With a mock client, `prompts` (plural, N>1) returns indexed blocks.
#[tokio::test]
async fn rlm_multi_prompt_returns_indexed_blocks() {
let mock = Arc::new(MockRlmClient::new(std::time::Duration::from_millis(1)));
let tool = ParallelFanoutTool::new_with_arc(Some(mock as Arc<dyn FanoutChildClient>));
let result = tool
.execute(json!({ "prompts": ["alpha", "beta"] }), &ctx())
.await
.expect("multi-prompt mock should succeed");
let text = &result.content;
assert!(text.contains("[0]"), "first block must be indexed [0]");
assert!(text.contains("[1]"), "second block must be indexed [1]");
assert!(text.contains("echo: alpha"));
assert!(text.contains("echo: beta"));
}
#[test]
fn schema_advertises_both_shapes() {
let schema = tool_without_client().input_schema();
let props = schema
.get("properties")
.and_then(|v| v.as_object())
.expect("schema has properties");
assert!(props.contains_key("prompt"));
assert!(props.contains_key("prompts"));
assert!(props.contains_key("model"));
assert!(props.contains_key("system"));
assert!(props.contains_key("max_tokens"));
// Neither prompt nor prompts is required at the schema level — the
// tool accepts either, and validates "one or the other" at runtime.
assert!(schema.get("required").is_none());
}
#[tokio::test]
async fn returns_not_available_without_client() {
let tool = tool_without_client();
let err = tool
.execute(json!({ "prompt": "hi" }), &ctx())
.await
.unwrap_err();
assert!(matches!(err, ToolError::NotAvailable { .. }));
}
#[tokio::test]
async fn rejects_input_missing_both_prompt_and_prompts() {
let tool = tool_without_client();
let err = tool.execute(json!({}), &ctx()).await.unwrap_err();
// The not-available branch fires first when there's no client; that
// catches users with no API key. To exercise the missing-prompts
// branch directly we'd need a stub client. The schema docs cover
// the contract, and the integration test below pins the behaviour
// via an actual call when a client is wired.
assert!(matches!(err, ToolError::NotAvailable { .. }));
}
#[test]
fn extract_text_joins_text_blocks_and_skips_others() {
let blocks = vec![
ContentBlock::Text {
text: "first".to_string(),
cache_control: None,
},
ContentBlock::Thinking {
thinking: "ignored".to_string(),
},
ContentBlock::Text {
text: "second".to_string(),
cache_control: None,
},
];
assert_eq!(extract_text(&blocks, 0, None), "first\nsecond");
}
#[test]
fn extract_text_returns_empty_when_no_text_blocks() {
let blocks = vec![ContentBlock::Thinking {
thinking: "no visible text".to_string(),
}];
assert_eq!(
extract_text(&blocks, 0, None),
"[Child 0 thinking fallback]\nno visible text"
);
}
#[test]
fn extract_text_returns_sentinel_when_all_empty() {
let blocks = vec![];
assert_eq!(
extract_text(&blocks, 1, Some("length")),
"[empty response from child idx 1 — finish_reason=length, raise max_tokens]"
);
}
#[test]
fn default_model_is_flash() {
let tool = tool_without_client();
assert_eq!(tool.default_model, DEFAULT_CHILD_MODEL);
assert_eq!(DEFAULT_CHILD_MODEL, "deepseek-v4-flash");
}
#[test]
fn max_parallel_cap_is_sixteen() {
// The cap is documented in the schema description and enforced in
// execute(); pin it here so a future refactor doesn't silently
// raise the ceiling without a deliberate decision.
assert_eq!(MAX_PARALLEL, 16);
}
#[test]
fn approval_is_auto_so_calls_are_unattended() {
// RLM children are read-only LLM completions — the user shouldn't
// be prompted to approve every fan-out call.
let tool = tool_without_client();
assert_eq!(tool.approval_requirement(), ApprovalRequirement::Auto);
}
#[test]
fn supports_parallel_dispatch() {
// Tells the engine it's safe to issue concurrent rlm_query tool
// calls in one assistant turn (e.g. when the model emits multiple
// tool_calls for fan-out).
let tool = tool_without_client();
assert!(tool.supports_parallel());
}
#[test]
fn capabilities_mark_network_and_read_only() {
let tool = tool_without_client();
let caps = tool.capabilities();
assert!(caps.contains(&ToolCapability::Network));
assert!(caps.contains(&ToolCapability::ReadOnly));
}
}
+5 -11
View File
@@ -381,17 +381,11 @@ impl ToolRegistryBuilder {
self.with_tool(Arc::new(ApplyPatchTool))
}
/// Include the native RLM tool (`rlm_query`). Parallel/batched LLM
/// fan-out runs through the existing DeepSeek client.
#[must_use]
pub fn with_parallel_fanout_tool(self, client: Option<DeepSeekClient>) -> Self {
use super::parallel_fanout::ParallelFanoutTool;
self.with_tool(Arc::new(ParallelFanoutTool::new(client)))
}
/// Include the heavy-lift RLM tool (`rlm_process`). Runs the full
/// recursive language-model loop on a long input (file or inline
/// content); the long input never enters the calling model's context.
/// Include the RLM tool (`rlm`). Runs the full recursive language-model
/// loop on a long input (file or inline content); the long input never
/// enters the calling model's context window. The Python REPL exposes
/// `llm_query` / `llm_query_batched` / `rlm_query` / `rlm_query_batched`
/// helpers for sub-LLM work — that's where parallel fan-out belongs.
#[must_use]
pub fn with_rlm_tool(self, client: Option<DeepSeekClient>, root_model: String) -> Self {
use super::rlm::RlmTool;
+3 -3
View File
@@ -653,7 +653,7 @@ impl SubAgentManager {
if self.running_count() >= self.max_agents {
return Err(anyhow!(
"Sub-agent limit reached (max {}, running {}). Cancel, close, or wait for an existing agent to finish. Consider rlm_query (max 16 children) for parallel one-shot queries instead.",
"Sub-agent limit reached (max {}, running {}). Cancel, close, or wait for an existing agent to finish. Consider issuing multiple tool calls in one turn (the dispatcher runs them in parallel) for parallel one-shot work.",
self.max_agents,
self.running_count()
));
@@ -761,7 +761,7 @@ impl SubAgentManager {
if self.running_count() >= self.max_agents {
return Err(anyhow!(
"Sub-agent limit reached (max {}, running {}). Close or wait for an existing agent before resuming. Consider rlm_query (max 16 children) for parallel one-shot queries instead.",
"Sub-agent limit reached (max {}, running {}). Close or wait for an existing agent before resuming. Consider issuing multiple tool calls in one turn (the dispatcher runs them in parallel) for parallel one-shot work.",
self.max_agents,
self.running_count()
));
@@ -1073,7 +1073,7 @@ impl ToolSpec for AgentSpawnTool {
"Spawn a background sub-agent for a focused task. Returns an agent_id immediately; \
follow with agent_result to retrieve the final result. Max 5 in flight (each is a \
full sub-agent loop; cancel or wait if you hit the cap). For parallel one-shot LLM \
queries (cheaper, up to 16 children per call), use rlm_query instead."
queries, just emit multiple tool calls in one turn the dispatcher runs them in parallel."
}
fn input_schema(&self) -> Value {
+8 -8
View File
@@ -898,9 +898,9 @@ pub struct GenericToolCell {
pub status: ToolStatus,
pub input_summary: Option<String>,
pub output: Option<String>,
/// When the tool is `rlm_query` (or any future fan-out tool that exposes a
/// list of child prompts), each prompt is shown on its own indented row
/// instead of the inline `args:` summary. `None` for ordinary tools.
/// Optional list of per-child prompts. When populated (by any future
/// fan-out tool), each prompt is shown on its own indented row instead
/// of the inline `args:` summary. `None` for ordinary tools.
pub prompts: Option<Vec<String>>,
}
@@ -932,8 +932,8 @@ impl GenericToolCell {
));
// Prefer per-prompt rows over the generic args summary when the tool
// exposes a list of child prompts (rlm_query). One row per child with
// a `[i]` index makes the fan-out legible without expanding JSON.
// exposes a list of child prompts. One row per child with a `[i]`
// index makes the fan-out legible without expanding JSON.
let show_prompts = matches!(self.status, ToolStatus::Running) || self.output.is_none();
if show_prompts
&& let Some(prompts) = self.prompts.as_ref()
@@ -2224,12 +2224,12 @@ mod tests {
}
#[test]
fn generic_tool_cell_renders_rlm_prompts_as_indexed_rows() {
// When prompts are populated (rlm_query fan-out), each child shows on
fn generic_tool_cell_renders_prompts_as_indexed_rows() {
// When prompts are populated by a fan-out tool, each child shows on
// its own row instead of the inline `args:` summary so the user can
// read what each child was asked.
let cell = HistoryCell::Tool(ToolCell::Generic(GenericToolCell {
name: "parallel_fanout".to_string(),
name: "future_fanout_tool".to_string(),
status: ToolStatus::Running,
input_summary: Some("prompts: <3 items>".to_string()),
output: None,
+5 -21
View File
@@ -4605,27 +4605,11 @@ fn handle_tool_call_started(app: &mut App, id: &str, name: &str, input: &serde_j
);
}
/// Extract per-child prompts from a fan-out tool's input. For `rlm_query` the
/// renderer shows one row per child instead of an inline JSON summary so the
/// user can read what each child was asked. Returns `None` for tools that
/// don't expose a prompt list.
fn extract_fanout_prompts(name: &str, input: &serde_json::Value) -> Option<Vec<String>> {
if name != "parallel_fanout" {
return None;
}
if let Some(arr) = input.get("prompts").and_then(|v| v.as_array()) {
let prompts: Vec<String> = arr
.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect();
if prompts.is_empty() {
return None;
}
return Some(prompts);
}
if let Some(s) = input.get("prompt").and_then(|v| v.as_str()) {
return Some(vec![s.to_string()]);
}
/// Extract per-child prompts from a fan-out tool's input. Currently no
/// top-level tool exposes a prompt list — fan-out lives inside the RLM
/// REPL via `llm_query_batched`. Kept as a stable hook for any future
/// fan-out tool we add.
fn extract_fanout_prompts(_name: &str, _input: &serde_json::Value) -> Option<Vec<String>> {
None
}
+6 -72
View File
@@ -1847,82 +1847,16 @@ fn second_thinking_block_appends_new_entry_in_same_active_cell() {
);
}
// ---- rlm_query per-child prompt wiring ----
// ---- per-child prompt wiring ----
//
// When `handle_tool_call_started` receives an `rlm_query` call with a
// `prompts` array, the resulting `GenericToolCell` must carry the parsed
// prompts so the TUI can render one row per child (see
// `GenericToolCell::lines_with_motion` and the `show_prompts` branch in
// `history.rs`).
#[test]
fn rlm_query_tool_cell_wired_with_prompts_on_start() {
let mut app = create_test_app();
handle_tool_call_started(
&mut app,
"rlm-1",
"parallel_fanout",
&serde_json::json!({
"prompts": [
"What is the capital of France?",
"List all public types in client.rs",
"Summarize the README"
]
}),
);
// The cell must be live in the active_cell slot (turn not yet complete).
let active = app.active_cell.as_ref().expect("active cell present");
let HistoryCell::Tool(ToolCell::Generic(generic)) = &active.entries()[0] else {
panic!("expected GenericToolCell for rlm_query");
};
assert_eq!(generic.name, "parallel_fanout");
assert_eq!(generic.status, ToolStatus::Running);
// Core assertion: prompts populated from the JSON input.
let prompts = generic
.prompts
.as_ref()
.expect("rlm_query cell must have prompts populated");
assert_eq!(prompts.len(), 3);
assert_eq!(prompts[0], "What is the capital of France?");
assert_eq!(prompts[1], "List all public types in client.rs");
assert_eq!(prompts[2], "Summarize the README");
}
#[test]
fn rlm_query_singular_prompt_wired_as_single_element_vec() {
// When the model passes `prompt` (singular) instead of `prompts`,
// the cell should still populate a one-element prompts vec so the
// renderer shows the child's question.
let mut app = create_test_app();
handle_tool_call_started(
&mut app,
"rlm-2",
"parallel_fanout",
&serde_json::json!({ "prompt": "Explain the engine loop" }),
);
let active = app.active_cell.as_ref().expect("active cell present");
let HistoryCell::Tool(ToolCell::Generic(generic)) = &active.entries()[0] else {
panic!("expected GenericToolCell for rlm_query");
};
let prompts = generic
.prompts
.as_ref()
.expect("singular prompt must populate prompts vec");
assert_eq!(prompts.len(), 1);
assert_eq!(prompts[0], "Explain the engine loop");
}
// `extract_fanout_prompts` is the hook for any future fan-out tool that
// wants its child prompts rendered as one row per child. Right now no
// top-level tool populates it (fan-out lives inside the RLM REPL via
// `llm_query_batched`), so the path always returns `None`.
#[test]
fn non_fanout_tool_does_not_populate_prompts() {
// Tools other than rlm_query must not get a prompts vec — they use
// the standard `args:` summary rendering path.
// Ordinary tools must use the standard `args:` summary rendering path.
let mut app = create_test_app();
handle_tool_call_started(
-99
View File
@@ -1,99 +0,0 @@
# Product Catalog - Winter 2025
## Electronics
### SKU-001: QuantumBook Pro Laptop
- Price: $1,299.99
- Category: Computing
- Stock: 45 units
- Rating: 4.7/5
- Features: 16" 4K display, 32GB RAM, 1TB SSD, AI accelerator
- Warranty: 2 years
- Supplier: TechGlobal Inc.
### SKU-002: SmartWatch X5
- Price: $349.99
- Category: Wearables
- Stock: 120 units
- Rating: 4.3/5
- Features: Heart rate, ECG, GPS, 7-day battery
- Warranty: 1 year
- Supplier: GadgetWorld Ltd.
### SKU-003: NoiseCancel Pro Headphones
- Price: $199.99
- Category: Audio
- Stock: 8 units (LOW STOCK)
- Rating: 4.9/5
- Features: ANC, 40hr battery, spatial audio
- Warranty: 1 year
- Supplier: AudioTech Corp.
## Home & Kitchen
### SKU-004: SmartBrew Coffee Maker
- Price: $89.99
- Category: Kitchen Appliances
- Stock: 200 units
- Rating: 4.5/5
- Features: App-controlled, 12-cup, thermal carafe
- Warranty: 2 years
- Supplier: HomeEssentials LLC
### SKU-005: AeroChef Air Fryer
- Price: $129.99
- Category: Kitchen Appliances
- Stock: 75 units
- Rating: 4.6/5
- Features: 8qt capacity, 10 presets, dishwasher safe
- Warranty: 1 year
- Supplier: HomeEssentials LLC
### SKU-006: PureFlow Water Filter
- Price: $49.99
- Category: Kitchen Accessories
- Stock: 0 units (OUT OF STOCK)
- Rating: 4.4/5
- Features: 3-stage filtration, 6-month filter life
- Warranty: 1 year
- Supplier: CleanWater Systems
## Sports & Outdoors
### SKU-007: TrailBlazer Hiking Boots
- Price: $159.99
- Category: Footwear
- Stock: 60 units
- Rating: 4.8/5
- Features: Waterproof, Vibram sole, ankle support
- Warranty: 1 year
- Supplier: OutdoorGear Co.
### SKU-008: FlexCore Yoga Mat
- Price: $39.99
- Category: Fitness
- Stock: 300 units
- Rating: 4.2/5
- Features: 6mm thick, non-slip, carrying strap
- Warranty: 6 months
- Supplier: FitLife Products
### SKU-009: PowerLift Adjustable Dumbbells
- Price: $299.99
- Category: Fitness
- Stock: 15 units
- Rating: 4.7/5
- Features: 5-52.5lbs range, quick-change, storage tray
- Warranty: 2 years
- Supplier: FitLife Products
## Automotive
### SKU-010: DashCam Ultra 4K
- Price: $179.99
- Category: Car Electronics
- Stock: 35 units
- Rating: 4.5/5
- Features: 4K recording, night vision, GPS, parking mode
- Warranty: 1 year
- Supplier: AutoGadget Inc.
-51
View File
@@ -1,51 +0,0 @@
This is a test document for rlm_process.
# Security Model Overview
The system uses a zero-trust architecture where every request is authenticated and authorized independently.
## Authentication
Authentication is handled via JWT tokens with a 15-minute expiry. Refresh tokens are stored in an HTTP-only cookie.
## Authorization
Role-based access control (RBAC) is used with three tiers:
- Admin: full access
- Editor: can modify content but not system settings
- Viewer: read-only access
## Encryption
All data at rest is encrypted using AES-256-GCM. Data in transit uses TLS 1.3.
## Audit Logging
Every action is logged to an append-only audit trail stored in a separate database.
# API Endpoints
- GET /api/users - List users (Admin only)
- POST /api/users - Create user (Admin only)
- GET /api/documents - List documents
- POST /api/documents - Create document (Admin, Editor)
- PUT /api/documents/:id - Update document (Admin, Editor)
- DELETE /api/documents/:id - Delete document (Admin only)
# Data Model
User {
id: UUID
email: String
role: Enum(Admin, Editor, Viewer)
created_at: Timestamp
}
Document {
id: UUID
title: String
content: Text
author_id: UUID (FK -> User)
created_at: Timestamp
updated_at: Timestamp
}