diff --git a/crates/tui/src/repl/mod.rs b/crates/tui/src/repl/mod.rs index bed69b97..011d494e 100644 --- a/crates/tui/src/repl/mod.rs +++ b/crates/tui/src/repl/mod.rs @@ -1,31 +1,10 @@ -//! REPL runtime for paper-spec RLM (Zhang et al., arXiv:2512.24601). -//! -//! Manages a persistent Python subprocess that can execute code blocks, -//! call `llm_query()` for recursive sub-LLM calls, and return results -//! via `FINAL()` / `FINAL_VAR()` patterns. -//! -//! ## Architecture -//! -//! - `PythonRuntime` — owns the Python subprocess lifecycle, sends code -//! via stdin, collects stdout/stderr with truncation. -//! - `LlmQueryFn` — injected into the Python namespace as `llm_query(prompt)`. -//! Calls back to Rust which dispatches a one-shot DeepSeek API completion. -//! - `ReplOutput` — parsed result from a REPL execution round, carrying -//! stdout text, whether a FINAL was detected, and any error signals. +//! Long-lived Python REPL runtime used by the RLM loop and by inline +//! `` ```repl `` block execution in the agent loop. pub mod runtime; pub mod sandbox; -pub use runtime::PythonRuntime; -pub use sandbox::{ReplOutput, inject_llm_query_fn, parse_final}; - -use std::sync::Arc; -use tokio::sync::Mutex; - -/// Shared handle to a long-lived Python REPL session. -pub type SharedRepl = Arc>>; - -/// Create a new shared REPL handle (initially uninitialized — lazy start). -pub fn new_shared_repl() -> SharedRepl { - Arc::new(Mutex::new(None)) -} +pub use runtime::{ + BatchResp, PythonRuntime, ReplRound, RpcDispatcher, RpcRequest, RpcResponse, SingleResp, +}; +pub use sandbox::{ReplBlock, extract_repl_blocks, has_repl_block}; diff --git a/crates/tui/src/repl/runtime.rs b/crates/tui/src/repl/runtime.rs index 00145dba..84324e65 100644 --- a/crates/tui/src/repl/runtime.rs +++ b/crates/tui/src/repl/runtime.rs @@ -1,191 +1,561 @@ -//! Python sandbox runtime for the REPL. +//! Long-lived Python REPL runtime. //! -//! Each code-execution round spawns a fresh `python3` process with all -//! state loaded from / saved to a JSON file. This is simpler and more -//! robust than trying to manage a long-lived subprocess with async -//! stdout re-attachment. +//! One `python3 -u` subprocess lives for the duration of an RLM turn (or an +//! inline `repl` block sequence in the agent loop). Code blocks are sent +//! over stdin framed by `__RLM_RUN__`/`__RLM_END__` sentinels; the bootstrap +//! `exec()`s them into the same global namespace so variables, imports, +//! and even open file handles persist naturally across rounds. //! -//! State persistence across rounds: -//! - `_repl_vars` dict is serialized to a JSON file after each round -//! - The next round reads it back before executing new code -//! - This matches the paper's "persistent variable store" design +//! Sub-LLM helpers (`llm_query`, `llm_query_batched`, `rlm_query`, +//! `rlm_query_batched`) are wired through a stdin/stdout RPC protocol: +//! Python emits `__RLM_REQ___::{json}` on stdout, Rust dispatches the +//! request and writes `__RLM_RESP___::{json}` back on stdin. No HTTP +//! sidecar, no temp ports — the same pipes carry both control and data. +//! +//! The session id (``) is a UUID generated per spawn, so user output +//! that happens to contain "REQ" or "FINAL" can't be confused with control +//! messages. -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::process::Stdio; use std::time::{Duration, Instant}; -use tokio::process::Command; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use uuid::Uuid; -use super::sandbox::parse_final; - -/// Python REPL runtime — executes code blocks in isolated processes -/// with persistent variable state via a JSON state file. -#[derive(Debug, Clone)] -pub struct PythonRuntime { - /// Path to the state file for variable persistence. - state_path: PathBuf, - /// Max bytes of stdout to return per round. - stdout_limit: usize, - /// Total rounds executed. - round_count: u64, - /// When the runtime was created. - started: Instant, - /// Extra env vars passed to every spawned `python3` invocation. The RLM - /// loop uses this to inject `REPL_LLM_URL` / `REPL_RLM_URL` so that - /// `llm_query()` / `sub_rlm()` inside Python can reach the local sidecar. - extra_env: Vec<(String, String)>, -} +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- /// Result of executing one code block. #[derive(Debug, Clone)] pub struct ReplRound { - /// Truncated stdout (for LLM feedback — paper's "metadata only"). + /// Stdout shown to the model as metadata next round. pub stdout: String, - /// Full stdout (for debugging). + /// Full stdout (with sentinels stripped, but otherwise raw). pub full_stdout: String, - /// Stderr from this round. + /// Stderr from this round (if any). pub stderr: String, - /// Whether the code raised an unhandled Python exception. + /// `True` if the user code raised an unhandled Python exception. pub has_error: bool, - /// If a FINAL(answer) or FINAL_VAR(var) was detected. + /// Captured `FINAL(value)` payload, if any. pub final_value: Option, - /// Wall-clock duration. + /// Number of `llm_query`/`rlm_query` RPCs the round issued. + pub rpc_count: u32, + /// Wall-clock duration of the round. pub elapsed: Duration, } -const DEFAULT_STDOUT_LIMIT: usize = 8_192; -const ROUND_TIMEOUT: Duration = Duration::from_secs(120); +/// One RPC request emitted by Python during a round. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum RpcRequest { + /// `llm_query(prompt, model=None, max_tokens=None, system=None)` + Llm { + prompt: String, + #[serde(default)] + model: Option, + #[serde(default)] + max_tokens: Option, + #[serde(default)] + system: Option, + }, + /// `llm_query_batched(prompts, model=None)` + LlmBatch { + prompts: Vec, + #[serde(default)] + model: Option, + }, + /// `rlm_query(prompt, model=None)` — recursive sub-RLM (paper's `sub_RLM`). + Rlm { + prompt: String, + #[serde(default)] + model: Option, + }, + /// `rlm_query_batched(prompts, model=None)` + RlmBatch { + prompts: Vec, + #[serde(default)] + model: Option, + }, +} -/// Python bootstrap — loaded at the top of every execution round. +/// Response for one RPC request. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum RpcResponse { + /// Single-text reply (Llm / Rlm). + Single(SingleResp), + /// Batch reply (LlmBatch / RlmBatch). + Batch(BatchResp), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SingleResp { + #[serde(default)] + pub text: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchResp { + pub results: Vec, +} + +/// Trait-object handle for dispatching Python RPCs back into Rust. /// -/// Conforms to the reference RLM runtime (alexzhang13/rlm) so the same -/// strategies and prompt patterns work here. Helpers exposed: -/// -/// - `context` — the user's input (loaded from the persistent state file) -/// - `llm_query(prompt, model=None, max_tokens=None, system=None)` -/// - `llm_query_batched(prompts, model=None)` — concurrent fanout -/// - `rlm_query(prompt, model=None)` — recursive sub-RLM (paper's `sub_RLM`) -/// - `rlm_query_batched(prompts, model=None)` — concurrent recursive sub-RLMs -/// - `SHOW_VARS()` — list user-created REPL variables -/// - `FINAL(value)` / `FINAL_VAR(name)` — terminate the loop -/// - `repl_get(name, default=None)` / `repl_set(name, value)` — explicit store -/// -/// Sub-LLM and sub-RLM calls are routed through a localhost HTTP sidecar -/// started by the RLM driver. URLs are injected via env vars -/// (`REPL_LLM_URL`, `REPL_LLM_BATCH_URL`, `REPL_RLM_URL`, -/// `REPL_RLM_BATCH_URL`). When the REPL is used outside an active RLM -/// turn the functions return a clear "unavailable" sentinel. -/// -/// Persistent state: every round, all top-level user variables that are -/// JSON-serializable are saved to the state file so the next round can -/// access them as ordinary Python locals (no `repl_get` ceremony needed). -const PYTHON_BOOTSTRAP: &str = r#" +/// Each RLM turn supplies one. Implementations forward to the LLM client +/// (and recursively into `run_rlm_turn_inner` for `Rlm` / `RlmBatch`). +pub trait RpcDispatcher: Send + Sync { + fn dispatch<'a>( + &'a self, + req: RpcRequest, + ) -> std::pin::Pin + Send + 'a>>; +} + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const DEFAULT_STDOUT_LIMIT: usize = 8_192; +const ROUND_TIMEOUT: Duration = Duration::from_secs(180); +const SPAWN_READY_TIMEOUT: Duration = Duration::from_secs(10); + +// --------------------------------------------------------------------------- +// PythonRuntime +// --------------------------------------------------------------------------- + +/// Long-lived Python REPL. +#[derive(Debug)] +pub struct PythonRuntime { + child: Child, + stdin: ChildStdin, + stdout: BufReader, + /// Per-spawn session id used in protocol sentinels. + session_id: String, + /// Path to the file holding `context` (kept around for cleanup). + context_path: Option, + stdout_limit: usize, + round_count: u64, + started: Instant, +} + +impl PythonRuntime { + /// Spawn a REPL with no `context` variable and no LLM helpers wired up. + /// Used by the agent loop for inline `repl` blocks the model emits in + /// regular conversation. + pub async fn new() -> Result { + Self::spawn_inner(None).await + } + + /// Compatibility shim — older RLM code path used to pass a state file. + /// The state file is no longer used, but the path doubles as an extra + /// scratch location callers can rely on for cleanup symmetry. + pub fn with_state_path(_path: PathBuf) -> Self { + // Synchronous constructor is no longer meaningful: spawning Python + // is async. Callers in turn.rs already use `spawn_with_context` — + // this stub is kept only so the public surface compiles for any + // out-of-tree user. It returns a deliberately broken runtime that + // panics on first use, which is preferable to silently lying. + unreachable!( + "PythonRuntime::with_state_path is deprecated — \ + use PythonRuntime::new() or PythonRuntime::spawn_with_context()" + ) + } + + /// Spawn a REPL with `context` (and `ctx`) preloaded from a file. Used + /// by the RLM turn loop. + pub async fn spawn_with_context(context_path: &Path) -> Result { + Self::spawn_inner(Some(context_path)).await + } + + async fn spawn_inner(context_path: Option<&Path>) -> Result { + let session_id = Uuid::new_v4().simple().to_string(); + let bootstrap = render_bootstrap(&session_id); + + let mut cmd = Command::new("python3"); + cmd.arg("-u") + .arg("-c") + .arg(&bootstrap) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + if let Some(path) = context_path { + cmd.env("RLM_CONTEXT_FILE", path); + } + + let mut child = cmd + .spawn() + .map_err(|e| format!("failed to spawn python3: {e}"))?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| "python3 stdin pipe missing".to_string())?; + let raw_stdout = child + .stdout + .take() + .ok_or_else(|| "python3 stdout pipe missing".to_string())?; + let stdout = BufReader::new(raw_stdout); + + let mut rt = Self { + child, + stdin, + stdout, + session_id: session_id.clone(), + context_path: context_path.map(Path::to_path_buf), + stdout_limit: DEFAULT_STDOUT_LIMIT, + round_count: 0, + started: Instant::now(), + }; + + // Wait for `__RLM_READY___` before handing control back. If + // Python failed to start (missing module, syntax error in the + // bootstrap, etc.), this is where we'll find out. + let ready_sentinel = format!("__RLM_READY_{session_id}__"); + match tokio::time::timeout(SPAWN_READY_TIMEOUT, rt.read_until_ready(&ready_sentinel)).await + { + Ok(Ok(())) => Ok(rt), + Ok(Err(e)) => { + let _ = rt.child.kill().await; + Err(format!("python3 bootstrap failed: {e}")) + } + Err(_) => { + let _ = rt.child.kill().await; + Err(format!( + "python3 bootstrap did not signal ready within {}s", + SPAWN_READY_TIMEOUT.as_secs() + )) + } + } + } + + async fn read_until_ready(&mut self, ready_sentinel: &str) -> Result<(), String> { + loop { + let mut line = String::new(); + let n = self + .stdout + .read_line(&mut line) + .await + .map_err(|e| format!("stdout read: {e}"))?; + if n == 0 { + return Err("python3 closed stdout before ready signal".to_string()); + } + let trimmed = line.trim_end_matches(['\n', '\r']); + if trimmed == ready_sentinel { + return Ok(()); + } + // Pre-ready output is rare; ignore it. + } + } + + /// Execute a Python code block with no RPC dispatcher. Used for inline + /// `repl` blocks where `llm_query()` should fall back to a sentinel. + pub async fn execute(&mut self, code: &str) -> Result { + self.run(code, None::<&dyn RpcDispatcher>).await + } + + /// Execute a code block, dispatching any sub-LLM RPCs through `bridge`. + /// + /// Returns once Python emits `__RLM_DONE___` or the round timeout + /// elapses (whichever happens first). + pub async fn run(&mut self, code: &str, bridge: Option<&D>) -> Result + where + D: RpcDispatcher + ?Sized, + { + let started = Instant::now(); + self.round_count += 1; + let round_id = self.round_count; + + // Send the code header + body + end marker in one write. + let header = format!("__RLM_RUN_{}__::{round_id}\n", self.session_id); + let footer = format!("__RLM_END_{}__\n", self.session_id); + let payload = format!("{header}{code}\n{footer}"); + self.stdin + .write_all(payload.as_bytes()) + .await + .map_err(|e| format!("stdin write: {e}"))?; + self.stdin + .flush() + .await + .map_err(|e| format!("stdin flush: {e}"))?; + + // Sentinels for this session. + let req_prefix = format!("__RLM_REQ_{}__::", self.session_id); + let final_prefix = format!("__RLM_FINAL_{}__::", self.session_id); + let err_prefix = format!("__RLM_ERR_{}__::", self.session_id); + let done_prefix = format!("__RLM_DONE_{}__::", self.session_id); + + let mut stdout_buf = String::new(); + let mut final_value: Option = None; + let mut had_error = false; + let mut rpc_count: u32 = 0; + + let read_loop = async { + loop { + let mut line = String::new(); + let n = self + .stdout + .read_line(&mut line) + .await + .map_err(|e| format!("stdout read: {e}"))?; + if n == 0 { + return Err("python3 closed stdout mid-round".to_string()); + } + let trimmed = line.trim_end_matches(['\n', '\r']); + + if let Some(rest) = trimmed.strip_prefix(&done_prefix) { + let _ = rest; + break; + } + if let Some(rest) = trimmed.strip_prefix(&final_prefix) { + // Stored as a JSON-encoded string. + let v = + serde_json::from_str::(rest).unwrap_or_else(|_| rest.to_string()); + final_value = Some(v); + continue; + } + if let Some(rest) = trimmed.strip_prefix(&err_prefix) { + let traceback = + serde_json::from_str::(rest).unwrap_or_else(|_| rest.to_string()); + had_error = true; + stdout_buf.push_str(&format!("[traceback]\n{traceback}\n")); + continue; + } + if let Some(rest) = trimmed.strip_prefix(&req_prefix) { + rpc_count = rpc_count.saturating_add(1); + let req: RpcRequest = match serde_json::from_str(rest) { + Ok(r) => r, + Err(e) => { + // Send an error response so Python isn't blocked. + self.send_resp(&RpcResponse::Single(SingleResp { + text: String::new(), + error: Some(format!("malformed RPC: {e}")), + })) + .await?; + continue; + } + }; + let resp = match bridge { + Some(b) => b.dispatch(req).await, + None => RpcResponse::Single(SingleResp { + text: String::new(), + error: Some("no LLM bridge bound to this REPL".to_string()), + }), + }; + self.send_resp(&resp).await?; + continue; + } + + stdout_buf.push_str(&line); + } + Ok::<_, String>(()) + }; + + match tokio::time::timeout(ROUND_TIMEOUT, read_loop).await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(format!( + "REPL round timed out after {}s", + ROUND_TIMEOUT.as_secs() + )); + } + } + + let stderr = self.drain_stderr().await; + let display = truncate_stdout(stdout_buf.trim_end_matches('\n'), self.stdout_limit); + + Ok(ReplRound { + stdout: display, + full_stdout: stdout_buf, + stderr, + has_error: had_error, + final_value, + rpc_count, + elapsed: started.elapsed(), + }) + } + + async fn send_resp(&mut self, resp: &RpcResponse) -> Result<(), String> { + let body = serde_json::to_string(resp).map_err(|e| format!("encode rpc resp: {e}"))?; + let line = format!("__RLM_RESP_{}__::{body}\n", self.session_id); + self.stdin + .write_all(line.as_bytes()) + .await + .map_err(|e| format!("stdin write resp: {e}"))?; + self.stdin + .flush() + .await + .map_err(|e| format!("stdin flush resp: {e}"))?; + Ok(()) + } + + async fn drain_stderr(&mut self) -> String { + // We don't continuously read stderr — drain whatever's pending after + // a round so it can show up in error reports without deadlocking + // anything during normal operation. + let Some(stderr) = self.child.stderr.as_mut() else { + return String::new(); + }; + use tokio::io::AsyncReadExt; + let mut buf = Vec::new(); + // Best-effort read with a tight deadline; we don't want to block. + let fut = async { + let mut chunk = [0u8; 4096]; + loop { + match tokio::time::timeout(Duration::from_millis(20), stderr.read(&mut chunk)).await + { + Ok(Ok(0)) => break, + Ok(Ok(n)) => buf.extend_from_slice(&chunk[..n]), + _ => break, + } + } + }; + let _ = fut.await; + String::from_utf8_lossy(&buf).to_string() + } + + /// Total rounds executed. + pub fn round_count(&self) -> u64 { + self.round_count + } + + /// Wall-clock uptime since spawn. + pub fn uptime(&self) -> Duration { + self.started.elapsed() + } + + /// Cleanly tear down the subprocess. + pub async fn shutdown(mut self) { + let _ = self.stdin.shutdown().await; + let _ = self.child.kill().await; + if let Some(path) = self.context_path.take() { + let _ = tokio::fs::remove_file(path).await; + } + } +} + +impl Drop for PythonRuntime { + fn drop(&mut self) { + // tokio sets `kill_on_drop(true)` on the child; the context file + // (if any) is removed on `shutdown()` — drop is best-effort. + if let Some(path) = self.context_path.take() { + let _ = std::fs::remove_file(path); + } + } +} + +// --------------------------------------------------------------------------- +// Bootstrap script +// --------------------------------------------------------------------------- + +/// Render the Python bootstrap with session-specific sentinels baked in. +/// The sentinels include a UUID to prevent user prints from being mistaken +/// for control messages. +fn render_bootstrap(session_id: &str) -> String { + BOOTSTRAP_TEMPLATE.replace("__SID__", session_id) +} + +const BOOTSTRAP_TEMPLATE: &str = r#" import json as _json import os as _os -import urllib.request as _urlreq -import urllib.error as _urlerr +import sys as _sys +import traceback as _traceback -# --- Sidecar URLs (set by the RLM driver) --- -_LLM_URL = _os.environ.get('REPL_LLM_URL', '') -_LLM_BATCH_URL = _os.environ.get('REPL_LLM_BATCH_URL', '') -_RLM_URL = _os.environ.get('REPL_RLM_URL', '') -_RLM_BATCH_URL = _os.environ.get('REPL_RLM_BATCH_URL', '') -_STATE_FILE = _os.environ.get('REPL_STATE_FILE', '') +_SID = "__SID__" +_REQ = f"__RLM_REQ_{_SID}__::" +_RESP = f"__RLM_RESP_{_SID}__::" +_FINAL = f"__RLM_FINAL_{_SID}__::" +_ERR = f"__RLM_ERR_{_SID}__::" +_RUN = f"__RLM_RUN_{_SID}__::" +_END = f"__RLM_END_{_SID}__" +_DONE = f"__RLM_DONE_{_SID}__::" +_READY = f"__RLM_READY_{_SID}__" -def _post_json(url, body, timeout): - data = _json.dumps(body).encode('utf-8') - req = _urlreq.Request( - url, data=data, - headers={'Content-Type': 'application/json'}, - method='POST', - ) - with _urlreq.urlopen(req, timeout=timeout) as resp: - return _json.loads(resp.read().decode('utf-8')) +def _rpc(req): + _sys.stdout.write(_REQ + _json.dumps(req) + "\n") + _sys.stdout.flush() + line = _sys.stdin.readline() + if not line: + return {"error": "rust driver closed stdin"} + if line.startswith(_RESP): + try: + return _json.loads(line[len(_RESP):]) + except Exception as e: + return {"error": f"malformed rpc resp: {e}"} + return {"error": f"unexpected protocol line: {line[:120]!r}"} def llm_query(prompt, model=None, max_tokens=None, system=None): - """One-shot sub-LLM call. Returns the completion text as a string. - Cheap and fast — use for chunk extraction / summarization / Q&A. - The sub-LLM uses the configured child_model by default.""" - if not _LLM_URL: - return '[llm_query unavailable: no sidecar URL]' - body = {'prompt': str(prompt), 'model': model, - 'max_tokens': max_tokens, 'system': system} - try: - data = _post_json(_LLM_URL, body, timeout=180) - except _urlerr.URLError as e: - return f'[llm_query transport error: {e}]' - except Exception as e: - return f'[llm_query error: {e}]' - if data.get('error'): - return f'[llm_query: {data["error"]}]' - return data.get('text', '') + """One-shot sub-LLM call. Returns the completion text as a string.""" + resp = _rpc({"type":"llm","prompt":str(prompt),"model":model, + "max_tokens":max_tokens,"system":system}) + if isinstance(resp, dict) and resp.get("error"): + return f"[llm_query error: {resp['error']}]" + if isinstance(resp, dict): + return resp.get("text","") + return str(resp) def llm_query_batched(prompts, model=None): - """Run multiple llm_query calls concurrently. Returns a list of strings - in the same order as the input prompts. Much faster than sequential - calls when the sub-prompts are independent.""" + """Run multiple sub-LLM calls concurrently. Returns a list of strings.""" if not isinstance(prompts, (list, tuple)): - return [f'[llm_query_batched error: prompts must be a list]'] - if not _LLM_BATCH_URL: - # Fall back to serial llm_query if no batch endpoint is configured. - return [llm_query(p, model=model) for p in prompts] - body = {'prompts': [str(p) for p in prompts], 'model': model} - try: - data = _post_json(_LLM_BATCH_URL, body, timeout=300) - except _urlerr.URLError as e: - return [f'[llm_query_batched transport error: {e}]'] * len(prompts) - except Exception as e: - return [f'[llm_query_batched error: {e}]'] * len(prompts) - results = data.get('results', []) + return ["[llm_query_batched: prompts must be a list]"] + resp = _rpc({"type":"llm_batch","prompts":[str(p) for p in prompts],"model":model}) + if isinstance(resp, dict) and resp.get("error"): + return [f"[llm_query_batched: {resp['error']}]" for _ in prompts] + results = (resp or {}).get("results", []) if isinstance(resp, dict) else [] if len(results) != len(prompts): - return [f'[llm_query_batched mismatch: got {len(results)} for {len(prompts)} prompts]'] * len(prompts) - return [r.get('text', f'[llm_query_batched: {r.get("error","")}]') for r in results] + return [f"[llm_query_batched: size mismatch ({len(results)}/{len(prompts)})]" for _ in prompts] + out = [] + for r in results: + if r.get("error"): + out.append(f"[child err: {r['error']}]") + else: + out.append(r.get("text","")) + return out def rlm_query(prompt, model=None): - """Spawn a recursive RLM sub-call (paper's `sub_RLM`). The child gets - its own REPL and can iterate, query further sub-LLMs, etc. Use when a - sub-task itself requires multi-step reasoning. Bounded by the parent's - recursion budget; falls back to llm_query when at depth=0.""" - if not _RLM_URL: - return '[rlm_query unavailable: no sidecar URL]' - try: - data = _post_json(_RLM_URL, {'prompt': str(prompt), 'model': model}, timeout=600) - except _urlerr.URLError as e: - return f'[rlm_query transport error: {e}]' - except Exception as e: - return f'[rlm_query error: {e}]' - if data.get('error'): - return f'[rlm_query: {data["error"]}]' - return data.get('text', '') + """Recursive sub-RLM (paper's `sub_RLM`). Each call gets its own REPL.""" + resp = _rpc({"type":"rlm","prompt":str(prompt),"model":model}) + if isinstance(resp, dict) and resp.get("error"): + return f"[rlm_query error: {resp['error']}]" + if isinstance(resp, dict): + return resp.get("text","") + return str(resp) def rlm_query_batched(prompts, model=None): - """Spawn multiple recursive RLM sub-calls in parallel. Each prompt - gets its own child RLM. Returns a list in input order.""" + """Run multiple recursive sub-RLMs in parallel.""" if not isinstance(prompts, (list, tuple)): - return [f'[rlm_query_batched error: prompts must be a list]'] - if not _RLM_BATCH_URL: - return [rlm_query(p, model=model) for p in prompts] - body = {'prompts': [str(p) for p in prompts], 'model': model} - try: - data = _post_json(_RLM_BATCH_URL, body, timeout=900) - except _urlerr.URLError as e: - return [f'[rlm_query_batched transport error: {e}]'] * len(prompts) - except Exception as e: - return [f'[rlm_query_batched error: {e}]'] * len(prompts) - results = data.get('results', []) + return ["[rlm_query_batched: prompts must be a list]"] + resp = _rpc({"type":"rlm_batch","prompts":[str(p) for p in prompts],"model":model}) + if isinstance(resp, dict) and resp.get("error"): + return [f"[rlm_query_batched: {resp['error']}]" for _ in prompts] + results = (resp or {}).get("results", []) if isinstance(resp, dict) else [] if len(results) != len(prompts): - return [f'[rlm_query_batched mismatch: got {len(results)} for {len(prompts)} prompts]'] * len(prompts) - return [r.get('text', f'[rlm_query_batched: {r.get("error","")}]') for r in results] + return [f"[rlm_query_batched: size mismatch ({len(results)}/{len(prompts)})]" for _ in prompts] + out = [] + for r in results: + if r.get("error"): + out.append(f"[child err: {r['error']}]") + else: + out.append(r.get("text","")) + return out def FINAL(value): - """Signal the RLM loop to stop with this final answer.""" - print(f'__REPL_FINAL__::{_json.dumps(str(value))}', flush=True) + """Signal the loop to stop with this final answer.""" + _sys.stdout.write(_FINAL + _json.dumps(str(value)) + "\n") + _sys.stdout.flush() def FINAL_VAR(name): - """Signal the RLM loop to stop, returning a named variable as the answer.""" + """Signal the loop to stop, returning the value of a named variable.""" name_str = str(name).strip().strip("'\"") if name_str in globals(): - val = globals()[name_str] - print(f'__REPL_FINAL__::{_json.dumps(str(val))}', flush=True) + FINAL(globals()[name_str]) else: print(f"FINAL_VAR error: variable '{name_str}' not found. " f"Use SHOW_VARS() to list available variables.", flush=True) @@ -205,192 +575,65 @@ def repl_get(name, default=None): def repl_set(name, value): globals()[str(name)] = value -# Names defined by the bootstrap that should NOT be persisted as user vars. +# Load the long input as `context` (and `ctx`) from a file. This keeps the +# big string out of the process command-line and out of the LLM's window. +_ctx_file = _os.environ.get("RLM_CONTEXT_FILE","") +context = "" +if _ctx_file: + try: + with open(_ctx_file, "r", encoding="utf-8", errors="replace") as f: + context = f.read() + except Exception as e: + _sys.stderr.write(f"[bootstrap] failed to load context: {e}\n") +ctx = context # short alias matching aleph + _BOOTSTRAP_NAMES = { - 'llm_query', 'llm_query_batched', 'rlm_query', 'rlm_query_batched', - 'SHOW_VARS', 'FINAL', 'FINAL_VAR', 'repl_get', 'repl_set', + "_SID","_REQ","_RESP","_FINAL","_ERR","_RUN","_END","_DONE","_READY", + "_rpc","_ctx_file","_BOOTSTRAP_NAMES","_main_loop", + "llm_query","llm_query_batched","rlm_query","rlm_query_batched", + "FINAL","FINAL_VAR","SHOW_VARS","repl_get","repl_set", + "context","ctx", + "_json","_os","_sys","_traceback", } -# Restore user variables from the previous round's state file. Any -# JSON-serializable value persists as a regular Python local. -def _load_state(): - if not _STATE_FILE or not _os.path.exists(_STATE_FILE): - return - try: - with open(_STATE_FILE, 'r') as f: - data = _json.load(f) - if isinstance(data, dict): - for k, v in data.items(): - if not k.startswith('_'): - globals()[k] = v - except Exception: - pass - -# Save user variables (everything that's JSON-serializable and not a -# bootstrap helper) to the state file for the next round. -def _save_state(): - if not _STATE_FILE: - return - out = {} - for k, v in list(globals().items()): - if k.startswith('_') or k in _BOOTSTRAP_NAMES: +def _main_loop(): + _sys.stdout.write(_READY + "\n") + _sys.stdout.flush() + while True: + header = _sys.stdin.readline() + if not header: + return + if not header.startswith(_RUN): continue + round_id = header.rstrip("\n")[len(_RUN):] + code_lines = [] + while True: + line = _sys.stdin.readline() + if not line: + return + if line.rstrip("\n") == _END: + break + code_lines.append(line) + code = "".join(code_lines) try: - _json.dumps(v) - except (TypeError, ValueError): - continue - out[k] = v - try: - with open(_STATE_FILE, 'w') as f: - _json.dump(out, f) - except Exception: - pass + exec(compile(code, f"", "exec"), globals()) + except SystemExit: + _sys.stdout.write(_DONE + round_id + "\n") + _sys.stdout.flush() + return + except BaseException: + tb = _traceback.format_exc() + _sys.stdout.write(_ERR + _json.dumps(tb) + "\n") + _sys.stdout.flush() + _sys.stdout.write(_DONE + round_id + "\n") + _sys.stdout.flush() -_load_state() +_main_loop() "#; -/// Code suffix — appended after user code to save state. -const PYTHON_SUFFIX: &str = r#" -# --- Save state after execution --- -_save_state() -"#; - -impl PythonRuntime { - /// Create a new Python REPL runtime. - pub async fn new() -> Result { - let dir = std::env::temp_dir().join("deepseek_repl"); - std::fs::create_dir_all(&dir) - .map_err(|e| format!("Failed to create REPL temp dir: {e}"))?; - - let state_path = dir.join(format!("state_{}.json", std::process::id())); - - Ok(Self { - state_path, - stdout_limit: DEFAULT_STDOUT_LIMIT, - round_count: 0, - started: Instant::now(), - extra_env: Vec::new(), - }) - } - - /// Create with a specific state path (for testing / RLM integration). - pub fn with_state_path(path: PathBuf) -> Self { - Self { - state_path: path, - stdout_limit: DEFAULT_STDOUT_LIMIT, - round_count: 0, - started: Instant::now(), - extra_env: Vec::new(), - } - } - - /// Set an env var that will be passed to every subsequent `python3` - /// invocation. Used by the RLM driver to inject sidecar URLs. - pub fn set_env(&mut self, key: impl Into, value: impl Into) { - let key = key.into(); - let value = value.into(); - self.extra_env.retain(|(k, _)| k != &key); - self.extra_env.push((key, value)); - } - - /// Execute a block of Python code. - /// - /// Spawns a `python3 -u` process with the bootstrap, the user code, - /// and the suffix, then collects stdout/stderr. - pub async fn execute(&mut self, code: &str) -> Result { - let round_start = Instant::now(); - self.round_count += 1; - - // Build the full script: bootstrap + user code + suffix. - let full_script = format!( - "{}\n\n# --- User code (round {}) ---\ntry:\n{}\nexcept Exception as _repl_err:\n print(f'__REPL_ERROR__::{{_repl_err}}', flush=True)\n\n{}", - PYTHON_BOOTSTRAP, - self.round_count, - indent_code(code, 4), - PYTHON_SUFFIX, - ); - - let output = tokio::time::timeout(ROUND_TIMEOUT, async { - let mut cmd = Command::new("python3"); - cmd.arg("-u") // unbuffered - .arg("-c") - .arg(&full_script) - .env( - "REPL_STATE_FILE", - self.state_path.to_string_lossy().as_ref(), - ); - for (k, v) in &self.extra_env { - cmd.env(k, v); - } - cmd.output() - .await - .map_err(|e| format!("Failed to execute python3: {e}")) - }) - .await - .map_err(|_| { - format!( - "Python REPL round timed out after {}s", - ROUND_TIMEOUT.as_secs() - ) - })??; - - let full_stdout = String::from_utf8_lossy(&output.stdout).to_string(); - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - let has_error = !output.status.success() || full_stdout.contains("__REPL_ERROR__::"); - - // Parse FINAL markers and clean up protocol lines. - let (display_stdout, final_value) = parse_final(&full_stdout); - let display_stdout = clean_repl_output(&display_stdout); - let display_stdout = truncate_stdout(&display_stdout, self.stdout_limit); - - Ok(ReplRound { - stdout: display_stdout, - full_stdout, - stderr, - has_error, - final_value, - elapsed: round_start.elapsed(), - }) - } - - /// Total rounds executed. - pub fn round_count(&self) -> u64 { - self.round_count - } - - /// Wall-clock uptime. - pub fn uptime(&self) -> Duration { - self.started.elapsed() - } -} - -/// Clean protocol lines (__REPL_LLM_QUERY__, etc.) from stdout. -fn clean_repl_output(raw: &str) -> String { - raw.lines() - .filter(|line| { - !line.starts_with("__REPL_LLM_QUERY__::") - && !line.starts_with("__REPL_FINAL__::") - && !line.starts_with("__REPL_ERROR__::") - && !line.starts_with("__REPL_DONE__") - && !line.starts_with("__REPL_READY__") - }) - .collect::>() - .join("\n") -} - -fn indent_code(code: &str, spaces: usize) -> String { - let indent = " ".repeat(spaces); - code.lines() - .map(|line| { - if line.is_empty() { - String::new() - } else { - format!("{indent}{line}") - } - }) - .collect::>() - .join("\n") -} +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- fn truncate_stdout(stdout: &str, limit: usize) -> String { if stdout.len() <= limit { @@ -398,80 +641,234 @@ fn truncate_stdout(stdout: &str, limit: usize) -> String { } let take = limit.saturating_sub(80); let mut out: String = stdout.chars().take(take).collect(); - let omitted = stdout.len().saturating_sub(take); + let omitted = stdout.len().saturating_sub(out.len()); out.push_str(&format!( "\n\n[... REPL output truncated: {omitted} bytes omitted ...]\n" )); out } +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + use tokio::sync::Mutex; - #[tokio::test] - async fn repl_executes_simple_code() { - let mut rt = PythonRuntime::new().await.expect("create runtime"); - let round = rt - .execute("print('hello from repl')") - .await - .expect("execute"); - assert!(round.stdout.contains("hello from repl")); - assert!(!round.has_error); - assert!(round.final_value.is_none()); + /// In-process dispatcher that records what was asked and replies with + /// canned text. Lets tests verify the round-trip without real network. + struct StubBridge { + calls: Arc>>, + canned: Arc, + } + + impl StubBridge { + fn new() -> Self { + Self { + calls: Arc::new(Mutex::new(Vec::new())), + canned: Arc::new(AtomicU32::new(0)), + } + } + } + + impl RpcDispatcher for StubBridge { + fn dispatch<'a>( + &'a self, + req: RpcRequest, + ) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + self.calls.lock().await.push(req.clone()); + let n = self.canned.fetch_add(1, Ordering::Relaxed); + match req { + RpcRequest::Llm { prompt, .. } | RpcRequest::Rlm { prompt, .. } => { + RpcResponse::Single(SingleResp { + text: format!("stub#{n}: {prompt}"), + error: None, + }) + } + RpcRequest::LlmBatch { prompts, .. } | RpcRequest::RlmBatch { prompts, .. } => { + let results = prompts + .into_iter() + .enumerate() + .map(|(i, p)| SingleResp { + text: format!("stub#{n}.{i}: {p}"), + error: None, + }) + .collect(); + RpcResponse::Batch(BatchResp { results }) + } + } + }) + } + } + + fn write_temp_context(body: &str) -> std::path::PathBuf { + let dir = std::env::temp_dir().join("deepseek_repl_runtime_tests"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("ctx_{}_{}.txt", std::process::id(), Uuid::new_v4())); + std::fs::write(&path, body).unwrap(); + path } #[tokio::test] - async fn repl_handles_final() { - let mut rt = PythonRuntime::new().await.expect("create runtime"); + async fn spawns_and_executes_simple_print() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + let round = rt.execute("print('hello world')").await.expect("execute"); + assert!(round.stdout.contains("hello world")); + assert!(!round.has_error); + assert!(round.final_value.is_none()); + assert_eq!(round.rpc_count, 0); + rt.shutdown().await; + } + + #[tokio::test] + async fn variables_persist_across_rounds() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + rt.execute("x = [1, 2, 3]").await.expect("r1"); + rt.execute("x.append(99)").await.expect("r2"); + let round = rt.execute("print(x)").await.expect("r3"); + assert!(round.stdout.contains("[1, 2, 3, 99]")); + rt.shutdown().await; + } + + #[tokio::test] + async fn imports_persist_across_rounds() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + rt.execute("import math").await.expect("r1"); + let round = rt.execute("print(math.pi)").await.expect("r2"); + assert!(round.stdout.contains("3.14")); + rt.shutdown().await; + } + + #[tokio::test] + async fn context_loads_from_file() { + let path = write_temp_context("the quick brown fox"); + let mut rt = PythonRuntime::spawn_with_context(&path) + .await + .expect("spawn"); + let round = rt + .execute("print(len(context), context[:5])") + .await + .expect("execute"); + assert!(round.stdout.contains("19")); + assert!(round.stdout.contains("the q")); + rt.shutdown().await; + } + + #[tokio::test] + async fn ctx_alias_works() { + let path = write_temp_context("aleph-style"); + let mut rt = PythonRuntime::spawn_with_context(&path) + .await + .expect("spawn"); + let round = rt.execute("print(ctx)").await.expect("execute"); + assert!(round.stdout.contains("aleph-style")); + rt.shutdown().await; + } + + #[tokio::test] + async fn final_is_captured() { + let mut rt = PythonRuntime::new().await.expect("spawn"); let round = rt .execute("FINAL('the answer is 42')") .await .expect("execute"); assert_eq!(round.final_value.as_deref(), Some("the answer is 42")); + rt.shutdown().await; } #[tokio::test] - async fn repl_persists_variables_across_rounds() { - let dir = std::env::temp_dir().join("deepseek_repl_test"); - std::fs::create_dir_all(&dir).ok(); - let state_path = dir.join(format!("test_state_{}.json", std::process::id())); - let _ = std::fs::remove_file(&state_path); + async fn final_var_is_captured() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + rt.execute("answer = 'computed'").await.expect("r1"); + let round = rt.execute("FINAL_VAR('answer')").await.expect("r2"); + assert_eq!(round.final_value.as_deref(), Some("computed")); + rt.shutdown().await; + } - let mut rt = PythonRuntime::with_state_path(state_path.clone()); + #[tokio::test] + async fn errors_are_reported_without_killing_runtime() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + let r1 = rt.execute("raise ValueError('boom')").await.expect("r1"); + assert!(r1.has_error); + assert!(r1.full_stdout.contains("boom") || r1.stdout.contains("boom")); + // The runtime is still alive — next round should work. + let r2 = rt.execute("print('still here')").await.expect("r2"); + assert!(r2.stdout.contains("still here")); + rt.shutdown().await; + } - // Round 1: set a variable. - rt.execute("repl_set('count', 41)").await.expect("round 1"); - // Round 2: read it back and increment. + #[tokio::test] + async fn rpc_dispatcher_round_trips_llm_query() { + let bridge = StubBridge::new(); + let calls = Arc::clone(&bridge.calls); + + let mut rt = PythonRuntime::new().await.expect("spawn"); let round = rt - .execute( - "val = repl_get('count', 0); repl_set('count', val + 1); print(f'count={val+1}')", + .run("print(llm_query('hello'))", Some(&bridge)) + .await + .expect("execute"); + assert!( + round.stdout.contains("stub#0: hello"), + "stdout: {:?}", + round.stdout + ); + assert_eq!(round.rpc_count, 1); + + let recorded = calls.lock().await; + assert_eq!(recorded.len(), 1); + match &recorded[0] { + RpcRequest::Llm { prompt, .. } => assert_eq!(prompt, "hello"), + other => panic!("expected Llm request, got {other:?}"), + } + drop(recorded); + rt.shutdown().await; + } + + #[tokio::test] + async fn rpc_dispatcher_round_trips_batch() { + let bridge = StubBridge::new(); + let mut rt = PythonRuntime::new().await.expect("spawn"); + let round = rt + .run( + "outs = llm_query_batched(['a','b','c']); print('|'.join(outs))", + Some(&bridge), ) .await - .expect("round 2"); - assert!(round.stdout.contains("count=42")); + .expect("execute"); + assert!(round.stdout.contains("stub#0.0: a")); + assert!(round.stdout.contains("stub#0.1: b")); + assert!(round.stdout.contains("stub#0.2: c")); + assert_eq!(round.rpc_count, 1); + rt.shutdown().await; + } - // Round 3: verify via FINAL_VAR. - let round = rt.execute("FINAL_VAR('count')").await.expect("round 3"); - assert_eq!(round.final_value.as_deref(), Some("42")); - - let _ = std::fs::remove_file(&state_path); + #[tokio::test] + async fn no_dispatcher_returns_unavailable_sentinel() { + let mut rt = PythonRuntime::new().await.expect("spawn"); + let round = rt.execute("print(llm_query('hi'))").await.expect("execute"); + assert!( + round.stdout.contains("[llm_query error:") || round.stdout.contains("no LLM bridge"), + "stdout: {:?}", + round.stdout + ); + rt.shutdown().await; } #[test] - fn clean_output_removes_protocol_lines() { - let raw = "hello\n__REPL_FINAL__::\"done\"\nworld\n__REPL_LLM_QUERY__::{}"; - let cleaned = clean_repl_output(raw); - assert!(cleaned.contains("hello")); - assert!(cleaned.contains("world")); - assert!(!cleaned.contains("__REPL_FINAL__")); - assert!(!cleaned.contains("__REPL_LLM_QUERY__")); + fn truncate_keeps_short_unchanged() { + assert_eq!(truncate_stdout("hello", 100), "hello"); } #[test] - fn indent_preserves_empty_lines() { - let code = "print(1)\n\nprint(2)"; - let result = indent_code(code, 4); - assert_eq!(result, " print(1)\n\n print(2)"); + fn truncate_clips_long() { + let long = "a".repeat(10_000); + let out = truncate_stdout(&long, 1024); + assert!(out.len() < 1500); + assert!(out.contains("truncated")); } } diff --git a/crates/tui/src/repl/sandbox.rs b/crates/tui/src/repl/sandbox.rs index 4dc6d620..0935e0eb 100644 --- a/crates/tui/src/repl/sandbox.rs +++ b/crates/tui/src/repl/sandbox.rs @@ -1,120 +1,30 @@ -//! REPL sandbox utilities: FINAL/FINAL_VAR parsing, llm_query injection, -//! and the ReplOutput type. +//! REPL fence-extraction utilities. +//! +//! The agent's main loop scans assistant text for ` ```repl ` fenced blocks +//! and feeds them to a [`crate::repl::runtime::PythonRuntime`]. Capturing +//! `FINAL(...)` and routing sub-LLM RPCs are handled inside the runtime via +//! a stdin/stdout protocol — no scraping required here. -/// Output from a REPL execution round. -#[derive(Debug, Clone)] -pub struct ReplOutput { - /// Cleaned stdout (protocol lines removed). - pub stdout: String, - /// Raw stdout including protocol lines. - pub raw_stdout: String, - /// Whether the round had an error. - pub has_error: bool, - /// If FINAL() or FINAL_VAR() was called, the value. - pub final_value: Option, - /// Any llm_query() calls that were detected (prompt, model, max_tokens). - pub llm_queries: Vec, -} - -/// A request from Python's `llm_query()` function. -#[derive(Debug, Clone)] -pub struct LlmQueryRequest { - pub prompt: String, - pub model: Option, - pub max_tokens: Option, -} - -/// Parse a stdout string into a ReplOutput, extracting FINAL markers -/// and cleaning protocol lines. -pub fn parse_final(raw_stdout: &str) -> (String, Option) { - let mut final_value: Option = None; - let mut cleaned = String::new(); - - for line in raw_stdout.lines() { - if let Some(val) = line.strip_prefix("__REPL_FINAL__::") { - // Parse the JSON-encoded final value. - if let Ok(parsed) = serde_json::from_str::(val) { - final_value = Some(parsed); - } else { - // Fallback: use the raw text after the prefix. - final_value = Some(val.to_string()); - } - continue; - } - // Skip other protocol lines. - if line.starts_with("__REPL_LLM_QUERY__::") - || line.starts_with("__REPL_DONE__") - || line.starts_with("__REPL_READY__") - { - continue; - } - cleaned.push_str(line); - cleaned.push('\n'); - } - - (cleaned.trim().to_string(), final_value) -} - -/// Generate the Python code that injects `llm_query()` with a callback -/// mechanism. The function writes a JSON request to stdout, and the Rust -/// side reads it, dispatches the API call, and writes the result back. -/// -/// In practice, the `llm_query()` stub in the bootstrap does this via -/// `print('__REPL_LLM_QUERY__::...')` and we handle the dispatch on the -/// Rust side. For a single round, we pre-compute all llm_query results -/// before executing the code. -pub fn inject_llm_query_fn( - bootstrap: &str, - queries: &[(usize, &str)], // (id, result) -) -> String { - // Replace the stub llm_query with one that returns pre-computed results. - let mock_results: Vec = queries - .iter() - .map(|(id, result)| format!(" {id}: {result:?}")) - .collect(); - let mock_dict = format!("{{\n{}\n}}", mock_results.join(",\n")); - - let override_fn = format!( - r#" -_llm_query_results = {mock_dict} -_llm_query_idx = [0] -def llm_query(prompt, model=None, max_tokens=None): - idx = _llm_query_idx[0] - _llm_query_idx[0] += 1 - result = _llm_query_results.get(idx, f'[llm_query: idx {{idx}} not found]') - return result -"# - ); - - bootstrap.replace( - "def llm_query(prompt, model=None, max_tokens=None):\n return f'[llm_query stub: {str(prompt)[:100]}...]'", - &override_fn, - ) -} - -/// Check if a string contains a ```repl fenced code block. +/// Check if a string contains a `` ```repl `` fenced code block. pub fn has_repl_block(text: &str) -> bool { text.contains("```repl") } -/// Extract all ```repl code blocks from text. -/// Returns a list of (code, start_offset, end_offset). +/// Extract every `` ```repl `` block from `text` with byte offsets. pub fn extract_repl_blocks(text: &str) -> Vec { let mut blocks = Vec::new(); let mut rest = text; while let Some(start_idx) = rest.find("```repl") { let after_fence = &rest[start_idx..]; - // Find the end of the opening fence line. let code_start = after_fence.find('\n').unwrap_or(after_fence.len()); let code_region = &after_fence[code_start..]; - // Find the closing ```. let Some(end_offset) = code_region.find("\n```") else { break; }; let code = code_region[..end_offset].to_string(); let global_start = text.len() - rest.len() + start_idx; - let global_end = global_start + code_start + end_offset + 3; // 3 for "```\n" + let global_end = global_start + code_start + end_offset + 3; blocks.push(ReplBlock { code, start_offset: global_start, @@ -126,7 +36,7 @@ pub fn extract_repl_blocks(text: &str) -> Vec { blocks } -/// A ```repl code block with position info. +/// A `` ```repl `` code block with byte-offset position info. #[derive(Debug, Clone)] pub struct ReplBlock { pub code: String, @@ -134,38 +44,10 @@ pub struct ReplBlock { pub end_offset: usize, } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - #[cfg(test)] mod tests { use super::*; - #[test] - fn parse_final_detects_value() { - let raw = "hello\n__REPL_FINAL__::\"the answer\"\nworld"; - let (cleaned, final_val) = parse_final(raw); - assert_eq!(final_val.as_deref(), Some("the answer")); - assert!(cleaned.contains("hello")); - assert!(!cleaned.contains("__REPL_FINAL__")); - } - - #[test] - fn parse_final_no_final_returns_none() { - let raw = "just some output\nnothing special"; - let (cleaned, final_val) = parse_final(raw); - assert_eq!(final_val, None); - assert_eq!(cleaned, "just some output\nnothing special"); - } - - #[test] - fn parse_final_handles_non_json_value() { - let raw = "__REPL_FINAL__::plain text value"; - let (_, final_val) = parse_final(raw); - assert_eq!(final_val.as_deref(), Some("plain text value")); - } - #[test] fn has_repl_block_detects_fence() { assert!(has_repl_block("some text ```repl\ncode\n``` more")); @@ -195,14 +77,4 @@ mod tests { let blocks = extract_repl_blocks("no blocks here"); assert!(blocks.is_empty()); } - - #[test] - fn inject_llm_query_replaces_stub() { - let bootstrap = "def llm_query(prompt, model=None, max_tokens=None):\n return f'[llm_query stub: {str(prompt)[:100]}...]'"; - let result = inject_llm_query_fn(bootstrap, &[(0, "result0"), (1, "result1")]); - assert!(!result.contains("llm_query stub")); - assert!(result.contains("_llm_query_results")); - assert!(result.contains("result0")); - assert!(result.contains("result1")); - } } diff --git a/crates/tui/src/repl/visibility_fix.txt b/crates/tui/src/repl/visibility_fix.txt deleted file mode 100644 index 66fbcf1a..00000000 --- a/crates/tui/src/repl/visibility_fix.txt +++ /dev/null @@ -1 +0,0 @@ -pub mod repl; \ No newline at end of file diff --git a/crates/tui/src/rlm/bridge.rs b/crates/tui/src/rlm/bridge.rs new file mode 100644 index 00000000..0b8e7754 --- /dev/null +++ b/crates/tui/src/rlm/bridge.rs @@ -0,0 +1,257 @@ +//! RPC bridge that services `llm_query` / `rlm_query` calls coming back +//! from the long-lived Python REPL during an RLM turn. +//! +//! This is the spiritual successor to the HTTP sidecar from earlier +//! versions — except instead of binding a localhost port and routing +//! through `urllib`, requests come in through stdin/stdout and we just +//! call the LLM client directly here in Rust. +//! +//! The bridge tracks cumulative token usage and the recursion budget. For +//! `Rlm` / `RlmBatch` requests it recursively calls `run_rlm_turn_inner` +//! at depth-1; the future-type cycle (bridge → run_rlm_turn_inner → +//! bridge) is broken by `run_rlm_turn_inner` returning a boxed dyn future. + +use std::sync::Arc; +use std::time::Duration; + +use futures_util::future::join_all; +use tokio::sync::Mutex; + +use crate::client::DeepSeekClient; +use crate::llm_client::LlmClient as _; +use crate::models::{ContentBlock, Message, MessageRequest, SystemPrompt, Usage}; +use crate::repl::runtime::{BatchResp, RpcDispatcher, RpcRequest, RpcResponse, SingleResp}; + +/// Per-child completion timeout — same as the previous sidecar default. +const CHILD_TIMEOUT_SECS: u64 = 120; +/// Default `max_tokens` for one-shot child completions. +const DEFAULT_CHILD_MAX_TOKENS: u32 = 4096; +/// Hard cap on prompts per batch RPC. +pub const MAX_BATCH: usize = 16; + +/// State shared with the bridge across all RPC calls in one turn. +pub struct RlmBridge { + pub client: DeepSeekClient, + pub child_model: String, + /// Recursion budget remaining for `Rlm` / `RlmBatch` requests. When + /// zero, those requests fall back to plain `Llm` completions. + pub depth_remaining: u32, + pub usage: Arc>, +} + +impl RlmBridge { + pub fn new(client: DeepSeekClient, child_model: String, depth_remaining: u32) -> Self { + Self { + client, + child_model, + depth_remaining, + usage: Arc::new(Mutex::new(Usage::default())), + } + } + + pub fn usage_handle(&self) -> Arc> { + Arc::clone(&self.usage) + } + + async fn dispatch_llm( + &self, + prompt: String, + model: Option, + max_tokens: Option, + system: Option, + ) -> SingleResp { + let request = MessageRequest { + model: model + .filter(|m| !m.is_empty()) + .unwrap_or_else(|| self.child_model.clone()), + messages: vec![Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: prompt, + cache_control: None, + }], + }], + max_tokens: max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS), + system: system.map(SystemPrompt::Text), + tools: None, + tool_choice: None, + metadata: None, + thinking: None, + reasoning_effort: None, + stream: Some(false), + temperature: Some(0.4_f32), + top_p: Some(0.9_f32), + }; + + let fut = self.client.create_message(request); + let response = + match tokio::time::timeout(Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + return SingleResp { + text: String::new(), + error: Some(format!("llm_query failed: {e}")), + }; + } + Err(_) => { + return SingleResp { + text: String::new(), + error: Some(format!("llm_query timed out after {CHILD_TIMEOUT_SECS}s")), + }; + } + }; + + let text = response + .content + .iter() + .filter_map(|b| match b { + ContentBlock::Text { text, .. } => Some(text.as_str()), + _ => None, + }) + .collect::>() + .join("\n"); + + { + let mut u = self.usage.lock().await; + u.input_tokens = u.input_tokens.saturating_add(response.usage.input_tokens); + u.output_tokens = u.output_tokens.saturating_add(response.usage.output_tokens); + } + + SingleResp { text, error: None } + } + + async fn dispatch_llm_batch(&self, prompts: Vec, model: Option) -> BatchResp { + if prompts.is_empty() { + return BatchResp { results: vec![] }; + } + if prompts.len() > MAX_BATCH { + return BatchResp { + results: prompts + .iter() + .map(|_| SingleResp { + text: String::new(), + error: Some(format!("batch too large: {} > {MAX_BATCH}", prompts.len())), + }) + .collect(), + }; + } + + let model = Arc::new( + model + .filter(|m| !m.is_empty()) + .unwrap_or_else(|| self.child_model.clone()), + ); + + let futures = prompts.into_iter().map(|prompt| { + let model = Arc::clone(&model); + async move { + self.dispatch_llm((*prompt).to_string(), Some((*model).clone()), None, None) + .await + } + }); + + BatchResp { + results: join_all(futures).await, + } + } + + async fn dispatch_rlm(&self, prompt: String, model: Option) -> SingleResp { + if self.depth_remaining == 0 { + // Budget exhausted — fall back to a one-shot child completion + // rather than returning an error. Matches the paper's behaviour + // ("sub_RLM gracefully degrades to llm_query at depth=0"). + return self.dispatch_llm(prompt, model, None, None).await; + } + + // Build a drain channel to absorb status events from the nested + // turn (we don't surface them; this dispatch is invisible to the + // outer agent stream). + let (tx, mut rx) = tokio::sync::mpsc::channel(64); + let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + let child_model = model + .filter(|m| !m.is_empty()) + .unwrap_or_else(|| self.child_model.clone()); + + // Recursive call. The dyn-erasure on `run_rlm_turn_inner` breaks + // the `bridge → turn → bridge` opaque-future cycle. + let result = super::turn::run_rlm_turn_inner( + &self.client, + child_model.clone(), + prompt, + None, + child_model, + tx, + self.depth_remaining.saturating_sub(1), + ) + .await; + + drain.abort(); + + { + let mut u = self.usage.lock().await; + u.input_tokens = u.input_tokens.saturating_add(result.usage.input_tokens); + u.output_tokens = u.output_tokens.saturating_add(result.usage.output_tokens); + } + + SingleResp { + text: result.answer, + error: result.error, + } + } + + async fn dispatch_rlm_batch(&self, prompts: Vec, model: Option) -> BatchResp { + if prompts.is_empty() { + return BatchResp { results: vec![] }; + } + if prompts.len() > MAX_BATCH { + return BatchResp { + results: prompts + .iter() + .map(|_| SingleResp { + text: String::new(), + error: Some(format!("batch too large: {} > {MAX_BATCH}", prompts.len())), + }) + .collect(), + }; + } + + let model = Arc::new(model); + let futures = prompts.into_iter().map(|p| { + let model = Arc::clone(&model); + async move { self.dispatch_rlm(p, (*model).clone()).await } + }); + BatchResp { + results: join_all(futures).await, + } + } +} + +impl RpcDispatcher for RlmBridge { + fn dispatch<'a>( + &'a self, + req: RpcRequest, + ) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + match req { + RpcRequest::Llm { + prompt, + model, + max_tokens, + system, + } => { + RpcResponse::Single(self.dispatch_llm(prompt, model, max_tokens, system).await) + } + RpcRequest::LlmBatch { prompts, model } => { + RpcResponse::Batch(self.dispatch_llm_batch(prompts, model).await) + } + RpcRequest::Rlm { prompt, model } => { + RpcResponse::Single(self.dispatch_rlm(prompt, model).await) + } + RpcRequest::RlmBatch { prompts, model } => { + RpcResponse::Batch(self.dispatch_rlm_batch(prompts, model).await) + } + } + }) + } +} diff --git a/crates/tui/src/rlm/mod.rs b/crates/tui/src/rlm/mod.rs index ef8caa07..fbd76776 100644 --- a/crates/tui/src/rlm/mod.rs +++ b/crates/tui/src/rlm/mod.rs @@ -1,7 +1,6 @@ -//! True Recursive Language Model (RLM) loop — paper-spec Algorithm 1. +//! Recursive Language Model (RLM) loop — paper-spec Algorithm 1. //! -//! Implements the RLM inference paradigm from Zhang, Kraska, Khattab -//! (arXiv:2512.24601, §2 Algorithm 1): +//! Implements Zhang, Kraska & Khattab (arXiv:2512.24601, §2 Algorithm 1): //! //! ```text //! state ← InitREPL(prompt=P) @@ -15,17 +14,18 @@ //! return state[Final] //! ``` //! -//! Key invariants: -//! - P is stored as a REPL variable, NEVER in the LLM's context window. -//! - Only metadata about state/stdout goes to the LLM — constant-size context. -//! - The LLM generates Python code, not free text. -//! - The REPL exposes `llm_query()` (one-shot child) and `sub_rlm()` (recursive -//! RLM call); both are serviced by an in-process HTTP sidecar so Python can -//! call them synchronously via `urllib`. +//! Invariants: +//! - `P` is held only as a REPL variable (`context` / `ctx`); never +//! appears in the root LLM's window. +//! - The root LLM receives small metadata messages — length, preview, +//! helper list, prior-round summary. +//! - Code rounds and sub-LLM calls travel over a single stdin/stdout +//! pipe to a long-lived `python3 -u` subprocess. No HTTP sidecar. +pub mod bridge; pub mod prompt; -pub mod sidecar; pub mod turn; +pub use bridge::RlmBridge; pub use prompt::rlm_system_prompt; -pub use turn::{RlmTurnResult, run_rlm_turn}; +pub use turn::{RlmTermination, RlmTurnResult, run_rlm_turn, run_rlm_turn_with_root}; diff --git a/crates/tui/src/rlm/prompt.rs b/crates/tui/src/rlm/prompt.rs index 60f1c9e3..ba003cb9 100644 --- a/crates/tui/src/rlm/prompt.rs +++ b/crates/tui/src/rlm/prompt.rs @@ -1,41 +1,33 @@ -//! RLM system prompt — adapted from the reference RLM implementation -//! (alexzhang13/rlm) and Zhang et al., arXiv:2512.24601, so the same -//! decomposition strategies and prompt patterns apply here. +//! RLM system prompt — adapted from the reference implementation +//! (alexzhang13/rlm) and Zhang et al., arXiv:2512.24601. +//! +//! The prompt is deliberately strict: the only way to make progress is +//! through a `repl` block. There is no fall-through prose path. use crate::models::SystemPrompt; -/// Build the system prompt for a Recursive Language Model (RLM) root LLM call. -/// -/// Tells the root model: -/// - your context lives in the REPL as `context` -/// - emit a single ```repl block per turn -/// - reach for `llm_query` / `rlm_query` (and the batched variants) for -/// sub-LLM work; never try to fit the whole context into one call -/// - end the loop with `FINAL(value)` or `FINAL_VAR(name)` +/// Build the system prompt for a Recursive Language Model (RLM) root call. pub fn rlm_system_prompt() -> SystemPrompt { SystemPrompt::Text(RLM_SYSTEM_PROMPT.trim().to_string()) } -const RLM_SYSTEM_PROMPT: &str = r#"You are a Recursive Language Model (RLM). You answer the user's query interactively in a Python REPL that holds the full input as a `context` variable, and you can recursively call sub-LLMs to chunk, decompose, and synthesize answers over it. You will be queried iteratively until you provide a final answer. +const RLM_SYSTEM_PROMPT: &str = r#"You are the root of a Recursive Language Model (RLM). Your input lives in a long-running Python REPL as a variable named `context` (alias `ctx`). You DO NOT see `context` in your prompt — only its length and a short preview. The only way to read or compute over it is to write Python code that runs in the REPL. -The REPL is initialised with: -1. `context` — the full input as a string. May be very large; never print it in full. -2. `llm_query(prompt, model=None, max_tokens=None, system=None)` — one-shot child LLM call. Fast and lightweight; use for chunk-level extraction, summarization, or Q&A. The child can handle very large prompts (~hundreds of thousands of chars). -3. `llm_query_batched(prompts, model=None)` — run many `llm_query` calls concurrently. Returns `list[str]` in input order. Much faster than sequential calls when sub-prompts are independent. -4. `rlm_query(prompt, model=None)` — spawn a recursive RLM sub-call for sub-tasks that themselves need multi-step reasoning, code execution, or their own iteration. Falls back to `llm_query` when the recursion budget is exhausted. -5. `rlm_query_batched(prompts, model=None)` — multiple recursive RLM sub-calls in parallel. -6. `SHOW_VARS()` — list user-created REPL variables and their types. -7. `repl_set(name, value)` / `repl_get(name)` — explicit cross-round persistence (note: any JSON-serializable top-level variable already persists automatically). -8. `print()` — show output. The driver feeds a (truncated) preview back to you. -9. `FINAL(value)` or `FINAL_VAR(name)` — end the loop. Place either on its own line OUTSIDE the ```repl block (preferred) or call as a Python statement INSIDE the block. +The REPL exposes: +- `context` (alias `ctx`) — the full input string. Often huge — never `print(context)` in full. +- `llm_query(prompt, model=None, max_tokens=None, system=None)` — one-shot child LLM. Cheap. Use for chunk-level work. +- `llm_query_batched(prompts, model=None)` — concurrent fan-out. Returns `list[str]` in input order. +- `rlm_query(prompt, model=None)` — recursive sub-RLM. Use when a sub-task itself needs decomposition. +- `rlm_query_batched(prompts, model=None)` — concurrent recursive sub-RLMs. +- `SHOW_VARS()` — list user variables and their types. +- `repl_set(name, value)` / `repl_get(name)` — explicit cross-round storage. +- `print(...)` — diagnostic output. The driver feeds you a truncated preview next round. +- `FINAL(value)` — end the loop with this string answer. +- `FINAL_VAR(name)` — end the loop with the value of a named variable. -How to operate +Variables, imports, and any other state PERSIST across rounds — the REPL is a single long-lived Python process for the whole turn. -Each turn, emit ONE ```repl block of Python. The block runs inside the REPL; printed output and any new variables come back to you next turn. End the loop with `FINAL(...)`. - -When to use `llm_query` vs `rlm_query`: -- `llm_query` for one-shot work: extracting from a chunk, summarizing, classifying, simple Q&A. -- `rlm_query` when the sub-task itself needs decomposition or iteration — i.e. it's RLM-shaped on its own (a long doc → its own chunked summary, a hard sub-question that needs branching). +Contract — every turn, output ONE ` ```repl ` block of Python. That's it. No prose-only turns. No "I will do X" — just emit the code that does X. Strategy patterns @@ -56,7 +48,9 @@ answer = llm_query(f"Synthesize across these section-level extractions:\n\n{comb print(answer[:500]) ``` Then on the next turn: +```repl FINAL(answer) +``` 3. RECURSIVE decomposition for hard sub-problems. ```repl @@ -70,16 +64,16 @@ print(trend, "→", recommendation) import math theta = math.degrees(math.atan2(v_perp, v_parallel)) final_answer = llm_query(f"Entry angle is {theta:.2f}°. Phrase the answer for a physics student.") +FINAL(final_answer) ``` -Then: FINAL(final_answer) Rules -- Emit exactly one ```repl block per turn (or `FINAL(...)` on its own line to end the loop). -- Never print or stuff `context` in its entirety. Slice, sample, or chunk. -- Sub-LLMs are powerful — feed them generous chunks (e.g. tens of thousands of chars) rather than padding through tiny windows. -- JSON-serializable top-level variables persist across rounds automatically; non-serializable ones (custom objects, file handles) do not. -- Do not say "I will do X" — just do it. Output the next ```repl block. +- Emit exactly ONE ` ```repl ` block per turn. The block must contain Python code only. +- Never `print(context)` or otherwise dump it whole — slice, sample, or chunk. +- You MUST call `llm_query` / `llm_query_batched` / `rlm_query` at least once before `FINAL(...)`. Calling FINAL from a top-level prose answer (without ever running a `repl` block that touched `context` via a sub-LLM) is REJECTED — the driver will discard the FINAL and ask you to actually use the REPL. +- Sub-LLMs are powerful — feed them generous chunks (tens of thousands of chars), not tiny windows. +- Do NOT pad your output with prose like "Here is what I'll do:" — just emit the next ```repl block. "#; #[cfg(test)] @@ -108,6 +102,11 @@ mod tests { assert!(body().contains("`context`")); } + #[test] + fn rlm_prompt_mentions_ctx_alias() { + assert!(body().contains("`ctx`")); + } + #[test] fn rlm_prompt_mentions_all_helpers() { let s = body(); @@ -125,9 +124,13 @@ mod tests { } #[test] - fn rlm_prompt_does_not_promise_plaintext_exit_loophole() { - // The old prompt had "just write a short response without code fences - // and the RLM loop will end". Make sure that's gone. - assert!(!body().contains("without code fences and the RLM loop")); + fn rlm_prompt_forbids_prose_shortcut() { + // The new contract requires a sub-LLM call before FINAL — the + // prompt must say so explicitly so the model doesn't try to bail + // with FINAL("...inferred from preview..."). + assert!( + body().contains("REJECTED") || body().contains("rejected"), + "system prompt should reject the prose-shortcut path explicitly" + ); } } diff --git a/crates/tui/src/rlm/sidecar.rs b/crates/tui/src/rlm/sidecar.rs deleted file mode 100644 index 6711cea8..00000000 --- a/crates/tui/src/rlm/sidecar.rs +++ /dev/null @@ -1,485 +0,0 @@ -//! HTTP sidecar that services `llm_query()` and `sub_rlm()` calls made -//! from inside the RLM Python REPL. -//! -//! Why HTTP? The Python REPL runs as a short-lived `python3 -c` subprocess -//! per round. We need synchronous request/response between Python (running) -//! and Rust (servicing the request) — and we need it to work for arbitrary -//! recursion depth. A localhost HTTP server with axum is the cleanest fit: -//! Python calls `urllib.request.urlopen(...)` and blocks until Rust returns -//! the LLM completion. No long-lived process, no FIFO/pipe gymnastics. -//! -//! The sidecar binds to `127.0.0.1:0` (kernel-assigned port), runs for the -//! lifetime of one root `run_rlm_turn`, and is aborted on return. -//! -//! Endpoints: -//! - `POST /llm` — one-shot child completion via the configured `child_model`. -//! - `POST /rlm` — full recursive RLM turn at depth-1 (paper's `sub_RLM`). -//! -//! Cumulative token usage is tracked in the shared [`SidecarCtx`] so the -//! parent turn can fold it into its own [`Usage`]. - -use std::net::SocketAddr; -use std::sync::Arc; - -use axum::Json; -use axum::Router; -use axum::extract::State; -use axum::routing::post; -use serde::{Deserialize, Serialize}; -use tokio::net::TcpListener; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; - -use crate::client::DeepSeekClient; -use crate::llm_client::LlmClient as _; -use crate::models::{ContentBlock, Message, MessageRequest, Usage}; - -/// Default per-child request timeout — mirrors `tools/rlm_query.rs`. -const CHILD_TIMEOUT_SECS: u64 = 120; -/// Default `max_tokens` for one-shot child completions. -const DEFAULT_CHILD_MAX_TOKENS: u32 = 4096; - -/// Shared state for the sidecar — the LLM client, the child model name, -/// the recursion budget, and a usage accumulator. -pub struct SidecarCtx { - pub client: DeepSeekClient, - pub child_model: String, - /// Recursion budget remaining for `/rlm` calls. `0` means "no further - /// recursion" — `/rlm` will return an error. - pub depth_remaining: u32, - /// Cumulative usage across all sidecar-served calls in this turn. - pub usage: Mutex, -} - -impl SidecarCtx { - pub fn new(client: DeepSeekClient, child_model: String, depth_remaining: u32) -> Arc { - Arc::new(Self { - client, - child_model, - depth_remaining, - usage: Mutex::new(Usage::default()), - }) - } -} - -#[derive(Deserialize)] -struct LlmReq { - prompt: String, - #[serde(default)] - model: Option, - #[serde(default)] - max_tokens: Option, - #[serde(default)] - system: Option, -} - -#[derive(Serialize)] -struct LlmResp { - text: String, - #[serde(skip_serializing_if = "Option::is_none")] - error: Option, -} - -async fn llm_handler(State(ctx): State>, Json(req): Json) -> Json { - let model = req - .model - .filter(|m| !m.is_empty()) - .unwrap_or_else(|| ctx.child_model.clone()); - let max_tokens = req.max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS); - - let request = MessageRequest { - model, - messages: vec![Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: req.prompt, - cache_control: None, - }], - }], - max_tokens, - system: req.system.map(crate::models::SystemPrompt::Text), - tools: None, - tool_choice: None, - metadata: None, - thinking: None, - reasoning_effort: None, - stream: Some(false), - temperature: Some(0.4_f32), - top_p: Some(0.9_f32), - }; - - let fut = ctx.client.create_message(request); - let response = - match tokio::time::timeout(std::time::Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await { - Ok(Ok(r)) => r, - Ok(Err(e)) => { - return Json(LlmResp { - text: String::new(), - error: Some(format!("llm_query failed: {e}")), - }); - } - Err(_) => { - return Json(LlmResp { - text: String::new(), - error: Some(format!("llm_query timed out after {CHILD_TIMEOUT_SECS}s")), - }); - } - }; - - let text = response - .content - .iter() - .filter_map(|b| match b { - ContentBlock::Text { text, .. } => Some(text.as_str()), - _ => None, - }) - .collect::>() - .join("\n"); - - { - let mut u = ctx.usage.lock().await; - u.input_tokens = u.input_tokens.saturating_add(response.usage.input_tokens); - u.output_tokens = u.output_tokens.saturating_add(response.usage.output_tokens); - } - - Json(LlmResp { text, error: None }) -} - -#[derive(Deserialize)] -struct SubRlmReq { - prompt: String, -} - -async fn sub_rlm_handler( - State(ctx): State>, - Json(req): Json, -) -> Json { - if ctx.depth_remaining == 0 { - return Json(LlmResp { - text: String::new(), - error: Some( - "sub_rlm: recursion depth budget exhausted (configure /rlm with deeper budget)" - .to_string(), - ), - }); - } - - // Sub-RLM uses the child_model as its own root model — paper's pattern - // is to run sub_RLM with a smaller model, and to also use child_model - // for any further `llm_query` calls inside the sub-turn. - let (tx, mut rx) = tokio::sync::mpsc::channel(64); - let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} }); - - // The recursive future-type cycle here - // (sub_rlm_handler → run_rlm_turn_inner → start_sidecar → sub_rlm_handler) - // is broken by `run_rlm_turn_inner` returning a concrete - // `Pin>` rather than `impl Future`. - let result = super::turn::run_rlm_turn_inner( - &ctx.client, - ctx.child_model.clone(), - req.prompt, - None, - ctx.child_model.clone(), - tx, - ctx.depth_remaining.saturating_sub(1), - ) - .await; - - drain.abort(); - - { - let mut u = ctx.usage.lock().await; - u.input_tokens = u.input_tokens.saturating_add(result.usage.input_tokens); - u.output_tokens = u.output_tokens.saturating_add(result.usage.output_tokens); - } - - Json(LlmResp { - text: result.answer, - error: result.error, - }) -} - -// --------------------------------------------------------------------------- -// Batch endpoints -// --------------------------------------------------------------------------- - -/// Hard cap on prompts per batch request. Mirrors `tools/rlm_query.rs`. -const MAX_BATCH: usize = 16; - -#[derive(Deserialize)] -struct BatchReq { - prompts: Vec, - #[serde(default)] - model: Option, - #[serde(default)] - max_tokens: Option, - #[serde(default)] - system: Option, -} - -#[derive(Serialize)] -struct BatchResp { - results: Vec, -} - -async fn llm_batch_handler( - State(ctx): State>, - Json(req): Json, -) -> Json { - if req.prompts.is_empty() { - return Json(BatchResp { results: vec![] }); - } - if req.prompts.len() > MAX_BATCH { - return Json(BatchResp { - results: req - .prompts - .iter() - .map(|_| LlmResp { - text: String::new(), - error: Some(format!( - "batch too large: {} > {MAX_BATCH}", - req.prompts.len() - )), - }) - .collect(), - }); - } - - let model = req - .model - .filter(|m| !m.is_empty()) - .unwrap_or_else(|| ctx.child_model.clone()); - let max_tokens = req.max_tokens.unwrap_or(DEFAULT_CHILD_MAX_TOKENS); - let system = req.system; - - let futures = req.prompts.into_iter().map(|prompt| { - let client = ctx.client.clone(); - let model = model.clone(); - let system = system.clone(); - async move { - let request = MessageRequest { - model, - messages: vec![Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: prompt, - cache_control: None, - }], - }], - max_tokens, - system: system.map(crate::models::SystemPrompt::Text), - tools: None, - tool_choice: None, - metadata: None, - thinking: None, - reasoning_effort: None, - stream: Some(false), - temperature: Some(0.4_f32), - top_p: Some(0.9_f32), - }; - let fut = client.create_message(request); - tokio::time::timeout(std::time::Duration::from_secs(CHILD_TIMEOUT_SECS), fut).await - } - }); - - let outcomes = futures_util::future::join_all(futures).await; - - let mut results = Vec::with_capacity(outcomes.len()); - let mut total_input = 0_u32; - let mut total_output = 0_u32; - - for outcome in outcomes { - match outcome { - Ok(Ok(resp)) => { - let text = resp - .content - .iter() - .filter_map(|b| match b { - ContentBlock::Text { text, .. } => Some(text.as_str()), - _ => None, - }) - .collect::>() - .join("\n"); - total_input = total_input.saturating_add(resp.usage.input_tokens); - total_output = total_output.saturating_add(resp.usage.output_tokens); - results.push(LlmResp { text, error: None }); - } - Ok(Err(e)) => results.push(LlmResp { - text: String::new(), - error: Some(format!("{e}")), - }), - Err(_) => results.push(LlmResp { - text: String::new(), - error: Some(format!("timed out after {CHILD_TIMEOUT_SECS}s")), - }), - } - } - - { - let mut u = ctx.usage.lock().await; - u.input_tokens = u.input_tokens.saturating_add(total_input); - u.output_tokens = u.output_tokens.saturating_add(total_output); - } - - Json(BatchResp { results }) -} - -#[derive(Deserialize)] -struct RlmBatchReq { - prompts: Vec, -} - -async fn rlm_batch_handler( - State(ctx): State>, - Json(req): Json, -) -> Json { - if req.prompts.is_empty() { - return Json(BatchResp { results: vec![] }); - } - if req.prompts.len() > MAX_BATCH { - return Json(BatchResp { - results: req - .prompts - .iter() - .map(|_| LlmResp { - text: String::new(), - error: Some(format!( - "batch too large: {} > {MAX_BATCH}", - req.prompts.len() - )), - }) - .collect(), - }); - } - if ctx.depth_remaining == 0 { - return Json(BatchResp { - results: req - .prompts - .iter() - .map(|_| LlmResp { - text: String::new(), - error: Some("rlm_query_batched: recursion budget exhausted".to_string()), - }) - .collect(), - }); - } - - let futures = req.prompts.into_iter().map(|prompt| { - let client = ctx.client.clone(); - let child_model = ctx.child_model.clone(); - let depth = ctx.depth_remaining.saturating_sub(1); - async move { - let (tx, mut rx) = tokio::sync::mpsc::channel(64); - let drain = tokio::spawn(async move { while rx.recv().await.is_some() {} }); - // Same dyn-erasure pattern as sub_rlm_handler — break the recursive - // future-type cycle through the boxed dyn return of run_rlm_turn_inner. - let result = super::turn::run_rlm_turn_inner( - &client, - child_model.clone(), - prompt, - None, - child_model, - tx, - depth, - ) - .await; - drain.abort(); - result - } - }); - - let results_raw = futures_util::future::join_all(futures).await; - - let mut results = Vec::with_capacity(results_raw.len()); - let mut total_input = 0_u32; - let mut total_output = 0_u32; - - for result in results_raw { - total_input = total_input.saturating_add(result.usage.input_tokens); - total_output = total_output.saturating_add(result.usage.output_tokens); - results.push(LlmResp { - text: result.answer, - error: result.error, - }); - } - - { - let mut u = ctx.usage.lock().await; - u.input_tokens = u.input_tokens.saturating_add(total_input); - u.output_tokens = u.output_tokens.saturating_add(total_output); - } - - Json(BatchResp { results }) -} - -// --------------------------------------------------------------------------- -// Handle / start -// --------------------------------------------------------------------------- - -/// Result of starting the sidecar — the bound socket address and the task -/// handle. Drop or abort the handle to stop the server. -pub struct SidecarHandle { - pub addr: SocketAddr, - task: JoinHandle<()>, -} - -impl SidecarHandle { - pub fn llm_url(&self) -> String { - format!("http://{}/llm", self.addr) - } - pub fn llm_batch_url(&self) -> String { - format!("http://{}/llm_batch", self.addr) - } - pub fn rlm_url(&self) -> String { - format!("http://{}/rlm", self.addr) - } - pub fn rlm_batch_url(&self) -> String { - format!("http://{}/rlm_batch", self.addr) - } - pub fn shutdown(self) { - self.task.abort(); - } -} - -/// Bind a sidecar on `127.0.0.1` with a kernel-assigned port and start -/// serving in a background task. -pub async fn start_sidecar(ctx: Arc) -> std::io::Result { - let listener = TcpListener::bind("127.0.0.1:0").await?; - let addr = listener.local_addr()?; - let app = Router::new() - .route("/llm", post(llm_handler)) - .route("/llm_batch", post(llm_batch_handler)) - .route("/rlm", post(sub_rlm_handler)) - .route("/rlm_batch", post(rlm_batch_handler)) - .with_state(ctx); - let task = tokio::spawn(async move { - let _ = axum::serve(listener, app).await; - }); - Ok(SidecarHandle { addr, task }) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn llm_resp_skips_none_error() { - let r = LlmResp { - text: "hello".to_string(), - error: None, - }; - let s = serde_json::to_string(&r).unwrap(); - assert!(!s.contains("error")); - assert!(s.contains("hello")); - } - - #[test] - fn llm_resp_includes_error_when_set() { - let r = LlmResp { - text: String::new(), - error: Some("boom".to_string()), - }; - let s = serde_json::to_string(&r).unwrap(); - assert!(s.contains("boom")); - } -} diff --git a/crates/tui/src/rlm/turn.rs b/crates/tui/src/rlm/turn.rs index 2ac599c2..08d59818 100644 --- a/crates/tui/src/rlm/turn.rs +++ b/crates/tui/src/rlm/turn.rs @@ -1,41 +1,20 @@ -//! True RLM turn loop — Algorithm 1 from Zhang et al. (arXiv:2512.24601). -//! -//! # Algorithm -//! -//! ```text -//! state ← InitREPL(prompt=P) -//! state ← AddFunction(state, sub_RLM) -//! hist ← [Metadata(state)] -//! while True: -//! code ← LLM(hist) -//! (state, stdout) ← REPL(state, code) -//! hist ← hist ∥ code ∥ Metadata(stdout) -//! if state[Final] is set: -//! return state[Final] -//! ``` -//! -//! Key invariants: -//! 1. P is stored as `PROMPT` in the REPL — NEVER in the LLM context. -//! 2. Only metadata (length, preview, variable names) goes to LLM context. -//! 3. The LLM writes Python code, executed by the REPL. -//! 4. The REPL exposes `llm_query()` (one-shot child) and `sub_rlm()` -//! (recursive RLM call), both serviced by an in-process HTTP sidecar. +//! RLM turn loop — paper Algorithm 1 driven over a long-lived Python +//! subprocess + stdin/stdout RPC bridge (no HTTP sidecar). -use std::sync::Arc; +use std::path::PathBuf; use std::time::{Duration, Instant}; -use serde_json::json; use tokio::sync::mpsc; +use uuid::Uuid; use crate::client::DeepSeekClient; use crate::core::events::Event; use crate::llm_client::LlmClient; use crate::models::{ContentBlock, Message, MessageRequest, Usage}; -use crate::repl::runtime::PythonRuntime; -use crate::repl::sandbox::parse_final; +use crate::repl::PythonRuntime; +use super::bridge::RlmBridge; use super::prompt::rlm_system_prompt; -use super::sidecar::{SidecarCtx, start_sidecar}; // --------------------------------------------------------------------------- // Constants @@ -43,28 +22,22 @@ use super::sidecar::{SidecarCtx, start_sidecar}; /// Maximum number of RLM iterations before the loop gives up. const MAX_RLM_ITERATIONS: u32 = 25; - -/// Max consecutive rounds where the model returns no `python` fence before -/// we give up. The paper requires `code → REPL → Final`; a chatty round is -/// tolerated once but not indefinitely. -const MAX_CONSECUTIVE_NO_CODE: u32 = 2; - -/// Max output tokens for the root LLM — just needs to generate code, not -/// the full answer. +/// Max consecutive rounds where the model returns no `repl` fence before we +/// hard-fail. The paper requires `code → REPL → Final`; anything else is +/// not the RLM contract. +const MAX_CONSECUTIVE_NO_CODE: u32 = 3; +/// Max output tokens for the root LLM — it just needs to generate code. const ROOT_MAX_TOKENS: u32 = 4096; - /// Max chars of stdout shown as metadata to the root LLM in next iteration. -/// Matches the paper's "only metadata about stdout" constraint. const STDOUT_METADATA_PREVIEW_LEN: usize = 800; - -/// Max chars of PROMPT shown as preview in metadata. +/// Max chars of `context` shown as a preview in the metadata. const PROMPT_PREVIEW_LEN: usize = 500; - -/// Temperature for root LLM calls. Low to keep code generation focused. +/// Temperature for root LLM calls. const ROOT_TEMPERATURE: f32 = 0.3; - -/// Per-iteration timeout for the entire LLM+REPL round (whole-turn cap). -const ROUND_TIMEOUT: Duration = Duration::from_secs(180); +/// Hard wall-clock cap on a whole RLM turn. +const TURN_TIMEOUT: Duration = Duration::from_secs(180); +/// Bound on conversation history we keep across iterations. +const MAX_HISTORY_MESSAGES: usize = 20; // --------------------------------------------------------------------------- // Public API @@ -73,41 +46,50 @@ const ROUND_TIMEOUT: Duration = Duration::from_secs(180); /// How an RLM turn ended. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RlmTermination { - /// `FINAL(value)` was called inside the REPL. + /// `FINAL(value)` was called inside the REPL or `FINAL(...)` appeared + /// at the top of the model's response on its own line. Final, - /// The model emitted a non-code answer at the top of the loop. Only - /// possible when strict mode is disabled (currently always strict). - DirectAnswer, + /// The model failed to emit a `repl` block for too many rounds in a + /// row. The accumulated last response text is surfaced as the answer + /// rather than being thrown away. + NoCode, /// Iteration cap reached without `FINAL`. Exhausted, - /// Hard error (LLM call failed, REPL crashed, timeout, …). + /// Hard error — LLM call failed, REPL crashed, timeout. Error, } +/// Per-round trace entry. Surfaced in the tool result so the user can see +/// exactly what the sub-agent did. +#[derive(Debug, Clone)] +pub struct RlmRoundTrace { + pub round: u32, + pub code_summary: String, + pub stdout_preview: String, + pub had_error: bool, + pub rpc_count: u32, + pub elapsed_ms: u64, +} + /// Result of an RLM turn. #[derive(Debug, Clone)] pub struct RlmTurnResult { - /// The final answer (from FINAL(), or empty on error/exhaustion). pub answer: String, - /// Number of iterations used. pub iterations: u32, - /// Total wall-clock duration. pub duration: Duration, - /// Error message if the turn failed. pub error: Option, - /// Usage from the root LLM calls + sidecar-served sub-LLM calls. pub usage: Usage, - /// How the loop ended. pub termination: RlmTermination, + /// Per-round trace. Empty when the loop never reached the REPL. + pub trace: Vec, + /// Total sub-LLM RPCs made by the sub-agent (sum of `rpc_count` across + /// rounds). Useful for verifying that the model engaged with `context` + /// rather than answering directly. + pub total_rpcs: u32, } -/// Run a full RLM turn (paper Algorithm 1). -/// -/// `prompt` is loaded into the REPL as the `context` variable and never -/// enters the root LLM's window. `root_prompt` (optional) is a small -/// instruction shown to the root LLM each iteration — typically the -/// model's task ("summarize the security model"). `max_depth` controls -/// how many levels of `rlm_query()` recursion the sub-agent may use. +/// Run a full RLM turn. `prompt` is loaded into the REPL as `context`; it +/// never enters the root LLM's window. pub async fn run_rlm_turn( client: &DeepSeekClient, model: String, @@ -128,8 +110,8 @@ pub async fn run_rlm_turn( .await } -/// Variant that lets the caller pass a small `root_prompt` shown to the -/// root LLM each iteration. The big `prompt` still lives only in the REPL. +/// Variant that also passes a small `root_prompt` (the user-facing task) +/// shown to the root LLM each iteration so it remembers its objective. pub async fn run_rlm_turn_with_root( client: &DeepSeekClient, model: String, @@ -151,12 +133,9 @@ pub async fn run_rlm_turn_with_root( .await } -/// Inner entry point — also used by the sidecar's `/rlm` handler when it -/// recurses. Decrements `max_depth` for nested calls. -/// -/// Returns an explicit boxed-trait-object future to break the recursive -/// opaque-type cycle: -/// `run_rlm_turn_inner` → `start_sidecar` → `sub_rlm_handler` → `run_rlm_turn_inner`. +/// Inner entry point — also used by the bridge when it recurses. Returns +/// a boxed future to break the recursive opaque-future-type cycle: +/// `run_rlm_turn_inner` → `RlmBridge::dispatch` → `run_rlm_turn_inner`. pub(crate) fn run_rlm_turn_inner<'a>( client: &'a DeepSeekClient, model: String, @@ -166,7 +145,7 @@ pub(crate) fn run_rlm_turn_inner<'a>( tx_event: mpsc::Sender, max_depth: u32, ) -> std::pin::Pin + Send + 'a>> { - Box::pin(run_rlm_turn_inner_impl( + Box::pin(run_rlm_turn_impl( client, model, prompt, @@ -177,7 +156,11 @@ pub(crate) fn run_rlm_turn_inner<'a>( )) } -async fn run_rlm_turn_inner_impl( +// --------------------------------------------------------------------------- +// Implementation +// --------------------------------------------------------------------------- + +async fn run_rlm_turn_impl( client: &DeepSeekClient, model: String, prompt: String, @@ -188,92 +171,85 @@ async fn run_rlm_turn_inner_impl( ) -> RlmTurnResult { let start = Instant::now(); let mut total_usage = Usage::default(); + let mut trace: Vec = Vec::new(); + let mut total_rpcs: u32 = 0; - // ------------------------------------------------------------------ - // 0. Start the HTTP sidecar that services llm_query() / sub_rlm() - // from inside the Python REPL. Lives for the duration of this turn. - // ------------------------------------------------------------------ - let sidecar_ctx = SidecarCtx::new(client.clone(), child_model.clone(), max_depth); - let sidecar = match start_sidecar(Arc::clone(&sidecar_ctx)).await { - Ok(h) => h, + // 1. Stage `context` to a temp file. The REPL reads it on bootstrap so + // the big string never enters the process command line and doesn't + // show up in `ps`. + let ctx_path = match write_context_file(&prompt) { + Ok(p) => p, Err(e) => { return RlmTurnResult { answer: String::new(), iterations: 0, duration: start.elapsed(), - error: Some(format!("Failed to start RLM sidecar: {e}")), + error: Some(format!("rlm: failed to stage context: {e}")), usage: total_usage, termination: RlmTermination::Error, + trace, + total_rpcs, }; } }; - let llm_url = sidecar.llm_url(); - let llm_batch_url = sidecar.llm_batch_url(); - let rlm_url = sidecar.rlm_url(); - let rlm_batch_url = sidecar.rlm_batch_url(); - // ------------------------------------------------------------------ - // 1. Initialise REPL with `context` variable - // ------------------------------------------------------------------ - let state_dir = std::env::temp_dir().join("deepseek_rlm"); - let _ = std::fs::create_dir_all(&state_dir); - let state_path = state_dir.join(format!("rlm_{}.json", uuid::Uuid::new_v4())); + // 2. Spawn the long-lived REPL. + let mut repl = match PythonRuntime::spawn_with_context(&ctx_path).await { + Ok(rt) => rt, + Err(e) => { + let _ = tokio::fs::remove_file(&ctx_path).await; + return RlmTurnResult { + answer: String::new(), + iterations: 0, + duration: start.elapsed(), + error: Some(format!("rlm: failed to spawn REPL: {e}")), + usage: total_usage, + termination: RlmTermination::Error, + trace, + total_rpcs, + }; + } + }; - // Write `context` into the REPL state before the REPL even starts. - // Matches the reference (alexzhang13/rlm) variable name. - let initial_vars = json!({"context": &prompt}); - if let Err(e) = std::fs::write(&state_path, serde_json::to_string(&initial_vars).unwrap()) { - sidecar.shutdown(); - return RlmTurnResult { - answer: String::new(), - iterations: 0, - duration: start.elapsed(), - error: Some(format!("Failed to write REPL state: {e}")), - usage: total_usage, - termination: RlmTermination::Error, - }; - } - - let mut repl = PythonRuntime::with_state_path(state_path.clone()); - repl.set_env("REPL_LLM_URL", &llm_url); - repl.set_env("REPL_LLM_BATCH_URL", &llm_batch_url); - repl.set_env("REPL_RLM_URL", &rlm_url); - repl.set_env("REPL_RLM_BATCH_URL", &rlm_batch_url); + // 3. Build the bridge that services llm_query / rlm_query RPCs. + let bridge = RlmBridge::new(client.clone(), child_model.clone(), max_depth); + let usage_handle = bridge.usage_handle(); let _ = tx_event .send(Event::status(format!( - "RLM turn started — root={model}, child={child_model}, max_depth={max_depth}" + "RLM: spawned Python REPL (root={model}, child={child_model}, max_depth={max_depth}, ctx={} chars)", + prompt.chars().count() ))) .await; - // ------------------------------------------------------------------ - // 2. Build metadata-only conversation history - // ------------------------------------------------------------------ + // 4. Build initial metadata-only history. let system = rlm_system_prompt(); - let metadata_msg = - build_metadata_message(&prompt, root_prompt.as_deref(), 0, None, None, &state_path); + let mut messages: Vec = vec![build_metadata_message( + &prompt, + root_prompt.as_deref(), + 0, + None, + None, + )]; - let mut messages: Vec = vec![metadata_msg]; - - // Track consecutive no-code rounds for strict-mode termination. let mut consecutive_no_code: u32 = 0; + let mut last_response_text = String::new(); - // ------------------------------------------------------------------ - // 3. RLM loop (Algorithm 1) - // ------------------------------------------------------------------ let result = 'turn: { for iteration in 0..MAX_RLM_ITERATIONS { - if start.elapsed() > ROUND_TIMEOUT { + if start.elapsed() > TURN_TIMEOUT { break 'turn RlmTurnResult { answer: String::new(), iterations: iteration, duration: start.elapsed(), error: Some(format!( "RLM turn timed out after {}s", - ROUND_TIMEOUT.as_secs() + TURN_TIMEOUT.as_secs() )), usage: total_usage, termination: RlmTermination::Error, + trace: trace.clone(), + total_rpcs, }; } @@ -285,7 +261,7 @@ async fn run_rlm_turn_inner_impl( ))) .await; - // 3a. LLM generates code from metadata-only context + // 4a. Root LLM generates code from metadata-only context. let request = MessageRequest { model: model.clone(), messages: messages.clone(), @@ -311,11 +287,12 @@ async fn run_rlm_turn_inner_impl( error: Some(format!("Root LLM call failed: {e}")), usage: total_usage, termination: RlmTermination::Error, + trace: trace.clone(), + total_rpcs, }; } }; - // Accumulate root usage total_usage.input_tokens = total_usage .input_tokens .saturating_add(response.usage.input_tokens); @@ -324,19 +301,51 @@ async fn run_rlm_turn_inner_impl( .saturating_add(response.usage.output_tokens); let response_text = extract_text_blocks(&response.content); + last_response_text = response_text.clone(); - let _ = tx_event - .send(Event::MessageDelta { - index: iteration as usize, - content: format!("\n[RLM iteration {}]\n", iteration + 1), - }) - .await; - - // 3b. Check for a text-level FINAL(...) / FINAL_VAR(...) in the - // model's raw response. The reference RLM allows the model to - // close out by writing `FINAL(my answer)` directly in its - // message, without going through a ```repl block. + // 4b. Top-level FINAL(...) lets the model close out without + // touching the REPL — but only if it has done some work + // (non-zero rpc_count) on a prior round. Otherwise it's a + // shortcut and we reject it. if let Some(final_val) = parse_text_final(&response_text) { + if total_rpcs == 0 { + // Discard the top-level FINAL — the model is bypassing + // the loop. Force it to use the REPL by appending a + // strict reminder. + consecutive_no_code = consecutive_no_code.saturating_add(1); + if consecutive_no_code >= MAX_CONSECUTIVE_NO_CODE { + break 'turn RlmTurnResult { + answer: final_val, + iterations: iteration + 1, + duration: start.elapsed(), + error: None, + usage: total_usage, + termination: RlmTermination::NoCode, + trace: trace.clone(), + total_rpcs, + }; + } + messages.push(Message { + role: "assistant".to_string(), + content: vec![ContentBlock::Text { + text: response_text.clone(), + cache_control: None, + }], + }); + messages.push(Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: "You called FINAL(...) without ever running a ```repl block. \ + That defeats the recursive language model — you're guessing \ + from the preview alone. Emit a ```repl block now that uses \ + `llm_query`, `llm_query_batched`, or `rlm_query` against \ + `context` to actually compute the answer." + .to_string(), + cache_control: None, + }], + }); + continue; + } let _ = tx_event .send(Event::status( "RLM: FINAL detected in response text".to_string(), @@ -349,13 +358,13 @@ async fn run_rlm_turn_inner_impl( error: None, usage: total_usage, termination: RlmTermination::Final, + trace: trace.clone(), + total_rpcs, }; } - // 3c. Extract a ```repl block. Match the reference RLM's - // language identifier so the same prompts/examples work here. + // 4c. Extract a ```repl block. let code = extract_repl_code(&response_text); - let code_to_run = match code { Some(c) => { consecutive_no_code = 0; @@ -364,24 +373,19 @@ async fn run_rlm_turn_inner_impl( None => { consecutive_no_code = consecutive_no_code.saturating_add(1); if consecutive_no_code >= MAX_CONSECUTIVE_NO_CODE { - // Give up — surface the model's text as a degraded - // direct answer rather than throwing the turn away. - let _ = tx_event - .send(Event::MessageDelta { - index: iteration as usize, - content: response_text.clone(), - }) - .await; break 'turn RlmTurnResult { answer: response_text, iterations: iteration + 1, duration: start.elapsed(), - error: None, + error: Some(format!( + "RLM: model failed to emit ```repl after {MAX_CONSECUTIVE_NO_CODE} consecutive rounds" + )), usage: total_usage, - termination: RlmTermination::DirectAnswer, + termination: RlmTermination::NoCode, + trace: trace.clone(), + total_rpcs, }; } - // Append a reminder and retry. messages.push(Message { role: "assistant".to_string(), content: vec![ContentBlock::Text { @@ -392,7 +396,10 @@ async fn run_rlm_turn_inner_impl( messages.push(Message { role: "user".to_string(), content: vec![ContentBlock::Text { - text: "Reminder: emit Python inside a ```repl … ``` fence (or write FINAL(value) on its own line) when you have the answer. Reply with a ```repl block now.".to_string(), + text: "Reminder: emit Python inside a ```repl … ``` fence. \ + Use `llm_query` / `llm_query_batched` / `rlm_query` to \ + process `context` and call `FINAL(value)` when done." + .to_string(), cache_control: None, }], }); @@ -403,17 +410,18 @@ async fn run_rlm_turn_inner_impl( let _ = tx_event .send(Event::MessageDelta { index: iteration as usize, - content: format!("```repl\n{code_to_run}\n```\n"), + content: format!( + "\n[RLM round {} — code]\n```repl\n{code_to_run}\n```\n", + iteration + 1 + ), }) .await; - // 3c. Execute code in REPL - let round = match repl.execute(&code_to_run).await { + // 4d. Execute the code in the REPL with the bridge servicing + // llm_query / rlm_query callbacks. + let round = match repl.run(&code_to_run, Some(&bridge)).await { Ok(r) => r, Err(e) => { - let _ = tx_event - .send(Event::status(format!("RLM REPL error: {e}"))) - .await; break 'turn RlmTurnResult { answer: String::new(), iterations: iteration + 1, @@ -421,20 +429,40 @@ async fn run_rlm_turn_inner_impl( error: Some(format!("REPL execution failed: {e}")), usage: total_usage, termination: RlmTermination::Error, + trace: trace.clone(), + total_rpcs, }; } }; - // 3d. Check for FINAL (parsed by the runtime, or in raw stdout - // as a belt-and-braces check). - if let Some(final_val) = round - .final_value - .clone() - .or_else(|| parse_final(&round.full_stdout).1) - { + total_rpcs = total_rpcs.saturating_add(round.rpc_count); + + // Trace this round. + let stdout_preview = truncate_text(round.stdout.trim(), STDOUT_METADATA_PREVIEW_LEN); + trace.push(RlmRoundTrace { + round: iteration + 1, + code_summary: summarize_code(&code_to_run), + stdout_preview: stdout_preview.clone(), + had_error: round.has_error, + rpc_count: round.rpc_count, + elapsed_ms: round.elapsed.as_millis() as u64, + }); + + let _ = tx_event + .send(Event::status(format!( + "RLM round {}: {} bytes stdout, {} sub-LLM call(s){}", + iteration + 1, + round.full_stdout.len(), + round.rpc_count, + if round.has_error { " (error)" } else { "" }, + ))) + .await; + + // 4e. FINAL detection. + if let Some(final_val) = round.final_value.clone() { let _ = tx_event .send(Event::status( - "RLM: FINAL detected, ending loop".to_string(), + "RLM: FINAL detected in REPL, ending loop".to_string(), )) .await; break 'turn RlmTurnResult { @@ -444,21 +472,12 @@ async fn run_rlm_turn_inner_impl( error: None, usage: total_usage, termination: RlmTermination::Final, + trace: trace.clone(), + total_rpcs, }; } - // 3e. Build metadata for next iteration and append to history - // hist ← hist ∥ code ∥ Metadata(stdout) - let stdout_display = if round.stdout.is_empty() && !round.stderr.is_empty() { - format!( - "[stderr]\n{}", - truncate_text(&round.stderr, STDOUT_METADATA_PREVIEW_LEN) - ) - } else { - truncate_text(&round.stdout, STDOUT_METADATA_PREVIEW_LEN) - }; - - // Assistant message: the code the model wrote + // 4f. Build metadata for next iteration. messages.push(Message { role: "assistant".to_string(), content: vec![ContentBlock::Text { @@ -466,31 +485,14 @@ async fn run_rlm_turn_inner_impl( cache_control: None, }], }); - - // User message: metadata about stdout + current REPL state - let next_metadata = build_metadata_message( + messages.push(build_metadata_message( &prompt, root_prompt.as_deref(), iteration + 1, Some(&code_to_run), - Some(&stdout_display), - &state_path, - ); - messages.push(next_metadata); + Some(&stdout_preview), + )); - // Emit stdout preview as a status update - let _ = tx_event - .send(Event::status(format!( - "REPL round {}: {} bytes output{}", - iteration + 1, - round.full_stdout.len(), - if round.has_error { " (error)" } else { "" }, - ))) - .await; - - // Bound history growth. Keep the original metadata + the most - // recent N pairs so the model still sees the running thread. - const MAX_HISTORY_MESSAGES: usize = 20; if messages.len() > MAX_HISTORY_MESSAGES { let drop_from = messages.len() - MAX_HISTORY_MESSAGES + 1; let mut kept = vec![messages[0].clone()]; @@ -499,6 +501,7 @@ async fn run_rlm_turn_inner_impl( } } + let _ = last_response_text; RlmTurnResult { answer: String::new(), iterations: MAX_RLM_ITERATIONS, @@ -508,22 +511,24 @@ async fn run_rlm_turn_inner_impl( )), usage: total_usage, termination: RlmTermination::Exhausted, + trace: trace.clone(), + total_rpcs, } }; - // Fold sidecar usage (children + nested sub_rlm) into the totals. - let sidecar_usage = sidecar_ctx.usage.lock().await; + // Fold bridge usage (children + nested sub_rlm) into totals. + let bridge_usage = usage_handle.lock().await; let mut final_usage = result.usage.clone(); final_usage.input_tokens = final_usage .input_tokens - .saturating_add(sidecar_usage.input_tokens); + .saturating_add(bridge_usage.input_tokens); final_usage.output_tokens = final_usage .output_tokens - .saturating_add(sidecar_usage.output_tokens); - drop(sidecar_usage); - // Best-effort cleanup of the per-turn state file. Non-fatal. - let _ = std::fs::remove_file(&state_path); - sidecar.shutdown(); + .saturating_add(bridge_usage.output_tokens); + drop(bridge_usage); + + repl.shutdown().await; + RlmTurnResult { usage: final_usage, ..result @@ -534,28 +539,35 @@ async fn run_rlm_turn_inner_impl( // Helpers // --------------------------------------------------------------------------- -/// Build a metadata message describing the current REPL state. -/// -/// This is `Metadata(state)` from the paper. We surface: -/// - `context` length (chars) and a short preview +fn write_context_file(prompt: &str) -> std::io::Result { + let dir = std::env::temp_dir().join("deepseek_rlm_ctx"); + std::fs::create_dir_all(&dir)?; + let path = dir.join(format!( + "ctx_{}_{}.txt", + std::process::id(), + Uuid::new_v4().simple() + )); + std::fs::write(&path, prompt)?; + Ok(path) +} + +/// Build `Metadata(state)` from the paper. Surfaces: /// - the small `root_prompt` (if any) — repeated each iteration -/// - REPL helpers the sub-agent can call -/// - keys currently present in the REPL variable store -/// - the previous round's code summary and stdout preview +/// - `context` length + preview +/// - the REPL helpers +/// - the previous round's code summary + stdout preview fn build_metadata_message( prompt: &str, root_prompt: Option<&str>, iteration: u32, previous_code: Option<&str>, previous_stdout: Option<&str>, - state_path: &std::path::Path, ) -> Message { let prompt_len = prompt.chars().count(); let prompt_preview = truncate_text(prompt, PROMPT_PREVIEW_LEN); let mut parts = Vec::new(); - - parts.push(format!("## REPL State (Round {iteration})")); + parts.push(format!("## REPL state (round {iteration})")); parts.push(String::new()); if let Some(rp) = root_prompt && !rp.trim().is_empty() @@ -564,18 +576,20 @@ fn build_metadata_message( parts.push(format!("> {}", truncate_text(rp.trim(), 600))); parts.push(String::new()); } - parts.push("**`context`** — REPL variable holding the full input".to_string()); + parts.push("**`context`** — the long input lives in the REPL only".to_string()); parts.push(format!("- Length: {prompt_len} chars")); parts.push(format!("- Preview: \"{prompt_preview}\"")); parts.push(String::new()); parts.push("**REPL helpers** (use inside ```repl blocks)".to_string()); - parts.push("- `context` — the full input string".to_string()); + parts.push("- `context` / `ctx` — the full input string".to_string()); parts.push("- `len(context)` / `context[a:b]` / `context.splitlines()` — slice it".to_string()); parts.push("- `llm_query(prompt, model=None)` — one-shot child LLM".to_string()); - parts.push("- `llm_query_batched([p1, p2, ...])` — concurrent fanout".to_string()); + parts.push("- `llm_query_batched([p1, p2, ...])` — concurrent fan-out".to_string()); parts.push("- `rlm_query(prompt, model=None)` — recursive sub-RLM".to_string()); - parts.push("- `rlm_query_batched([p1, p2, ...])` — concurrent recursive RLMs".to_string()); + parts.push( + "- `rlm_query_batched([p1, p2, ...])` — concurrent recursive sub-RLMs".to_string(), + ); parts.push("- `SHOW_VARS()` — list user variables".to_string()); parts.push("- `repl_set(name, value)` / `repl_get(name)` — explicit store".to_string()); parts.push( @@ -587,28 +601,10 @@ fn build_metadata_message( ); parts.push(String::new()); - // Variables currently in the persistent store. - if let Ok(text) = std::fs::read_to_string(state_path) - && let Ok(map) = serde_json::from_str::>(&text) - { - let mut keys: Vec = map.keys().cloned().collect(); - keys.sort(); - if !keys.is_empty() { - let listed = keys - .iter() - .map(|k| format!("\"{k}\"")) - .collect::>() - .join(", "); - parts.push(format!("**Variables in REPL state**: [{listed}]")); - parts.push(String::new()); - } - } - if iteration > 0 { parts.push("**Previous round**".to_string()); if let Some(code) = previous_code { - let code_summary = summarize_code(code); - parts.push(format!("- Code: {code_summary}")); + parts.push(format!("- Code: {}", summarize_code(code))); } if let Some(stdout) = previous_stdout { let stdout_clean = stdout.trim(); @@ -631,7 +627,6 @@ fn build_metadata_message( } } -/// Compress a code block to a short summary — first 4 + last 4 lines. fn summarize_code(code: &str) -> String { let lines: Vec<&str> = code.lines().collect(); if lines.len() <= 8 { @@ -642,7 +637,6 @@ fn summarize_code(code: &str) -> String { format!("{} lines:\n{head}\n…\n{tail}", lines.len()) } -/// Extract text from content blocks, joining all text blocks together. fn extract_text_blocks(blocks: &[ContentBlock]) -> String { blocks .iter() @@ -654,12 +648,9 @@ fn extract_text_blocks(blocks: &[ContentBlock]) -> String { .join("\n") } -/// Extract the first ```repl code block from the model's response. -/// -/// Matches the reference RLM (alexzhang13/rlm) language identifier so the -/// same prompts and examples work here. Falls back to ```python / -/// ```py for backward compatibility with text the model may have learned -/// from earlier prompt versions. +/// Extract the first ` ```repl ` block from `text`. Falls back to +/// ` ```python `/`` ```py `` for compatibility with prompts that learned +/// the older fence style. fn extract_repl_code(text: &str) -> Option { let start_markers = [ "```repl\n", @@ -699,31 +690,20 @@ fn extract_repl_code(text: &str) -> Option { Some(code) } -/// Parse a `FINAL(...)` or `FINAL_VAR(...)` directive from the model's raw -/// response text. Mirrors `find_final_answer` from the reference RLM: -/// the directive must appear at the start of a line. For `FINAL_VAR`, -/// returns `None` (we can't resolve the variable from text alone — the -/// REPL-level `FINAL_VAR()` Python helper handles that path). +/// Parse a top-level `FINAL(...)` directive from the model's raw text. +/// Mirrors the reference RLM's `find_final_answer`: directive must appear +/// at the start of a line, *outside* any code fence. fn parse_text_final(text: &str) -> Option { - // Skip if the FINAL appears inside a code fence — we already handle - // those via the REPL's `__REPL_FINAL__::` sentinel. let outside_fence = strip_code_fences(text); for line in outside_fence.lines() { let trimmed = line.trim_start(); - if let Some(rest) = trimmed.strip_prefix("FINAL_VAR(") { - // Heuristic: if a single quoted/bareword ident, defer to the - // REPL path. Otherwise treat the inner literal as the answer. - let inner = rest.trim_end_matches(')').trim(); - // Treat as "use REPL FINAL_VAR" — return None and let the next - // round's REPL evaluation resolve it. Currently we just skip; - // the model can use FINAL(value) for direct text-level exit. - let _ = inner; + if trimmed.starts_with("FINAL_VAR(") { + // FINAL_VAR can't be resolved from text alone — defer to REPL. continue; } if let Some(rest) = trimmed.strip_prefix("FINAL(") { let inner = rest.trim_end(); - // Take everything up to the LAST closing paren on this line. if let Some(end) = inner.rfind(')') { let value = inner[..end].trim(); if !value.is_empty() { @@ -735,8 +715,6 @@ fn parse_text_final(text: &str) -> Option { None } -/// Drop content inside ``` … ``` fenced blocks so we don't read FINAL() -/// out of code the model wrote (that path is handled by the REPL sentinel). fn strip_code_fences(text: &str) -> String { let mut out = String::with_capacity(text.len()); let mut in_fence = false; @@ -753,7 +731,6 @@ fn strip_code_fences(text: &str) -> String { out } -/// Strip one layer of matching surrounding quotes (`"…"` or `'…'`). fn strip_quotes(s: &str) -> String { let bytes = s.as_bytes(); if bytes.len() >= 2 @@ -765,8 +742,6 @@ fn strip_quotes(s: &str) -> String { s.to_string() } -/// Truncate text to `max_chars` (counted by Unicode chars), adding an -/// ellipsis if truncated. Char-safe: never splits a multi-byte codepoint. fn truncate_text(text: &str, max_chars: usize) -> String { let count = text.chars().count(); if count <= max_chars { @@ -786,27 +761,15 @@ fn truncate_text(text: &str, max_chars: usize) -> String { mod tests { use super::*; - fn tmp_state_path(label: &str) -> std::path::PathBuf { - let dir = std::env::temp_dir().join("deepseek_rlm_test"); - std::fs::create_dir_all(&dir).ok(); - dir.join(format!( - "test_{}_{}_{}.json", - label, - std::process::id(), - uuid::Uuid::new_v4() - )) - } - #[test] fn extract_repl_code_finds_simple_block() { - let text = "Here's some code:\n```repl\nprint('hello')\n```\nEnd."; + let text = "Here:\n```repl\nprint('hi')\n```\nEnd."; let code = extract_repl_code(text).unwrap(); - assert_eq!(code, "print('hello')"); + assert_eq!(code, "print('hi')"); } #[test] fn extract_repl_code_falls_back_to_python_marker() { - // Backward compat: if the model still emits ```python we accept it. let text = "Code:\n```python\nx = 1 + 2\n```"; let code = extract_repl_code(text).unwrap(); assert_eq!(code, "x = 1 + 2"); @@ -814,103 +777,31 @@ mod tests { #[test] fn extract_repl_code_returns_none_when_missing() { - let text = "Just some text without code fences."; - assert!(extract_repl_code(text).is_none()); + assert!(extract_repl_code("Just text.").is_none()); } #[test] fn extract_repl_code_returns_none_on_empty_block() { - let text = "Code:\n```repl\n\n```"; - assert!(extract_repl_code(text).is_none()); + assert!(extract_repl_code("```repl\n\n```").is_none()); } #[test] fn extract_repl_code_handles_multiple_blocks() { - let text = "First:\n```repl\na=1\n```\nSecond:\n```repl\nb=2\n```"; + let text = "```repl\na=1\n```\n```repl\nb=2\n```"; let code = extract_repl_code(text).unwrap(); assert_eq!(code, "a=1"); } #[test] fn extract_repl_code_ignores_other_fences() { - let text = "```\nsome text\n```\nActual:\n```repl\nreal_code()\n```"; + let text = "```\nfoo\n```\n```repl\nreal_code()\n```"; let code = extract_repl_code(text).unwrap(); assert_eq!(code, "real_code()"); } - #[test] - fn build_metadata_contains_key_information() { - let path = tmp_state_path("meta_basic"); - std::fs::write(&path, "{\"context\":\"Hello, world!\"}").unwrap(); - let prompt = "Hello, world!"; - let msg = build_metadata_message(prompt, None, 0, None, None, &path); - let text = extract_text_blocks(&msg.content); - assert!(text.contains("context")); - assert!(text.contains("Hello, world!")); - assert!(text.contains("Round 0")); - assert!(text.contains("llm_query")); - assert!(text.contains("rlm_query")); - assert!(text.contains("FINAL")); - let _ = std::fs::remove_file(&path); - } - - #[test] - fn build_metadata_lists_state_variables() { - let path = tmp_state_path("meta_vars"); - std::fs::write( - &path, - "{\"context\":\"x\",\"chunk_summaries\":[\"a\"],\"counter\":1}", - ) - .unwrap(); - let msg = build_metadata_message("x", None, 1, Some("noop"), Some("ok"), &path); - let text = extract_text_blocks(&msg.content); - assert!(text.contains("Variables in REPL state")); - assert!(text.contains("\"context\"")); - assert!(text.contains("\"chunk_summaries\"")); - assert!(text.contains("\"counter\"")); - let _ = std::fs::remove_file(&path); - } - - #[test] - fn build_metadata_with_iteration_shows_previous_code() { - let path = tmp_state_path("meta_prev"); - std::fs::write(&path, "{}").unwrap(); - let msg = build_metadata_message( - "Test prompt", - None, - 3, - Some("print('hi')"), - Some("hi"), - &path, - ); - let text = extract_text_blocks(&msg.content); - assert!(text.contains("Round 3")); - assert!(text.contains("print('hi')")); - assert!(text.contains("hi")); - let _ = std::fs::remove_file(&path); - } - - #[test] - fn build_metadata_includes_root_prompt_when_provided() { - let path = tmp_state_path("meta_root"); - std::fs::write(&path, "{}").unwrap(); - let msg = build_metadata_message( - "long context text", - Some("Summarize the security model"), - 1, - Some("# noop"), - Some("ok"), - &path, - ); - let text = extract_text_blocks(&msg.content); - assert!(text.contains("Original task")); - assert!(text.contains("Summarize the security model")); - let _ = std::fs::remove_file(&path); - } - #[test] fn parse_text_final_extracts_simple_value() { - let text = "OK here's the answer.\nFINAL(42)\nThanks."; + let text = "OK.\nFINAL(42)\nThanks."; assert_eq!(parse_text_final(text).as_deref(), Some("42")); } @@ -933,7 +824,42 @@ mod tests { } #[test] - fn truncate_text_leaves_short_text_alone() { + fn build_metadata_contains_key_information() { + let msg = build_metadata_message("Hello, world!", None, 0, None, None); + let text = extract_text_blocks(&msg.content); + assert!(text.contains("context")); + assert!(text.contains("Hello, world!")); + assert!(text.contains("round 0")); + assert!(text.contains("llm_query")); + assert!(text.contains("rlm_query")); + assert!(text.contains("FINAL")); + } + + #[test] + fn build_metadata_with_iteration_shows_previous_code() { + let msg = build_metadata_message("Test prompt", None, 3, Some("print('hi')"), Some("hi")); + let text = extract_text_blocks(&msg.content); + assert!(text.contains("round 3")); + assert!(text.contains("print('hi')")); + assert!(text.contains("hi")); + } + + #[test] + fn build_metadata_includes_root_prompt() { + let msg = build_metadata_message( + "long context", + Some("Summarize the security model"), + 1, + Some("# noop"), + Some("ok"), + ); + let text = extract_text_blocks(&msg.content); + assert!(text.contains("Original task")); + assert!(text.contains("Summarize the security model")); + } + + #[test] + fn truncate_text_leaves_short_alone() { assert_eq!(truncate_text("hello", 100), "hello"); } @@ -947,18 +873,15 @@ mod tests { #[test] fn truncate_text_is_unicode_safe() { - // 4 multi-byte codepoints, each occupying 3 bytes. - let s = "日本語テスト"; // 6 chars + let s = "日本語テスト"; let out = truncate_text(s, 4); - // Should keep 1 char + "..." == 4 chars total. assert_eq!(out.chars().count(), 4); assert!(out.ends_with("...")); - // Must NOT split a codepoint — string is valid utf-8 by construction. assert!(std::str::from_utf8(out.as_bytes()).is_ok()); } #[test] - fn extract_text_blocks_joins_text_blocks() { + fn extract_text_blocks_joins_text() { let blocks = vec![ ContentBlock::Text { text: "first".to_string(), @@ -975,27 +898,15 @@ mod tests { assert_eq!(extract_text_blocks(&blocks), "first\nsecond"); } - #[test] - fn extract_text_blocks_returns_empty_on_no_text() { - let blocks = vec![ContentBlock::Thinking { - thinking: "only thinking".to_string(), - }]; - assert_eq!(extract_text_blocks(&blocks), ""); - } - #[test] fn metadata_msg_role_is_user() { - let path = tmp_state_path("meta_role"); - std::fs::write(&path, "{}").unwrap(); - let msg = build_metadata_message("test", None, 0, None, None, &path); + let msg = build_metadata_message("test", None, 0, None, None); assert_eq!(msg.role, "user"); - let _ = std::fs::remove_file(&path); } #[test] - fn summarize_code_keeps_short_unchanged() { - let s = "a\nb\nc"; - assert_eq!(summarize_code(s), s); + fn summarize_code_keeps_short() { + assert_eq!(summarize_code("a\nb\nc"), "a\nb\nc"); } #[test] @@ -1005,44 +916,7 @@ mod tests { let s = summarize_code(&code); assert!(s.starts_with("20 lines:")); assert!(s.contains("line0")); - assert!(s.contains("line3")); assert!(s.contains("line19")); assert!(s.contains("…")); } - - /// End-to-end test: spin up the sidecar with a real httpbin-like loopback, - /// then drive a python3 process that calls llm_query() and confirm the - /// HTTP path is wired correctly. We don't talk to a real LLM here — we - /// stand up a stand-in HTTP server using the same axum stack and just - /// verify the sidecar URL is reachable from python3. - /// - /// This guards against a regression where the sidecar URL doesn't get - /// exported into the python child's environment. - #[tokio::test] - async fn sidecar_url_is_exported_to_python_env() { - // Stand up a tiny axum server that always replies {"text":"pong"}. - use axum::{Json, Router, routing::post}; - let app = Router::new().route( - "/llm", - post(|| async { Json(serde_json::json!({"text": "pong-from-sidecar"})) }), - ); - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let server = tokio::spawn(async move { - let _ = axum::serve(listener, app).await; - }); - - let mut rt = PythonRuntime::with_state_path(tmp_state_path("sidecar_smoke")); - rt.set_env("REPL_LLM_URL", format!("http://{addr}/llm")); - let round = rt - .execute("print(llm_query('hello'))") - .await - .expect("execute"); - assert!( - round.stdout.contains("pong-from-sidecar"), - "stdout did not contain sidecar reply: {:?}", - round.stdout - ); - server.abort(); - } } diff --git a/crates/tui/src/tools/rlm_process.rs b/crates/tui/src/tools/rlm_process.rs index 95d54ea8..192ef765 100644 --- a/crates/tui/src/tools/rlm_process.rs +++ b/crates/tui/src/tools/rlm_process.rs @@ -223,13 +223,16 @@ impl ToolSpec for RlmProcessTool { }); } - // Surface the termination reason so the model can tell whether the - // sub-agent finished cleanly via FINAL or fell out of the loop. + // Surface the termination reason and a brief per-round trace so the + // user can verify the sub-agent actually engaged with `context` + // through sub-LLM calls — not just inferred an answer from the + // preview. let footer = match result.termination { RlmTermination::Final => String::new(), - RlmTermination::DirectAnswer => { - "\n\n[note: sub-agent emitted a direct answer instead of FINAL()]".to_string() - } + RlmTermination::NoCode => format!( + "\n\n[warning: sub-agent failed to engage the REPL after {} iterations — answer is the model's last raw response]", + result.iterations + ), RlmTermination::Exhausted => format!( "\n\n[warning: sub-agent hit the {}-iteration cap without FINAL()]", result.iterations @@ -237,6 +240,46 @@ impl ToolSpec for RlmProcessTool { RlmTermination::Error => String::new(), }; + let trace_summary = if result.trace.is_empty() { + String::from("\n\n[trace: no REPL rounds executed]") + } else { + let mut s = String::from("\n\n[RLM trace]"); + for r in &result.trace { + let head = r + .code_summary + .lines() + .next() + .unwrap_or(r.code_summary.as_str()) + .chars() + .take(80) + .collect::(); + s.push_str(&format!( + "\n round {}: {} sub-LLM call(s), {}ms{} — {}", + r.round, + r.rpc_count, + r.elapsed_ms, + if r.had_error { " (error)" } else { "" }, + head + )); + } + s + }; + + let trace_json: Vec<_> = result + .trace + .iter() + .map(|r| { + json!({ + "round": r.round, + "rpc_count": r.rpc_count, + "elapsed_ms": r.elapsed_ms, + "had_error": r.had_error, + "code_summary": r.code_summary, + "stdout_preview": r.stdout_preview, + }) + }) + .collect(); + let metadata = json!({ "iterations": result.iterations, "duration_ms": result.duration.as_millis() as u64, @@ -245,9 +288,14 @@ impl ToolSpec for RlmProcessTool { "termination": format!("{:?}", result.termination).to_lowercase(), "child_model": child_model, "max_depth": max_depth, + "total_rpcs": result.total_rpcs, + "trace": trace_json, }); - Ok(ToolResult::success(format!("{}{}", result.answer, footer)).with_metadata(metadata)) + Ok( + ToolResult::success(format!("{}{}{}", result.answer, footer, trace_summary)) + .with_metadata(metadata), + ) } }