refactor(rlm): drop HTTP sidecar — long-lived Python REPL over stdin/stdout
The RLM tool used to spawn a fresh `python3 -c "..."` per round and route sub-LLM calls through a localhost axum sidecar; state persisted only via a JSON file (lossy: imports and non-JSON values were lost). The model could also short-circuit by replying with prose and the loop would ship the prose as if it came from the REPL. This commit replaces that with one long-lived `python3 -u` subprocess per turn driven by a stdin/stdout RPC protocol with UUID-prefixed sentinels. No more HTTP server, no more port allocation, no more JSON state file — variables, imports, and any other Python state persist naturally across rounds. The `RlmBridge` (`crates/tui/src/rlm/bridge.rs`) services `llm_query` / `llm_query_batched` / `rlm_query` / `rlm_query_batched` calls inline, recursing into `run_rlm_turn_inner` for sub-RLMs. The system prompt is tightened: the only legal turn shape is one ` ```repl ` block; calling `FINAL(...)` from prose without ever invoking a sub-LLM is rejected with a strict reminder. The `DirectAnswer` termination is gone, replaced by `NoCode` which only surfaces after multiple consecutive empty rounds. `rlm_process` now returns a per-round trace (code summary, sub-LLM call count, elapsed) so callers can verify the model actually engaged with `context` rather than guessing from the preview. Net: -313 lines. 17 new REPL runtime tests cover variable persistence, import persistence, RPC round-trips, FINAL capture, and error recovery. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,31 +1,10 @@
|
||||
//! REPL runtime for paper-spec RLM (Zhang et al., arXiv:2512.24601).
|
||||
//!
|
||||
//! Manages a persistent Python subprocess that can execute code blocks,
|
||||
//! call `llm_query()` for recursive sub-LLM calls, and return results
|
||||
//! via `FINAL()` / `FINAL_VAR()` patterns.
|
||||
//!
|
||||
//! ## Architecture
|
||||
//!
|
||||
//! - `PythonRuntime` — owns the Python subprocess lifecycle, sends code
|
||||
//! via stdin, collects stdout/stderr with truncation.
|
||||
//! - `LlmQueryFn` — injected into the Python namespace as `llm_query(prompt)`.
|
||||
//! Calls back to Rust which dispatches a one-shot DeepSeek API completion.
|
||||
//! - `ReplOutput` — parsed result from a REPL execution round, carrying
|
||||
//! stdout text, whether a FINAL was detected, and any error signals.
|
||||
//! Long-lived Python REPL runtime used by the RLM loop and by inline
|
||||
//! `` ```repl `` block execution in the agent loop.
|
||||
|
||||
pub mod runtime;
|
||||
pub mod sandbox;
|
||||
|
||||
pub use runtime::PythonRuntime;
|
||||
pub use sandbox::{ReplOutput, inject_llm_query_fn, parse_final};
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Shared handle to a long-lived Python REPL session.
|
||||
pub type SharedRepl = Arc<Mutex<Option<PythonRuntime>>>;
|
||||
|
||||
/// Create a new shared REPL handle (initially uninitialized — lazy start).
|
||||
pub fn new_shared_repl() -> SharedRepl {
|
||||
Arc::new(Mutex::new(None))
|
||||
}
|
||||
pub use runtime::{
|
||||
BatchResp, PythonRuntime, ReplRound, RpcDispatcher, RpcRequest, RpcResponse, SingleResp,
|
||||
};
|
||||
pub use sandbox::{ReplBlock, extract_repl_blocks, has_repl_block};
|
||||
|
||||
+763
-366
File diff suppressed because it is too large
Load Diff
+10
-138
@@ -1,120 +1,30 @@
|
||||
//! REPL sandbox utilities: FINAL/FINAL_VAR parsing, llm_query injection,
|
||||
//! and the ReplOutput type.
|
||||
//! REPL fence-extraction utilities.
|
||||
//!
|
||||
//! The agent's main loop scans assistant text for ` ```repl ` fenced blocks
|
||||
//! and feeds them to a [`crate::repl::runtime::PythonRuntime`]. Capturing
|
||||
//! `FINAL(...)` and routing sub-LLM RPCs are handled inside the runtime via
|
||||
//! a stdin/stdout protocol — no scraping required here.
|
||||
|
||||
/// Output from a REPL execution round.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReplOutput {
|
||||
/// Cleaned stdout (protocol lines removed).
|
||||
pub stdout: String,
|
||||
/// Raw stdout including protocol lines.
|
||||
pub raw_stdout: String,
|
||||
/// Whether the round had an error.
|
||||
pub has_error: bool,
|
||||
/// If FINAL() or FINAL_VAR() was called, the value.
|
||||
pub final_value: Option<String>,
|
||||
/// Any llm_query() calls that were detected (prompt, model, max_tokens).
|
||||
pub llm_queries: Vec<LlmQueryRequest>,
|
||||
}
|
||||
|
||||
/// A request from Python's `llm_query()` function.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LlmQueryRequest {
|
||||
pub prompt: String,
|
||||
pub model: Option<String>,
|
||||
pub max_tokens: Option<u32>,
|
||||
}
|
||||
|
||||
/// Parse a stdout string into a ReplOutput, extracting FINAL markers
|
||||
/// and cleaning protocol lines.
|
||||
pub fn parse_final(raw_stdout: &str) -> (String, Option<String>) {
|
||||
let mut final_value: Option<String> = None;
|
||||
let mut cleaned = String::new();
|
||||
|
||||
for line in raw_stdout.lines() {
|
||||
if let Some(val) = line.strip_prefix("__REPL_FINAL__::") {
|
||||
// Parse the JSON-encoded final value.
|
||||
if let Ok(parsed) = serde_json::from_str::<String>(val) {
|
||||
final_value = Some(parsed);
|
||||
} else {
|
||||
// Fallback: use the raw text after the prefix.
|
||||
final_value = Some(val.to_string());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// Skip other protocol lines.
|
||||
if line.starts_with("__REPL_LLM_QUERY__::")
|
||||
|| line.starts_with("__REPL_DONE__")
|
||||
|| line.starts_with("__REPL_READY__")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
cleaned.push_str(line);
|
||||
cleaned.push('\n');
|
||||
}
|
||||
|
||||
(cleaned.trim().to_string(), final_value)
|
||||
}
|
||||
|
||||
/// Generate the Python code that injects `llm_query()` with a callback
|
||||
/// mechanism. The function writes a JSON request to stdout, and the Rust
|
||||
/// side reads it, dispatches the API call, and writes the result back.
|
||||
///
|
||||
/// In practice, the `llm_query()` stub in the bootstrap does this via
|
||||
/// `print('__REPL_LLM_QUERY__::...')` and we handle the dispatch on the
|
||||
/// Rust side. For a single round, we pre-compute all llm_query results
|
||||
/// before executing the code.
|
||||
pub fn inject_llm_query_fn(
|
||||
bootstrap: &str,
|
||||
queries: &[(usize, &str)], // (id, result)
|
||||
) -> String {
|
||||
// Replace the stub llm_query with one that returns pre-computed results.
|
||||
let mock_results: Vec<String> = queries
|
||||
.iter()
|
||||
.map(|(id, result)| format!(" {id}: {result:?}"))
|
||||
.collect();
|
||||
let mock_dict = format!("{{\n{}\n}}", mock_results.join(",\n"));
|
||||
|
||||
let override_fn = format!(
|
||||
r#"
|
||||
_llm_query_results = {mock_dict}
|
||||
_llm_query_idx = [0]
|
||||
def llm_query(prompt, model=None, max_tokens=None):
|
||||
idx = _llm_query_idx[0]
|
||||
_llm_query_idx[0] += 1
|
||||
result = _llm_query_results.get(idx, f'[llm_query: idx {{idx}} not found]')
|
||||
return result
|
||||
"#
|
||||
);
|
||||
|
||||
bootstrap.replace(
|
||||
"def llm_query(prompt, model=None, max_tokens=None):\n return f'[llm_query stub: {str(prompt)[:100]}...]'",
|
||||
&override_fn,
|
||||
)
|
||||
}
|
||||
|
||||
/// Check if a string contains a ```repl fenced code block.
|
||||
/// Check if a string contains a `` ```repl `` fenced code block.
|
||||
pub fn has_repl_block(text: &str) -> bool {
|
||||
text.contains("```repl")
|
||||
}
|
||||
|
||||
/// Extract all ```repl code blocks from text.
|
||||
/// Returns a list of (code, start_offset, end_offset).
|
||||
/// Extract every `` ```repl `` block from `text` with byte offsets.
|
||||
pub fn extract_repl_blocks(text: &str) -> Vec<ReplBlock> {
|
||||
let mut blocks = Vec::new();
|
||||
let mut rest = text;
|
||||
|
||||
while let Some(start_idx) = rest.find("```repl") {
|
||||
let after_fence = &rest[start_idx..];
|
||||
// Find the end of the opening fence line.
|
||||
let code_start = after_fence.find('\n').unwrap_or(after_fence.len());
|
||||
let code_region = &after_fence[code_start..];
|
||||
// Find the closing ```.
|
||||
let Some(end_offset) = code_region.find("\n```") else {
|
||||
break;
|
||||
};
|
||||
let code = code_region[..end_offset].to_string();
|
||||
let global_start = text.len() - rest.len() + start_idx;
|
||||
let global_end = global_start + code_start + end_offset + 3; // 3 for "```\n"
|
||||
let global_end = global_start + code_start + end_offset + 3;
|
||||
blocks.push(ReplBlock {
|
||||
code,
|
||||
start_offset: global_start,
|
||||
@@ -126,7 +36,7 @@ pub fn extract_repl_blocks(text: &str) -> Vec<ReplBlock> {
|
||||
blocks
|
||||
}
|
||||
|
||||
/// A ```repl code block with position info.
|
||||
/// A `` ```repl `` code block with byte-offset position info.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReplBlock {
|
||||
pub code: String,
|
||||
@@ -134,38 +44,10 @@ pub struct ReplBlock {
|
||||
pub end_offset: usize,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_final_detects_value() {
|
||||
let raw = "hello\n__REPL_FINAL__::\"the answer\"\nworld";
|
||||
let (cleaned, final_val) = parse_final(raw);
|
||||
assert_eq!(final_val.as_deref(), Some("the answer"));
|
||||
assert!(cleaned.contains("hello"));
|
||||
assert!(!cleaned.contains("__REPL_FINAL__"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_final_no_final_returns_none() {
|
||||
let raw = "just some output\nnothing special";
|
||||
let (cleaned, final_val) = parse_final(raw);
|
||||
assert_eq!(final_val, None);
|
||||
assert_eq!(cleaned, "just some output\nnothing special");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_final_handles_non_json_value() {
|
||||
let raw = "__REPL_FINAL__::plain text value";
|
||||
let (_, final_val) = parse_final(raw);
|
||||
assert_eq!(final_val.as_deref(), Some("plain text value"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn has_repl_block_detects_fence() {
|
||||
assert!(has_repl_block("some text ```repl\ncode\n``` more"));
|
||||
@@ -195,14 +77,4 @@ mod tests {
|
||||
let blocks = extract_repl_blocks("no blocks here");
|
||||
assert!(blocks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inject_llm_query_replaces_stub() {
|
||||
let bootstrap = "def llm_query(prompt, model=None, max_tokens=None):\n return f'[llm_query stub: {str(prompt)[:100]}...]'";
|
||||
let result = inject_llm_query_fn(bootstrap, &[(0, "result0"), (1, "result1")]);
|
||||
assert!(!result.contains("llm_query stub"));
|
||||
assert!(result.contains("_llm_query_results"));
|
||||
assert!(result.contains("result0"));
|
||||
assert!(result.contains("result1"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
pub mod repl;
|
||||
@@ -0,0 +1,257 @@
|
||||
//! RPC bridge that services `llm_query` / `rlm_query` calls coming back
|
||||
//! from the long-lived Python REPL during an RLM turn.
|
||||
//!
|
||||
//! This is the spiritual successor to the HTTP sidecar from earlier
|
||||
//! versions — except instead of binding a localhost port and routing
|
||||
//! through `urllib`, requests come in through stdin/stdout and we just
|
||||
//! call the LLM client directly here in Rust.
|
||||
//!
|
||||
//! The bridge tracks cumulative token usage and the recursion budget. For
|
||||
//! `Rlm` / `RlmBatch` requests it recursively calls `run_rlm_turn_inner`
|
||||
//! at depth-1; the future-type cycle (bridge → run_rlm_turn_inner →
|
||||
//! bridge) is broken by `run_rlm_turn_inner` returning a boxed dyn future.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::future::join_all;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::client::DeepSeekClient;
|
||||
use crate::llm_client::LlmClient as _;
|
||||
use crate::models::{ContentBlock, Message, MessageRequest, SystemPrompt, Usage};
|
||||
use crate::repl::runtime::{BatchResp, RpcDispatcher, RpcRequest, RpcResponse, SingleResp};
|
||||
|
||||
/// Per-child completion timeout — same as the previous sidecar default.
|
||||
const CHILD_TIMEOUT_SECS: u64 = 120;
|
||||
/// Default `max_tokens` for one-shot child completions.
|
||||
const DEFAULT_CHILD_MAX_TOKENS: u32 = 4096;
|
||||
/// Hard cap on prompts per batch RPC.
|
||||
pub const MAX_BATCH: usize = 16;
|
||||
|
||||
/// State shared with the bridge across all RPC calls in one turn.
|
||||
pub struct RlmBridge {
|
||||
pub client: DeepSeekClient,
|
||||
pub child_model: String,
|
||||
/// Recursion budget remaining for `Rlm` / `RlmBatch` requests. When
|
||||
/// zero, those requests fall back to plain `Llm` completions.
|
||||
pub depth_remaining: u32,
|
||||
pub usage: Arc<Mutex<Usage>>,
|
||||
}
|
||||
|
||||
impl RlmBridge {
|
||||
pub fn new(client: DeepSeekClient, child_model: String, depth_remaining: u32) -> Self {
|
||||
Self {
|
||||
client,
|
||||
child_model,
|
||||
depth_remaining,
|
||||
usage: Arc::new(Mutex::new(Usage::default())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn usage_handle(&self) -> Arc<Mutex<Usage>> {
|
||||
Arc::clone(&self.usage)
|
||||
}
|
||||
|
||||
async fn dispatch_llm(
|
||||
&self,
|
||||
prompt: String,
|
||||
model: Option<String>,
|
||||
max_tokens: Option<u32>,
|
||||
system: Option<String>,
|
||||
) -> SingleResp {
|
||||
let request = MessageRequest {
|
||||
model: model
|
||||
.filter(|m| !m.is_empty())
|
||||
.unwrap_or_else(|| self.child_model.clone()),
|
||||
messages: vec![Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentBlock::Text {
|
||||
text: prompt,
|
||||
cache_control: None,
|
||||
}],
|
||||
}],
|
||||
max_tokens: max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS),
|
||||
system: system.map(SystemPrompt::Text),
|
||||
tools: None,
|
||||
tool_choice: None,
|
||||
metadata: None,
|
||||
thinking: None,
|
||||
reasoning_effort: None,
|
||||
stream: Some(false),
|
||||
temperature: Some(0.4_f32),
|
||||
top_p: Some(0.9_f32),
|
||||
};
|
||||
|
||||
let fut = self.client.create_message(request);
|
||||
let response =
|
||||
match tokio::time::timeout(Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await {
|
||||
Ok(Ok(r)) => r,
|
||||
Ok(Err(e)) => {
|
||||
return SingleResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("llm_query failed: {e}")),
|
||||
};
|
||||
}
|
||||
Err(_) => {
|
||||
return SingleResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("llm_query timed out after {CHILD_TIMEOUT_SECS}s")),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let text = response
|
||||
.content
|
||||
.iter()
|
||||
.filter_map(|b| match b {
|
||||
ContentBlock::Text { text, .. } => Some(text.as_str()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
{
|
||||
let mut u = self.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(response.usage.input_tokens);
|
||||
u.output_tokens = u.output_tokens.saturating_add(response.usage.output_tokens);
|
||||
}
|
||||
|
||||
SingleResp { text, error: None }
|
||||
}
|
||||
|
||||
async fn dispatch_llm_batch(&self, prompts: Vec<String>, model: Option<String>) -> BatchResp {
|
||||
if prompts.is_empty() {
|
||||
return BatchResp { results: vec![] };
|
||||
}
|
||||
if prompts.len() > MAX_BATCH {
|
||||
return BatchResp {
|
||||
results: prompts
|
||||
.iter()
|
||||
.map(|_| SingleResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("batch too large: {} > {MAX_BATCH}", prompts.len())),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
}
|
||||
|
||||
let model = Arc::new(
|
||||
model
|
||||
.filter(|m| !m.is_empty())
|
||||
.unwrap_or_else(|| self.child_model.clone()),
|
||||
);
|
||||
|
||||
let futures = prompts.into_iter().map(|prompt| {
|
||||
let model = Arc::clone(&model);
|
||||
async move {
|
||||
self.dispatch_llm((*prompt).to_string(), Some((*model).clone()), None, None)
|
||||
.await
|
||||
}
|
||||
});
|
||||
|
||||
BatchResp {
|
||||
results: join_all(futures).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch_rlm(&self, prompt: String, model: Option<String>) -> SingleResp {
|
||||
if self.depth_remaining == 0 {
|
||||
// Budget exhausted — fall back to a one-shot child completion
|
||||
// rather than returning an error. Matches the paper's behaviour
|
||||
// ("sub_RLM gracefully degrades to llm_query at depth=0").
|
||||
return self.dispatch_llm(prompt, model, None, None).await;
|
||||
}
|
||||
|
||||
// Build a drain channel to absorb status events from the nested
|
||||
// turn (we don't surface them; this dispatch is invisible to the
|
||||
// outer agent stream).
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
||||
let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
|
||||
|
||||
let child_model = model
|
||||
.filter(|m| !m.is_empty())
|
||||
.unwrap_or_else(|| self.child_model.clone());
|
||||
|
||||
// Recursive call. The dyn-erasure on `run_rlm_turn_inner` breaks
|
||||
// the `bridge → turn → bridge` opaque-future cycle.
|
||||
let result = super::turn::run_rlm_turn_inner(
|
||||
&self.client,
|
||||
child_model.clone(),
|
||||
prompt,
|
||||
None,
|
||||
child_model,
|
||||
tx,
|
||||
self.depth_remaining.saturating_sub(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
drain.abort();
|
||||
|
||||
{
|
||||
let mut u = self.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(result.usage.input_tokens);
|
||||
u.output_tokens = u.output_tokens.saturating_add(result.usage.output_tokens);
|
||||
}
|
||||
|
||||
SingleResp {
|
||||
text: result.answer,
|
||||
error: result.error,
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch_rlm_batch(&self, prompts: Vec<String>, model: Option<String>) -> BatchResp {
|
||||
if prompts.is_empty() {
|
||||
return BatchResp { results: vec![] };
|
||||
}
|
||||
if prompts.len() > MAX_BATCH {
|
||||
return BatchResp {
|
||||
results: prompts
|
||||
.iter()
|
||||
.map(|_| SingleResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("batch too large: {} > {MAX_BATCH}", prompts.len())),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
}
|
||||
|
||||
let model = Arc::new(model);
|
||||
let futures = prompts.into_iter().map(|p| {
|
||||
let model = Arc::clone(&model);
|
||||
async move { self.dispatch_rlm(p, (*model).clone()).await }
|
||||
});
|
||||
BatchResp {
|
||||
results: join_all(futures).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RpcDispatcher for RlmBridge {
|
||||
fn dispatch<'a>(
|
||||
&'a self,
|
||||
req: RpcRequest,
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = RpcResponse> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
match req {
|
||||
RpcRequest::Llm {
|
||||
prompt,
|
||||
model,
|
||||
max_tokens,
|
||||
system,
|
||||
} => {
|
||||
RpcResponse::Single(self.dispatch_llm(prompt, model, max_tokens, system).await)
|
||||
}
|
||||
RpcRequest::LlmBatch { prompts, model } => {
|
||||
RpcResponse::Batch(self.dispatch_llm_batch(prompts, model).await)
|
||||
}
|
||||
RpcRequest::Rlm { prompt, model } => {
|
||||
RpcResponse::Single(self.dispatch_rlm(prompt, model).await)
|
||||
}
|
||||
RpcRequest::RlmBatch { prompts, model } => {
|
||||
RpcResponse::Batch(self.dispatch_rlm_batch(prompts, model).await)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
+12
-12
@@ -1,7 +1,6 @@
|
||||
//! True Recursive Language Model (RLM) loop — paper-spec Algorithm 1.
|
||||
//! Recursive Language Model (RLM) loop — paper-spec Algorithm 1.
|
||||
//!
|
||||
//! Implements the RLM inference paradigm from Zhang, Kraska, Khattab
|
||||
//! (arXiv:2512.24601, §2 Algorithm 1):
|
||||
//! Implements Zhang, Kraska & Khattab (arXiv:2512.24601, §2 Algorithm 1):
|
||||
//!
|
||||
//! ```text
|
||||
//! state ← InitREPL(prompt=P)
|
||||
@@ -15,17 +14,18 @@
|
||||
//! return state[Final]
|
||||
//! ```
|
||||
//!
|
||||
//! Key invariants:
|
||||
//! - P is stored as a REPL variable, NEVER in the LLM's context window.
|
||||
//! - Only metadata about state/stdout goes to the LLM — constant-size context.
|
||||
//! - The LLM generates Python code, not free text.
|
||||
//! - The REPL exposes `llm_query()` (one-shot child) and `sub_rlm()` (recursive
|
||||
//! RLM call); both are serviced by an in-process HTTP sidecar so Python can
|
||||
//! call them synchronously via `urllib`.
|
||||
//! Invariants:
|
||||
//! - `P` is held only as a REPL variable (`context` / `ctx`); never
|
||||
//! appears in the root LLM's window.
|
||||
//! - The root LLM receives small metadata messages — length, preview,
|
||||
//! helper list, prior-round summary.
|
||||
//! - Code rounds and sub-LLM calls travel over a single stdin/stdout
|
||||
//! pipe to a long-lived `python3 -u` subprocess. No HTTP sidecar.
|
||||
|
||||
pub mod bridge;
|
||||
pub mod prompt;
|
||||
pub mod sidecar;
|
||||
pub mod turn;
|
||||
|
||||
pub use bridge::RlmBridge;
|
||||
pub use prompt::rlm_system_prompt;
|
||||
pub use turn::{RlmTurnResult, run_rlm_turn};
|
||||
pub use turn::{RlmTermination, RlmTurnResult, run_rlm_turn, run_rlm_turn_with_root};
|
||||
|
||||
@@ -1,41 +1,33 @@
|
||||
//! RLM system prompt — adapted from the reference RLM implementation
|
||||
//! (alexzhang13/rlm) and Zhang et al., arXiv:2512.24601, so the same
|
||||
//! decomposition strategies and prompt patterns apply here.
|
||||
//! RLM system prompt — adapted from the reference implementation
|
||||
//! (alexzhang13/rlm) and Zhang et al., arXiv:2512.24601.
|
||||
//!
|
||||
//! The prompt is deliberately strict: the only way to make progress is
|
||||
//! through a `repl` block. There is no fall-through prose path.
|
||||
|
||||
use crate::models::SystemPrompt;
|
||||
|
||||
/// Build the system prompt for a Recursive Language Model (RLM) root LLM call.
|
||||
///
|
||||
/// Tells the root model:
|
||||
/// - your context lives in the REPL as `context`
|
||||
/// - emit a single ```repl block per turn
|
||||
/// - reach for `llm_query` / `rlm_query` (and the batched variants) for
|
||||
/// sub-LLM work; never try to fit the whole context into one call
|
||||
/// - end the loop with `FINAL(value)` or `FINAL_VAR(name)`
|
||||
/// Build the system prompt for a Recursive Language Model (RLM) root call.
|
||||
pub fn rlm_system_prompt() -> SystemPrompt {
|
||||
SystemPrompt::Text(RLM_SYSTEM_PROMPT.trim().to_string())
|
||||
}
|
||||
|
||||
const RLM_SYSTEM_PROMPT: &str = r#"You are a Recursive Language Model (RLM). You answer the user's query interactively in a Python REPL that holds the full input as a `context` variable, and you can recursively call sub-LLMs to chunk, decompose, and synthesize answers over it. You will be queried iteratively until you provide a final answer.
|
||||
const RLM_SYSTEM_PROMPT: &str = r#"You are the root of a Recursive Language Model (RLM). Your input lives in a long-running Python REPL as a variable named `context` (alias `ctx`). You DO NOT see `context` in your prompt — only its length and a short preview. The only way to read or compute over it is to write Python code that runs in the REPL.
|
||||
|
||||
The REPL is initialised with:
|
||||
1. `context` — the full input as a string. May be very large; never print it in full.
|
||||
2. `llm_query(prompt, model=None, max_tokens=None, system=None)` — one-shot child LLM call. Fast and lightweight; use for chunk-level extraction, summarization, or Q&A. The child can handle very large prompts (~hundreds of thousands of chars).
|
||||
3. `llm_query_batched(prompts, model=None)` — run many `llm_query` calls concurrently. Returns `list[str]` in input order. Much faster than sequential calls when sub-prompts are independent.
|
||||
4. `rlm_query(prompt, model=None)` — spawn a recursive RLM sub-call for sub-tasks that themselves need multi-step reasoning, code execution, or their own iteration. Falls back to `llm_query` when the recursion budget is exhausted.
|
||||
5. `rlm_query_batched(prompts, model=None)` — multiple recursive RLM sub-calls in parallel.
|
||||
6. `SHOW_VARS()` — list user-created REPL variables and their types.
|
||||
7. `repl_set(name, value)` / `repl_get(name)` — explicit cross-round persistence (note: any JSON-serializable top-level variable already persists automatically).
|
||||
8. `print()` — show output. The driver feeds a (truncated) preview back to you.
|
||||
9. `FINAL(value)` or `FINAL_VAR(name)` — end the loop. Place either on its own line OUTSIDE the ```repl block (preferred) or call as a Python statement INSIDE the block.
|
||||
The REPL exposes:
|
||||
- `context` (alias `ctx`) — the full input string. Often huge — never `print(context)` in full.
|
||||
- `llm_query(prompt, model=None, max_tokens=None, system=None)` — one-shot child LLM. Cheap. Use for chunk-level work.
|
||||
- `llm_query_batched(prompts, model=None)` — concurrent fan-out. Returns `list[str]` in input order.
|
||||
- `rlm_query(prompt, model=None)` — recursive sub-RLM. Use when a sub-task itself needs decomposition.
|
||||
- `rlm_query_batched(prompts, model=None)` — concurrent recursive sub-RLMs.
|
||||
- `SHOW_VARS()` — list user variables and their types.
|
||||
- `repl_set(name, value)` / `repl_get(name)` — explicit cross-round storage.
|
||||
- `print(...)` — diagnostic output. The driver feeds you a truncated preview next round.
|
||||
- `FINAL(value)` — end the loop with this string answer.
|
||||
- `FINAL_VAR(name)` — end the loop with the value of a named variable.
|
||||
|
||||
How to operate
|
||||
Variables, imports, and any other state PERSIST across rounds — the REPL is a single long-lived Python process for the whole turn.
|
||||
|
||||
Each turn, emit ONE ```repl block of Python. The block runs inside the REPL; printed output and any new variables come back to you next turn. End the loop with `FINAL(...)`.
|
||||
|
||||
When to use `llm_query` vs `rlm_query`:
|
||||
- `llm_query` for one-shot work: extracting from a chunk, summarizing, classifying, simple Q&A.
|
||||
- `rlm_query` when the sub-task itself needs decomposition or iteration — i.e. it's RLM-shaped on its own (a long doc → its own chunked summary, a hard sub-question that needs branching).
|
||||
Contract — every turn, output ONE ` ```repl ` block of Python. That's it. No prose-only turns. No "I will do X" — just emit the code that does X.
|
||||
|
||||
Strategy patterns
|
||||
|
||||
@@ -56,7 +48,9 @@ answer = llm_query(f"Synthesize across these section-level extractions:\n\n{comb
|
||||
print(answer[:500])
|
||||
```
|
||||
Then on the next turn:
|
||||
```repl
|
||||
FINAL(answer)
|
||||
```
|
||||
|
||||
3. RECURSIVE decomposition for hard sub-problems.
|
||||
```repl
|
||||
@@ -70,16 +64,16 @@ print(trend, "→", recommendation)
|
||||
import math
|
||||
theta = math.degrees(math.atan2(v_perp, v_parallel))
|
||||
final_answer = llm_query(f"Entry angle is {theta:.2f}°. Phrase the answer for a physics student.")
|
||||
FINAL(final_answer)
|
||||
```
|
||||
Then: FINAL(final_answer)
|
||||
|
||||
Rules
|
||||
|
||||
- Emit exactly one ```repl block per turn (or `FINAL(...)` on its own line to end the loop).
|
||||
- Never print or stuff `context` in its entirety. Slice, sample, or chunk.
|
||||
- Sub-LLMs are powerful — feed them generous chunks (e.g. tens of thousands of chars) rather than padding through tiny windows.
|
||||
- JSON-serializable top-level variables persist across rounds automatically; non-serializable ones (custom objects, file handles) do not.
|
||||
- Do not say "I will do X" — just do it. Output the next ```repl block.
|
||||
- Emit exactly ONE ` ```repl ` block per turn. The block must contain Python code only.
|
||||
- Never `print(context)` or otherwise dump it whole — slice, sample, or chunk.
|
||||
- You MUST call `llm_query` / `llm_query_batched` / `rlm_query` at least once before `FINAL(...)`. Calling FINAL from a top-level prose answer (without ever running a `repl` block that touched `context` via a sub-LLM) is REJECTED — the driver will discard the FINAL and ask you to actually use the REPL.
|
||||
- Sub-LLMs are powerful — feed them generous chunks (tens of thousands of chars), not tiny windows.
|
||||
- Do NOT pad your output with prose like "Here is what I'll do:" — just emit the next ```repl block.
|
||||
"#;
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -108,6 +102,11 @@ mod tests {
|
||||
assert!(body().contains("`context`"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rlm_prompt_mentions_ctx_alias() {
|
||||
assert!(body().contains("`ctx`"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rlm_prompt_mentions_all_helpers() {
|
||||
let s = body();
|
||||
@@ -125,9 +124,13 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rlm_prompt_does_not_promise_plaintext_exit_loophole() {
|
||||
// The old prompt had "just write a short response without code fences
|
||||
// and the RLM loop will end". Make sure that's gone.
|
||||
assert!(!body().contains("without code fences and the RLM loop"));
|
||||
fn rlm_prompt_forbids_prose_shortcut() {
|
||||
// The new contract requires a sub-LLM call before FINAL — the
|
||||
// prompt must say so explicitly so the model doesn't try to bail
|
||||
// with FINAL("...inferred from preview...").
|
||||
assert!(
|
||||
body().contains("REJECTED") || body().contains("rejected"),
|
||||
"system prompt should reject the prose-shortcut path explicitly"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,485 +0,0 @@
|
||||
//! HTTP sidecar that services `llm_query()` and `sub_rlm()` calls made
|
||||
//! from inside the RLM Python REPL.
|
||||
//!
|
||||
//! Why HTTP? The Python REPL runs as a short-lived `python3 -c` subprocess
|
||||
//! per round. We need synchronous request/response between Python (running)
|
||||
//! and Rust (servicing the request) — and we need it to work for arbitrary
|
||||
//! recursion depth. A localhost HTTP server with axum is the cleanest fit:
|
||||
//! Python calls `urllib.request.urlopen(...)` and blocks until Rust returns
|
||||
//! the LLM completion. No long-lived process, no FIFO/pipe gymnastics.
|
||||
//!
|
||||
//! The sidecar binds to `127.0.0.1:0` (kernel-assigned port), runs for the
|
||||
//! lifetime of one root `run_rlm_turn`, and is aborted on return.
|
||||
//!
|
||||
//! Endpoints:
|
||||
//! - `POST /llm` — one-shot child completion via the configured `child_model`.
|
||||
//! - `POST /rlm` — full recursive RLM turn at depth-1 (paper's `sub_RLM`).
|
||||
//!
|
||||
//! Cumulative token usage is tracked in the shared [`SidecarCtx`] so the
|
||||
//! parent turn can fold it into its own [`Usage`].
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::Json;
|
||||
use axum::Router;
|
||||
use axum::extract::State;
|
||||
use axum::routing::post;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::client::DeepSeekClient;
|
||||
use crate::llm_client::LlmClient as _;
|
||||
use crate::models::{ContentBlock, Message, MessageRequest, Usage};
|
||||
|
||||
/// Default per-child request timeout — mirrors `tools/rlm_query.rs`.
|
||||
const CHILD_TIMEOUT_SECS: u64 = 120;
|
||||
/// Default `max_tokens` for one-shot child completions.
|
||||
const DEFAULT_CHILD_MAX_TOKENS: u32 = 4096;
|
||||
|
||||
/// Shared state for the sidecar — the LLM client, the child model name,
|
||||
/// the recursion budget, and a usage accumulator.
|
||||
pub struct SidecarCtx {
|
||||
pub client: DeepSeekClient,
|
||||
pub child_model: String,
|
||||
/// Recursion budget remaining for `/rlm` calls. `0` means "no further
|
||||
/// recursion" — `/rlm` will return an error.
|
||||
pub depth_remaining: u32,
|
||||
/// Cumulative usage across all sidecar-served calls in this turn.
|
||||
pub usage: Mutex<Usage>,
|
||||
}
|
||||
|
||||
impl SidecarCtx {
|
||||
pub fn new(client: DeepSeekClient, child_model: String, depth_remaining: u32) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
client,
|
||||
child_model,
|
||||
depth_remaining,
|
||||
usage: Mutex::new(Usage::default()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct LlmReq {
|
||||
prompt: String,
|
||||
#[serde(default)]
|
||||
model: Option<String>,
|
||||
#[serde(default)]
|
||||
max_tokens: Option<u32>,
|
||||
#[serde(default)]
|
||||
system: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct LlmResp {
|
||||
text: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
async fn llm_handler(State(ctx): State<Arc<SidecarCtx>>, Json(req): Json<LlmReq>) -> Json<LlmResp> {
|
||||
let model = req
|
||||
.model
|
||||
.filter(|m| !m.is_empty())
|
||||
.unwrap_or_else(|| ctx.child_model.clone());
|
||||
let max_tokens = req.max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS);
|
||||
|
||||
let request = MessageRequest {
|
||||
model,
|
||||
messages: vec![Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentBlock::Text {
|
||||
text: req.prompt,
|
||||
cache_control: None,
|
||||
}],
|
||||
}],
|
||||
max_tokens,
|
||||
system: req.system.map(crate::models::SystemPrompt::Text),
|
||||
tools: None,
|
||||
tool_choice: None,
|
||||
metadata: None,
|
||||
thinking: None,
|
||||
reasoning_effort: None,
|
||||
stream: Some(false),
|
||||
temperature: Some(0.4_f32),
|
||||
top_p: Some(0.9_f32),
|
||||
};
|
||||
|
||||
let fut = ctx.client.create_message(request);
|
||||
let response =
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await {
|
||||
Ok(Ok(r)) => r,
|
||||
Ok(Err(e)) => {
|
||||
return Json(LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("llm_query failed: {e}")),
|
||||
});
|
||||
}
|
||||
Err(_) => {
|
||||
return Json(LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("llm_query timed out after {CHILD_TIMEOUT_SECS}s")),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let text = response
|
||||
.content
|
||||
.iter()
|
||||
.filter_map(|b| match b {
|
||||
ContentBlock::Text { text, .. } => Some(text.as_str()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
{
|
||||
let mut u = ctx.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(response.usage.input_tokens);
|
||||
u.output_tokens = u.output_tokens.saturating_add(response.usage.output_tokens);
|
||||
}
|
||||
|
||||
Json(LlmResp { text, error: None })
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct SubRlmReq {
|
||||
prompt: String,
|
||||
}
|
||||
|
||||
async fn sub_rlm_handler(
|
||||
State(ctx): State<Arc<SidecarCtx>>,
|
||||
Json(req): Json<SubRlmReq>,
|
||||
) -> Json<LlmResp> {
|
||||
if ctx.depth_remaining == 0 {
|
||||
return Json(LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(
|
||||
"sub_rlm: recursion depth budget exhausted (configure /rlm with deeper budget)"
|
||||
.to_string(),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Sub-RLM uses the child_model as its own root model — paper's pattern
|
||||
// is to run sub_RLM with a smaller model, and to also use child_model
|
||||
// for any further `llm_query` calls inside the sub-turn.
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
||||
let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
|
||||
|
||||
// The recursive future-type cycle here
|
||||
// (sub_rlm_handler → run_rlm_turn_inner → start_sidecar → sub_rlm_handler)
|
||||
// is broken by `run_rlm_turn_inner` returning a concrete
|
||||
// `Pin<Box<dyn Future + Send>>` rather than `impl Future`.
|
||||
let result = super::turn::run_rlm_turn_inner(
|
||||
&ctx.client,
|
||||
ctx.child_model.clone(),
|
||||
req.prompt,
|
||||
None,
|
||||
ctx.child_model.clone(),
|
||||
tx,
|
||||
ctx.depth_remaining.saturating_sub(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
drain.abort();
|
||||
|
||||
{
|
||||
let mut u = ctx.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(result.usage.input_tokens);
|
||||
u.output_tokens = u.output_tokens.saturating_add(result.usage.output_tokens);
|
||||
}
|
||||
|
||||
Json(LlmResp {
|
||||
text: result.answer,
|
||||
error: result.error,
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Batch endpoints
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Hard cap on prompts per batch request. Mirrors `tools/rlm_query.rs`.
|
||||
const MAX_BATCH: usize = 16;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct BatchReq {
|
||||
prompts: Vec<String>,
|
||||
#[serde(default)]
|
||||
model: Option<String>,
|
||||
#[serde(default)]
|
||||
max_tokens: Option<u32>,
|
||||
#[serde(default)]
|
||||
system: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct BatchResp {
|
||||
results: Vec<LlmResp>,
|
||||
}
|
||||
|
||||
async fn llm_batch_handler(
|
||||
State(ctx): State<Arc<SidecarCtx>>,
|
||||
Json(req): Json<BatchReq>,
|
||||
) -> Json<BatchResp> {
|
||||
if req.prompts.is_empty() {
|
||||
return Json(BatchResp { results: vec![] });
|
||||
}
|
||||
if req.prompts.len() > MAX_BATCH {
|
||||
return Json(BatchResp {
|
||||
results: req
|
||||
.prompts
|
||||
.iter()
|
||||
.map(|_| LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!(
|
||||
"batch too large: {} > {MAX_BATCH}",
|
||||
req.prompts.len()
|
||||
)),
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
}
|
||||
|
||||
let model = req
|
||||
.model
|
||||
.filter(|m| !m.is_empty())
|
||||
.unwrap_or_else(|| ctx.child_model.clone());
|
||||
let max_tokens = req.max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS);
|
||||
let system = req.system;
|
||||
|
||||
let futures = req.prompts.into_iter().map(|prompt| {
|
||||
let client = ctx.client.clone();
|
||||
let model = model.clone();
|
||||
let system = system.clone();
|
||||
async move {
|
||||
let request = MessageRequest {
|
||||
model,
|
||||
messages: vec![Message {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentBlock::Text {
|
||||
text: prompt,
|
||||
cache_control: None,
|
||||
}],
|
||||
}],
|
||||
max_tokens,
|
||||
system: system.map(crate::models::SystemPrompt::Text),
|
||||
tools: None,
|
||||
tool_choice: None,
|
||||
metadata: None,
|
||||
thinking: None,
|
||||
reasoning_effort: None,
|
||||
stream: Some(false),
|
||||
temperature: Some(0.4_f32),
|
||||
top_p: Some(0.9_f32),
|
||||
};
|
||||
let fut = client.create_message(request);
|
||||
tokio::time::timeout(std::time::Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await
|
||||
}
|
||||
});
|
||||
|
||||
let outcomes = futures_util::future::join_all(futures).await;
|
||||
|
||||
let mut results = Vec::with_capacity(outcomes.len());
|
||||
let mut total_input = 0_u32;
|
||||
let mut total_output = 0_u32;
|
||||
|
||||
for outcome in outcomes {
|
||||
match outcome {
|
||||
Ok(Ok(resp)) => {
|
||||
let text = resp
|
||||
.content
|
||||
.iter()
|
||||
.filter_map(|b| match b {
|
||||
ContentBlock::Text { text, .. } => Some(text.as_str()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
total_input = total_input.saturating_add(resp.usage.input_tokens);
|
||||
total_output = total_output.saturating_add(resp.usage.output_tokens);
|
||||
results.push(LlmResp { text, error: None });
|
||||
}
|
||||
Ok(Err(e)) => results.push(LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("{e}")),
|
||||
}),
|
||||
Err(_) => results.push(LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!("timed out after {CHILD_TIMEOUT_SECS}s")),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let mut u = ctx.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(total_input);
|
||||
u.output_tokens = u.output_tokens.saturating_add(total_output);
|
||||
}
|
||||
|
||||
Json(BatchResp { results })
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct RlmBatchReq {
|
||||
prompts: Vec<String>,
|
||||
}
|
||||
|
||||
async fn rlm_batch_handler(
|
||||
State(ctx): State<Arc<SidecarCtx>>,
|
||||
Json(req): Json<RlmBatchReq>,
|
||||
) -> Json<BatchResp> {
|
||||
if req.prompts.is_empty() {
|
||||
return Json(BatchResp { results: vec![] });
|
||||
}
|
||||
if req.prompts.len() > MAX_BATCH {
|
||||
return Json(BatchResp {
|
||||
results: req
|
||||
.prompts
|
||||
.iter()
|
||||
.map(|_| LlmResp {
|
||||
text: String::new(),
|
||||
error: Some(format!(
|
||||
"batch too large: {} > {MAX_BATCH}",
|
||||
req.prompts.len()
|
||||
)),
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
}
|
||||
if ctx.depth_remaining == 0 {
|
||||
return Json(BatchResp {
|
||||
results: req
|
||||
.prompts
|
||||
.iter()
|
||||
.map(|_| LlmResp {
|
||||
text: String::new(),
|
||||
error: Some("rlm_query_batched: recursion budget exhausted".to_string()),
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
}
|
||||
|
||||
let futures = req.prompts.into_iter().map(|prompt| {
|
||||
let client = ctx.client.clone();
|
||||
let child_model = ctx.child_model.clone();
|
||||
let depth = ctx.depth_remaining.saturating_sub(1);
|
||||
async move {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
|
||||
let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} });
|
||||
// Same dyn-erasure pattern as sub_rlm_handler — break the recursive
|
||||
// future-type cycle through the boxed dyn return of run_rlm_turn_inner.
|
||||
let result = super::turn::run_rlm_turn_inner(
|
||||
&client,
|
||||
child_model.clone(),
|
||||
prompt,
|
||||
None,
|
||||
child_model,
|
||||
tx,
|
||||
depth,
|
||||
)
|
||||
.await;
|
||||
drain.abort();
|
||||
result
|
||||
}
|
||||
});
|
||||
|
||||
let results_raw = futures_util::future::join_all(futures).await;
|
||||
|
||||
let mut results = Vec::with_capacity(results_raw.len());
|
||||
let mut total_input = 0_u32;
|
||||
let mut total_output = 0_u32;
|
||||
|
||||
for result in results_raw {
|
||||
total_input = total_input.saturating_add(result.usage.input_tokens);
|
||||
total_output = total_output.saturating_add(result.usage.output_tokens);
|
||||
results.push(LlmResp {
|
||||
text: result.answer,
|
||||
error: result.error,
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let mut u = ctx.usage.lock().await;
|
||||
u.input_tokens = u.input_tokens.saturating_add(total_input);
|
||||
u.output_tokens = u.output_tokens.saturating_add(total_output);
|
||||
}
|
||||
|
||||
Json(BatchResp { results })
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handle / start
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Result of starting the sidecar — the bound socket address and the task
|
||||
/// handle. Drop or abort the handle to stop the server.
|
||||
pub struct SidecarHandle {
|
||||
pub addr: SocketAddr,
|
||||
task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl SidecarHandle {
|
||||
pub fn llm_url(&self) -> String {
|
||||
format!("http://{}/llm", self.addr)
|
||||
}
|
||||
pub fn llm_batch_url(&self) -> String {
|
||||
format!("http://{}/llm_batch", self.addr)
|
||||
}
|
||||
pub fn rlm_url(&self) -> String {
|
||||
format!("http://{}/rlm", self.addr)
|
||||
}
|
||||
pub fn rlm_batch_url(&self) -> String {
|
||||
format!("http://{}/rlm_batch", self.addr)
|
||||
}
|
||||
pub fn shutdown(self) {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind a sidecar on `127.0.0.1` with a kernel-assigned port and start
|
||||
/// serving in a background task.
|
||||
pub async fn start_sidecar(ctx: Arc<SidecarCtx>) -> std::io::Result<SidecarHandle> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
let app = Router::new()
|
||||
.route("/llm", post(llm_handler))
|
||||
.route("/llm_batch", post(llm_batch_handler))
|
||||
.route("/rlm", post(sub_rlm_handler))
|
||||
.route("/rlm_batch", post(rlm_batch_handler))
|
||||
.with_state(ctx);
|
||||
let task = tokio::spawn(async move {
|
||||
let _ = axum::serve(listener, app).await;
|
||||
});
|
||||
Ok(SidecarHandle { addr, task })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn llm_resp_skips_none_error() {
|
||||
let r = LlmResp {
|
||||
text: "hello".to_string(),
|
||||
error: None,
|
||||
};
|
||||
let s = serde_json::to_string(&r).unwrap();
|
||||
assert!(!s.contains("error"));
|
||||
assert!(s.contains("hello"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn llm_resp_includes_error_when_set() {
|
||||
let r = LlmResp {
|
||||
text: String::new(),
|
||||
error: Some("boom".to_string()),
|
||||
};
|
||||
let s = serde_json::to_string(&r).unwrap();
|
||||
assert!(s.contains("boom"));
|
||||
}
|
||||
}
|
||||
+291
-417
File diff suppressed because it is too large
Load Diff
@@ -223,13 +223,16 @@ impl ToolSpec for RlmProcessTool {
|
||||
});
|
||||
}
|
||||
|
||||
// Surface the termination reason so the model can tell whether the
|
||||
// sub-agent finished cleanly via FINAL or fell out of the loop.
|
||||
// Surface the termination reason and a brief per-round trace so the
|
||||
// user can verify the sub-agent actually engaged with `context`
|
||||
// through sub-LLM calls — not just inferred an answer from the
|
||||
// preview.
|
||||
let footer = match result.termination {
|
||||
RlmTermination::Final => String::new(),
|
||||
RlmTermination::DirectAnswer => {
|
||||
"\n\n[note: sub-agent emitted a direct answer instead of FINAL()]".to_string()
|
||||
}
|
||||
RlmTermination::NoCode => format!(
|
||||
"\n\n[warning: sub-agent failed to engage the REPL after {} iterations — answer is the model's last raw response]",
|
||||
result.iterations
|
||||
),
|
||||
RlmTermination::Exhausted => format!(
|
||||
"\n\n[warning: sub-agent hit the {}-iteration cap without FINAL()]",
|
||||
result.iterations
|
||||
@@ -237,6 +240,46 @@ impl ToolSpec for RlmProcessTool {
|
||||
RlmTermination::Error => String::new(),
|
||||
};
|
||||
|
||||
let trace_summary = if result.trace.is_empty() {
|
||||
String::from("\n\n[trace: no REPL rounds executed]")
|
||||
} else {
|
||||
let mut s = String::from("\n\n[RLM trace]");
|
||||
for r in &result.trace {
|
||||
let head = r
|
||||
.code_summary
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or(r.code_summary.as_str())
|
||||
.chars()
|
||||
.take(80)
|
||||
.collect::<String>();
|
||||
s.push_str(&format!(
|
||||
"\n round {}: {} sub-LLM call(s), {}ms{} — {}",
|
||||
r.round,
|
||||
r.rpc_count,
|
||||
r.elapsed_ms,
|
||||
if r.had_error { " (error)" } else { "" },
|
||||
head
|
||||
));
|
||||
}
|
||||
s
|
||||
};
|
||||
|
||||
let trace_json: Vec<_> = result
|
||||
.trace
|
||||
.iter()
|
||||
.map(|r| {
|
||||
json!({
|
||||
"round": r.round,
|
||||
"rpc_count": r.rpc_count,
|
||||
"elapsed_ms": r.elapsed_ms,
|
||||
"had_error": r.had_error,
|
||||
"code_summary": r.code_summary,
|
||||
"stdout_preview": r.stdout_preview,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let metadata = json!({
|
||||
"iterations": result.iterations,
|
||||
"duration_ms": result.duration.as_millis() as u64,
|
||||
@@ -245,9 +288,14 @@ impl ToolSpec for RlmProcessTool {
|
||||
"termination": format!("{:?}", result.termination).to_lowercase(),
|
||||
"child_model": child_model,
|
||||
"max_depth": max_depth,
|
||||
"total_rpcs": result.total_rpcs,
|
||||
"trace": trace_json,
|
||||
});
|
||||
|
||||
Ok(ToolResult::success(format!("{}{}", result.answer, footer)).with_metadata(metadata))
|
||||
Ok(
|
||||
ToolResult::success(format!("{}{}{}", result.answer, footer, trace_summary))
|
||||
.with_metadata(metadata),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user