use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::Result; use codewhale_agent::ModelRegistry; use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind}; use codewhale_execpolicy::{ AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision, ExecPolicyEngine, }; use codewhale_hooks::{HookDispatcher, HookEvent}; use codewhale_mcp::{ McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus, }; use codewhale_protocol::{ AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse, ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus, ToolPayload, }; use codewhale_state::{ JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata, ThreadStatus as PersistedThreadStatus, }; use codewhale_tools::{ToolCall, ToolRegistry}; use serde_json::{Value, json}; use uuid::Uuid; /// How a new thread's conversation history is initialized. #[derive(Debug, Clone)] pub enum InitialHistory { /// Start with an empty conversation. New, /// Forked from an existing thread with the given history items. Forked(Vec), /// Resumed from a persisted thread with its full history. Resumed { conversation_id: String, history: Vec, rollout_path: PathBuf, }, } /// Result of spawning or resuming a thread. #[derive(Debug, Clone)] pub struct NewThread { /// The thread metadata. pub thread: Thread, /// Resolved model identifier. pub model: String, /// Provider that serves the model. pub model_provider: String, /// Working directory for the thread. pub cwd: PathBuf, /// Approval policy override, if any. pub approval_policy: Option, /// Sandbox mode override, if any. pub sandbox: Option, } /// Status of a background job. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JobStatus { /// Waiting to be picked up. Queued, /// Currently executing. Running, /// Temporarily paused. Paused, /// Finished successfully. Completed, /// Finished with an error. Failed, /// Cancelled by the user. Cancelled, } const JOB_DETAIL_SCHEMA_VERSION: u8 = 1; const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3; const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500; const MAX_JOB_HISTORY_ENTRIES: usize = 64; /// Retry state for a job that failed and may be retried. #[derive(Debug, Clone)] pub struct JobRetryMetadata { /// Current attempt number (0 = not yet retried). pub attempt: u32, /// Maximum number of retry attempts before giving up. pub max_attempts: u32, /// Base delay in milliseconds for exponential backoff. pub backoff_base_ms: u64, /// Computed delay in milliseconds until the next retry. pub next_backoff_ms: u64, /// Timestamp when the next retry should be attempted. pub next_retry_at: Option, } impl Default for JobRetryMetadata { fn default() -> Self { Self { attempt: 0, max_attempts: DEFAULT_JOB_MAX_ATTEMPTS, backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS, next_backoff_ms: 0, next_retry_at: None, } } } /// A single entry in a job's history log. #[derive(Debug, Clone)] pub struct JobHistoryEntry { /// Timestamp when this entry was recorded. pub at: i64, /// Phase name (e.g., "created", "running", "failed"). pub phase: String, /// Job status at this point in time. pub status: JobStatus, /// Progress percentage at this point, if available. pub progress: Option, /// Human-readable detail message. pub detail: Option, /// Retry state snapshot at this point. pub retry: JobRetryMetadata, } #[derive(Debug, Clone)] struct PersistedJobDetail { pub status: JobStatus, pub detail: Option, pub retry: JobRetryMetadata, pub history: Vec, } /// A complete job record with all metadata and history. #[derive(Debug, Clone)] pub struct JobRecord { /// Unique job identifier. pub id: String, /// Human-readable job name. pub name: String, /// Current job status. pub status: JobStatus, /// Current progress percentage (0-100). pub progress: Option, /// Human-readable detail about the current state. pub detail: Option, /// Retry state for failed jobs. pub retry: JobRetryMetadata, /// Chronological history of state transitions. pub history: Vec, /// Timestamp when the job was created. pub created_at: i64, /// Timestamp of the last state change. pub updated_at: i64, } /// Manages background jobs with retry logic and persistence. #[derive(Debug, Default)] pub struct JobManager { jobs: HashMap, } impl JobManager { fn now_ts() -> i64 { chrono::Utc::now().timestamp() } fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 { if retry.attempt == 0 { return 0; } let exponent = retry.attempt.saturating_sub(1).min(20); let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX); retry.backoff_base_ms.saturating_mul(multiplier) } fn clear_retry_schedule(retry: &mut JobRetryMetadata) { retry.next_backoff_ms = 0; retry.next_retry_at = None; } fn push_history(job: &mut JobRecord, phase: &str) { job.history.push(JobHistoryEntry { at: job.updated_at, phase: phase.to_string(), status: job.status, progress: job.progress, detail: job.detail.clone(), retry: job.retry.clone(), }); if job.history.len() > MAX_JOB_HISTORY_ENTRIES { let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES; job.history.drain(0..to_drain); } } fn parse_persisted_detail(raw: Option<&str>) -> Option { let raw = raw?; let parsed: Value = serde_json::from_str(raw).ok()?; let status = parsed .get("status") .and_then(Value::as_str) .and_then(job_status_from_str)?; let detail = parsed.get("detail").and_then(json_optional_string); let retry = parse_retry_metadata(parsed.get("retry")); let history = parsed .get("history") .and_then(Value::as_array) .map(|items| { items .iter() .filter_map(parse_history_entry) .collect::>() }) .unwrap_or_default(); Some(PersistedJobDetail { status, detail, retry, history, }) } fn encode_persisted_detail(job: &JobRecord) -> Result> { let encoded = json!({ "schema_version": JOB_DETAIL_SCHEMA_VERSION, "status": job_status_to_str(job.status), "detail": job.detail.clone(), "retry": job_retry_to_value(&job.retry), "history": job.history.iter().map(job_history_to_value).collect::>() }) .to_string(); Ok(Some(encoded)) } /// Enqueues a new job and returns its record. pub fn enqueue(&mut self, name: impl Into) -> JobRecord { let now = Self::now_ts(); let id = format!("job-{}", Uuid::new_v4()); let mut job = JobRecord { id: id.clone(), name: name.into(), status: JobStatus::Queued, progress: Some(0), detail: None, retry: JobRetryMetadata::default(), history: Vec::new(), created_at: now, updated_at: now, }; Self::push_history(&mut job, "created"); self.jobs.insert(id, job.clone()); job } /// Transitions a job to running and clears its retry schedule. pub fn set_running(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Running; Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "running"); } } /// Updates a job's progress (clamped to 100) and optional detail message. pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.progress = Some(progress.min(100)); job.detail = detail; job.updated_at = Self::now_ts(); Self::push_history(job, "progress_updated"); } } /// Marks a job as completed with 100% progress and clears its retry schedule. pub fn complete(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Completed; job.progress = Some(100); Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "completed"); } } /// Marks a job as failed and schedules a retry if attempts remain. pub fn fail(&mut self, id: &str, detail: impl Into) { if let Some(job) = self.jobs.get_mut(id) { let now = Self::now_ts(); job.status = JobStatus::Failed; job.detail = Some(detail.into()); if job.retry.attempt < job.retry.max_attempts { job.retry.attempt += 1; job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry); let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000) .min(i64::MAX as u64) as i64; job.retry.next_retry_at = Some(now.saturating_add(delay_secs)); } else { Self::clear_retry_schedule(&mut job.retry); } job.updated_at = now; Self::push_history(job, "failed"); } } /// Cancels a job and clears any pending retry schedule. pub fn cancel(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Cancelled; Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "cancelled"); } } /// Pauses a job, optionally updating its detail message. pub fn pause(&mut self, id: &str, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Paused; if detail.is_some() { job.detail = detail; } job.updated_at = Self::now_ts(); Self::push_history(job, "paused"); } } /// Resumes a paused or failed job back to running status. pub fn resume(&mut self, id: &str, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Running; if detail.is_some() { job.detail = detail; } Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "resumed"); } } /// Returns all jobs sorted by most recently updated first. pub fn list(&self) -> Vec { let mut out = self.jobs.values().cloned().collect::>(); out.sort_by_key(|job| std::cmp::Reverse(job.updated_at)); out } /// Returns the history entries for a job, or an empty vec if not found. pub fn history(&self, id: &str) -> Vec { self.jobs .get(id) .map(|job| job.history.clone()) .unwrap_or_default() } /// Resets queued or running jobs back to queued on application resume. pub fn resume_pending(&mut self) -> Vec { let mut resumed = Vec::new(); for job in self.jobs.values_mut() { if matches!(job.status, JobStatus::Queued | JobStatus::Running) { job.status = JobStatus::Queued; job.updated_at = Self::now_ts(); Self::push_history(job, "queued_after_resume"); resumed.push(job.clone()); } } resumed } /// Loads jobs from the state store, deserializing extended detail when available. pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> { let persisted = store.list_jobs(Some(500))?; for job in persisted { let fallback_status = job_state_status_to_runtime(job.status); let parsed = Self::parse_persisted_detail(job.detail.as_deref()); let (status, detail, retry, history) = if let Some(detail_state) = parsed { ( detail_state.status, detail_state.detail, detail_state.retry, detail_state.history, ) } else { ( fallback_status, job.detail, JobRetryMetadata::default(), Vec::new(), ) }; self.jobs.insert( job.id.clone(), JobRecord { id: job.id, name: job.name, status, progress: job.progress, detail, retry, history, created_at: job.created_at, updated_at: job.updated_at, }, ); } Ok(()) } /// Persists a single job's current state to the state store. pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> { let Some(job) = self.jobs.get(id) else { return Ok(()); }; let encoded_detail = Self::encode_persisted_detail(job)?; store.upsert_job(&JobStateRecord { id: job.id.clone(), name: job.name.clone(), status: runtime_status_to_job_state(job.status), progress: job.progress, detail: encoded_detail, created_at: job.created_at, updated_at: job.updated_at, }) } /// Persists all in-memory jobs to the state store. pub fn persist_all(&self, store: &StateStore) -> Result<()> { for id in self.jobs.keys() { self.persist_job(store, id)?; } Ok(()) } } /// Manages thread lifecycle: spawn, resume, fork, archive, and persistence. pub struct ThreadManager { store: StateStore, running_threads: HashMap, cli_version: String, } impl ThreadManager { /// Creates a new `ThreadManager` backed by the given state store. pub fn new(store: StateStore) -> Self { Self { store, running_threads: HashMap::new(), cli_version: env!("CARGO_PKG_VERSION").to_string(), } } /// Returns a reference to the underlying state store. pub fn state_store(&self) -> &StateStore { &self.store } /// Spawns a new thread with the given initial history and persists it. pub fn spawn_thread_with_history( &mut self, model_provider: String, cwd: PathBuf, initial_history: InitialHistory, persist_extended_history: bool, ) -> Result { let id = format!("thread-{}", Uuid::new_v4()); let now = chrono::Utc::now().timestamp(); let preview = preview_from_initial_history(&initial_history); let source = match initial_history { InitialHistory::New => SessionSource::Interactive, InitialHistory::Forked(_) => SessionSource::Fork, InitialHistory::Resumed { .. } => SessionSource::Resume, }; let thread = Thread { id: id.clone(), preview, ephemeral: !persist_extended_history, model_provider: model_provider.clone(), created_at: now, updated_at: now, status: ThreadStatus::Running, path: None, cwd: cwd.clone(), cli_version: self.cli_version.clone(), source: match source { SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive, SessionSource::Resume => codewhale_protocol::SessionSource::Resume, SessionSource::Fork => codewhale_protocol::SessionSource::Fork, SessionSource::Api => codewhale_protocol::SessionSource::Api, SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown, }, name: None, }; self.persist_thread(&thread, None)?; match &initial_history { InitialHistory::Forked(items) => { for item in items { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } InitialHistory::Resumed { history, .. } => { for item in history { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } InitialHistory::New => {} } self.running_threads .insert(thread.id.clone(), thread.clone()); Ok(NewThread { thread, model: "auto".to_string(), model_provider, cwd, approval_policy: None, sandbox: None, }) } /// Resumes an existing thread, returning `None` if not found. pub fn resume_thread_with_history( &mut self, params: &ThreadResumeParams, fallback_cwd: &Path, model_provider: String, ) -> Result> { if params.history.is_none() && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned() { return Ok(Some(NewThread { model: params.model.clone().unwrap_or_else(|| "auto".to_string()), model_provider: params.model_provider.clone().unwrap_or(model_provider), cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()), approval_policy: params.approval_policy.clone(), sandbox: params.sandbox.clone(), thread, })); } let persisted = self.store.get_thread(¶ms.thread_id)?; let Some(metadata) = persisted else { return Ok(None); }; let mut thread = to_protocol_thread(metadata); thread.status = ThreadStatus::Running; thread.updated_at = chrono::Utc::now().timestamp(); thread.cwd = params .cwd .clone() .unwrap_or_else(|| fallback_cwd.to_path_buf()); self.persist_thread(&thread, None)?; self.running_threads .insert(thread.id.clone(), thread.clone()); if let Some(history) = params.history.as_ref() { for item in history { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } Ok(Some(NewThread { model: params.model.clone().unwrap_or_else(|| "auto".to_string()), model_provider: params.model_provider.clone().unwrap_or(model_provider), cwd: thread.cwd.clone(), approval_policy: params.approval_policy.clone(), sandbox: params.sandbox.clone(), thread, })) } /// Forks an existing thread into a new one, inheriting the parent's provider. pub fn fork_thread( &mut self, params: &ThreadForkParams, fallback_cwd: &Path, ) -> Result> { let parent = self.store.get_thread(¶ms.thread_id)?; let Some(parent) = parent else { return Ok(None); }; let parent_thread = to_protocol_thread(parent); let new = self.spawn_thread_with_history( params .model_provider .clone() .unwrap_or_else(|| parent_thread.model_provider.clone()), params .cwd .clone() .unwrap_or_else(|| fallback_cwd.to_path_buf()), InitialHistory::Forked(vec![json!({ "type": "fork", "from_thread_id": parent_thread.id })]), params.persist_extended_history, )?; Ok(Some(new)) } /// Lists threads matching the given filter parameters. pub fn list_threads(&self, params: &ThreadListParams) -> Result> { let list = self.store.list_threads(ThreadListFilters { include_archived: params.include_archived, limit: params.limit, })?; Ok(list.into_iter().map(to_protocol_thread).collect()) } /// Reads a single thread by id, or `None` if not found. pub fn read_thread(&self, params: &ThreadReadParams) -> Result> { Ok(self .store .get_thread(¶ms.thread_id)? .map(to_protocol_thread)) } /// Sets the display name for a thread, returning the updated thread or `None`. pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result> { let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else { return Ok(None); }; metadata.name = Some(params.name.clone()); metadata.updated_at = chrono::Utc::now().timestamp(); self.store.upsert_thread(&metadata)?; let updated = to_protocol_thread(metadata); self.running_threads .insert(updated.id.clone(), updated.clone()); Ok(Some(updated)) } /// Archives a thread so it no longer appears in default listings. pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> { self.store.mark_archived(thread_id)?; if let Some(thread) = self.running_threads.get_mut(thread_id) { thread.status = ThreadStatus::Archived; } Ok(()) } /// Restores an archived thread to active status. pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> { self.store.mark_unarchived(thread_id)?; Ok(()) } /// Records a user message in a thread and updates its preview and timestamp. pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> { let Some(mut metadata) = self.store.get_thread(thread_id)? else { return Ok(()); }; metadata.updated_at = chrono::Utc::now().timestamp(); metadata.preview = truncate_preview(input); metadata.status = PersistedThreadStatus::Running; self.store.upsert_thread(&metadata)?; if let Some(thread) = self.running_threads.get_mut(thread_id) { thread.updated_at = metadata.updated_at; thread.preview = metadata.preview; thread.status = ThreadStatus::Running; } let message_id = self.store.append_message(thread_id, "user", input, None)?; self.store.save_checkpoint( thread_id, "latest", &json!({ "reason": "thread_message", "message_id": message_id, "role": "user", "preview": truncate_preview(input), "updated_at": metadata.updated_at }), )?; Ok(()) } fn persist_thread(&self, thread: &Thread, rollout_path: Option) -> Result<()> { self.store.upsert_thread(&ThreadMetadata { id: thread.id.clone(), rollout_path, preview: thread.preview.clone(), ephemeral: thread.ephemeral, model_provider: thread.model_provider.clone(), created_at: thread.created_at, updated_at: thread.updated_at, status: to_persisted_status(&thread.status), path: thread.path.clone(), cwd: thread.cwd.clone(), cli_version: thread.cli_version.clone(), source: to_persisted_source(&thread.source), name: thread.name.clone(), sandbox_policy: None, approval_mode: None, archived: matches!(thread.status, ThreadStatus::Archived), archived_at: None, git_sha: None, git_branch: None, git_origin_url: None, memory_mode: None, current_leaf_id: None, }) } } /// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks. pub struct Runtime { /// Resolved application configuration. pub config: ConfigToml, /// Registry of available model providers. pub model_registry: ModelRegistry, /// Manages conversation thread lifecycle. pub thread_manager: ThreadManager, /// Registry of callable tools. pub tool_registry: Arc, /// Manager for MCP server connections. pub mcp_manager: Arc, /// Engine for evaluating execution policy decisions. pub exec_policy: ExecPolicyEngine, /// Dispatcher for lifecycle hooks. pub hooks: HookDispatcher, /// Manager for background job lifecycle. pub jobs: JobManager, } impl Runtime { /// Constructs a new `Runtime`, loading existing jobs from the state store. pub fn new( config: ConfigToml, model_registry: ModelRegistry, state: StateStore, tool_registry: Arc, mcp_manager: Arc, exec_policy: ExecPolicyEngine, hooks: HookDispatcher, ) -> Self { let mut jobs = JobManager::default(); if let Err(e) = jobs.load_from_store(&state) { tracing::warn!("Failed to load job store, starting with empty job list: {e}"); } Self { config, model_registry, thread_manager: ThreadManager::new(state), tool_registry, mcp_manager, exec_policy, hooks, jobs, } } fn persisted_thread_data(&self, thread_id: &str) -> Result { let history = self .thread_manager .state_store() .list_messages(thread_id, Some(500))? .into_iter() .map(|message| { json!({ "id": message.id, "role": message.role, "content": message.content, "item": message.item, "created_at": message.created_at }) }) .collect::>(); let checkpoint = self .thread_manager .state_store() .load_checkpoint(thread_id, None)? .map(|record| { json!({ "checkpoint_id": record.checkpoint_id, "state": record.state, "created_at": record.created_at }) }); Ok(json!({ "history": history, "checkpoint": checkpoint })) } fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> { self.thread_manager.state_store().save_checkpoint( thread_id, "latest", &json!({ "reason": reason, "saved_at": chrono::Utc::now().timestamp(), "state": state }), ) } /// Dispatches a thread request (create, start, resume, fork, list, read, etc.). pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result { match req { ThreadRequest::Create { .. } => { let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); let new = self.thread_manager.spawn_thread_with_history( "deepseek".to_string(), cwd, InitialHistory::New, false, )?; let mut response = thread_response_from_new("created", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } ThreadRequest::Start(params) => { let cwd = params.cwd.clone().unwrap_or_else(|| { std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) }); let new = self.thread_manager.spawn_thread_with_history( params .model_provider .clone() .unwrap_or_else(|| "deepseek".to_string()), cwd, InitialHistory::New, params.persist_extended_history, )?; let mut response = thread_response_from_new("started", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } ThreadRequest::Resume(params) => { let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); if let Some(new) = self.thread_manager.resume_thread_with_history( ¶ms, &fallback_cwd, "deepseek".to_string(), )? { let mut response = thread_response_from_new("resumed", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } else { Ok(ThreadResponse { thread_id: params.thread_id, status: "missing".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: params.approval_policy, sandbox: params.sandbox, events: Vec::new(), data: json!({"error":"thread not found"}), }) } } ThreadRequest::Fork(params) => { let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? { let mut response = thread_response_from_new("forked", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } else { Ok(ThreadResponse { thread_id: params.thread_id, status: "missing".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: params.approval_policy, sandbox: params.sandbox, events: Vec::new(), data: json!({"error":"thread not found"}), }) } } ThreadRequest::List(params) => Ok(ThreadResponse { thread_id: "list".to_string(), status: "ok".to_string(), thread: None, threads: self.thread_manager.list_threads(¶ms)?, model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }), ThreadRequest::Read(params) => { let id = params.thread_id.clone(); let data = self.persisted_thread_data(&id)?; Ok(ThreadResponse { thread_id: id, status: "ok".to_string(), thread: self.thread_manager.read_thread(¶ms)?, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data, }) } ThreadRequest::SetName(params) => Ok(ThreadResponse { thread_id: params.thread_id.clone(), status: "ok".to_string(), thread: self.thread_manager.set_thread_name(¶ms)?, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }), ThreadRequest::Archive { thread_id } => { self.thread_manager.archive_thread(&thread_id)?; Ok(ThreadResponse { thread_id, status: "archived".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }) } ThreadRequest::Unarchive { thread_id } => { self.thread_manager.unarchive_thread(&thread_id)?; Ok(ThreadResponse { thread_id, status: "unarchived".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }) } ThreadRequest::Message { thread_id, input } => { self.thread_manager.touch_message(&thread_id, &input)?; let response_id = format!("{thread_id}:{}", input.len()); self.hooks .emit(HookEvent::ResponseStart { response_id: response_id.clone(), }) .await; self.hooks .emit(HookEvent::ResponseEnd { response_id: response_id.clone(), }) .await; Ok(ThreadResponse { thread_id, status: "accepted".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: vec![ EventFrame::ResponseStart { response_id: response_id.clone(), }, EventFrame::ResponseDelta { response_id: response_id.clone(), delta: "queued".to_string(), channel: ResponseChannel::Text, }, EventFrame::ResponseEnd { response_id }, ], data: json!({}), }) } } } /// Resolves the model for a prompt, records the message, and returns the response. pub async fn handle_prompt( &mut self, req: PromptRequest, cli_overrides: &CliRuntimeOverrides, ) -> Result { let resolved = self.config.resolve_runtime_options(cli_overrides); let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone()); let selection = self .model_registry .resolve(Some(&requested_model), Some(resolved.provider)); let resolved_model = selection.resolved.id.clone(); let response_id = format!("resp-{}", Uuid::new_v4()); self.hooks .emit(HookEvent::ResponseStart { response_id: response_id.clone(), }) .await; self.hooks .emit(HookEvent::ResponseDelta { response_id: response_id.clone(), delta: "model-selected".to_string(), }) .await; self.hooks .emit(HookEvent::ResponseEnd { response_id: response_id.clone(), }) .await; let payload = json!({ "provider": resolved.provider.as_str(), "model": resolved_model.clone(), "prompt": req.prompt, "telemetry": resolved.telemetry, "base_url": resolved.base_url, "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()), "approval_policy": resolved.approval_policy, "sandbox_mode": resolved.sandbox_mode }); if let Some(thread_id) = req.thread_id.as_ref() { self.thread_manager.touch_message(thread_id, &req.prompt)?; let assistant_message_id = self.thread_manager.store.append_message( thread_id, "assistant", &payload.to_string(), Some(payload.clone()), )?; self.persist_latest_checkpoint( thread_id, "prompt_response", json!({ "response_id": response_id.clone(), "model": resolved_model.clone(), "provider": resolved.provider.as_str(), "assistant_message_id": assistant_message_id }), )?; } Ok(PromptResponse { output: payload.to_string(), model: resolved_model, events: vec![ EventFrame::ResponseStart { response_id: response_id.clone(), }, EventFrame::ResponseDelta { response_id: response_id.clone(), delta: "model-selected".to_string(), channel: ResponseChannel::Text, }, EventFrame::ResponseEnd { response_id }, ], }) } /// Evaluates execution policy and dispatches a tool call. pub async fn invoke_tool( &self, call: ToolCall, approval_mode: AskForApproval, cwd: &Path, ) -> Result { let fallback_cwd = cwd.display().to_string(); let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd); let policy_tool = match &call.payload { ToolPayload::LocalShell { .. } => "exec_shell", _ => call.name.as_str(), }; let policy_path = permission_path_for_call(&call); let decision = self.exec_policy.check(ExecPolicyContext { command: &command, cwd: &policy_cwd, tool: Some(policy_tool), path: policy_path.as_deref(), ask_for_approval: approval_mode, sandbox_mode: None, })?; let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind); let response_id = format!("tool-{}", Uuid::new_v4()); let call_id = call .raw_tool_call_id .clone() .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4())); self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name.clone(), phase: "precheck".to_string(), payload: precheck.clone(), }) .await; if !decision.allow { let reason = decision.reason().to_string(); let approval_id = format!("approval-{}", Uuid::new_v4()); let error_frame = EventFrame::Error { response_id: response_id.clone(), message: reason.clone(), }; self.hooks .emit(HookEvent::ApprovalLifecycle { approval_id, phase: "denied".to_string(), reason: Some(reason.clone()), }) .await; self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(error_frame.clone()), }) .await; return Ok(json!({ "ok": false, "status": "denied", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "error": reason, "events": [event_frame_payload(&error_frame)], })); } if decision.requires_approval { let approval_id = format!("approval-{}", Uuid::new_v4()); let reason = decision.reason().to_string(); let maybe_approval_frame = approval_request_frame( &decision.requirement, decision.matched_rule.as_deref(), call_id, approval_id.clone(), response_id.clone(), command.clone(), policy_cwd.clone(), ); self.hooks .emit(HookEvent::ApprovalLifecycle { approval_id: approval_id.clone(), phase: "requested".to_string(), reason: Some(reason.clone()), }) .await; let mut events = Vec::new(); if let Some(frame) = maybe_approval_frame { self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(frame.clone()), }) .await; events.push(event_frame_payload(&frame)); } return Ok(json!({ "ok": false, "status": "approval_required", "execution_kind": execution_kind, "response_id": response_id, "approval_id": approval_id, "precheck": precheck, "error": reason, "events": events, })); } let start_frame = EventFrame::ToolCallStart { response_id: response_id.clone(), tool_name: call.name.clone(), arguments: tool_payload_value(&call.payload), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(start_frame.clone()), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name.clone(), phase: "dispatching".to_string(), payload: json!({ "call_id": call_id, "execution_kind": execution_kind }), }) .await; match self.tool_registry.dispatch(call.clone(), true).await { Ok(tool_output) => { let result_frame = EventFrame::ToolCallResult { response_id: response_id.clone(), tool_name: call.name.clone(), output: tool_output_value(&tool_output), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(result_frame.clone()), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name, phase: "completed".to_string(), payload: json!({ "ok": true }), }) .await; Ok(json!({ "ok": true, "status": "completed", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "output": tool_output, "events": [ event_frame_payload(&start_frame), event_frame_payload(&result_frame) ] })) } Err(err) => { let message = format!("{err:?}"); let error_frame = EventFrame::Error { response_id: response_id.clone(), message: message.clone(), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(error_frame.clone()), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name, phase: "failed".to_string(), payload: json!({ "error": message.clone() }), }) .await; Ok(json!({ "ok": false, "status": "failed", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "error": message, "events": [ event_frame_payload(&start_frame), event_frame_payload(&error_frame) ] })) } } } /// Starts all configured MCP servers and emits startup events via hooks. pub async fn mcp_startup(&self) -> McpStartupCompleteEvent { let mut updates = Vec::new(); let summary = self.mcp_manager.start_all(|update| { updates.push(update); }); for update in updates { let status = match update.status { McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting, McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready, McpManagerStartupStatus::Failed { error } => { codewhale_protocol::McpStartupStatus::Failed { error } } McpManagerStartupStatus::Cancelled => { codewhale_protocol::McpStartupStatus::Cancelled } }; self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(EventFrame::McpStartupUpdate { update: codewhale_protocol::McpStartupUpdateEvent { server_name: update.server_name, status, }, }), }) .await; } self.hooks .emit(HookEvent::GenericEventFrame { frame: Box::new(EventFrame::McpStartupComplete { summary: codewhale_protocol::McpStartupCompleteEvent { ready: summary.ready.clone(), failed: summary .failed .iter() .map(|f| codewhale_protocol::McpStartupFailure { server_name: f.server_name.clone(), error: f.error.clone(), }) .collect(), cancelled: summary.cancelled.clone(), }, }), }) .await; summary } /// Returns the current application status including all jobs and their history. pub fn app_status(&self) -> AppResponse { let jobs = self.jobs.list(); let events = jobs .iter() .flat_map(|job| { job.history.iter().map(|entry| EventFrame::ResponseDelta { response_id: job.id.clone(), delta: json!({ "kind": "job_transition", "job_id": job.id.clone(), "phase": entry.phase.clone(), "status": job_status_to_str(entry.status), "progress": entry.progress, "detail": entry.detail.clone(), "retry": job_retry_to_value(&entry.retry), "at": entry.at }) .to_string(), channel: ResponseChannel::Text, }) }) .collect::>(); AppResponse { ok: true, data: json!({ "jobs": jobs.into_iter().map(|job| { json!({ "id": job.id, "name": job.name, "status": job_status_to_str(job.status), "progress": job.progress, "detail": job.detail, "retry": job_retry_to_value(&job.retry), "history": job.history.iter().map(job_history_to_value).collect::>() }) }).collect::>() }), events, } } /// Returns the default model provider from the resolved configuration. pub fn provider_default(&self) -> ProviderKind { self.config.provider } /// Saves a named checkpoint for a thread. pub fn save_thread_checkpoint( &self, thread_id: &str, checkpoint_id: &str, state: &Value, ) -> Result<()> { self.thread_manager .state_store() .save_checkpoint(thread_id, checkpoint_id, state) } /// Loads a checkpoint for a thread. Pass `None` for the latest. pub fn load_thread_checkpoint( &self, thread_id: &str, checkpoint_id: Option<&str>, ) -> Result> { Ok(self .thread_manager .state_store() .load_checkpoint(thread_id, checkpoint_id)? .map(|checkpoint| checkpoint.state)) } /// Enqueues a new background job and persists it immediately. pub fn enqueue_job(&mut self, name: impl Into) -> Result { let job = self.jobs.enqueue(name); self.jobs .persist_job(self.thread_manager.state_store(), &job.id)?; Ok(job) } /// Transitions a job to running and persists the change. pub fn set_job_running(&mut self, job_id: &str) -> Result<()> { self.jobs.set_running(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Updates a job's progress and persists the change. pub fn update_job_progress( &mut self, job_id: &str, progress: u8, detail: Option, ) -> Result<()> { self.jobs.update_progress(job_id, progress, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Marks a job as completed and persists the change. pub fn complete_job(&mut self, job_id: &str) -> Result<()> { self.jobs.complete(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Marks a job as failed and persists the change. pub fn fail_job(&mut self, job_id: &str, detail: impl Into) -> Result<()> { self.jobs.fail(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Cancels a job and persists the change. pub fn cancel_job(&mut self, job_id: &str) -> Result<()> { self.jobs.cancel(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Pauses a job and persists the change. pub fn pause_job(&mut self, job_id: &str, detail: Option) -> Result<()> { self.jobs.pause(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Resumes a paused job and persists the change. pub fn resume_job(&mut self, job_id: &str, detail: Option) -> Result<()> { self.jobs.resume(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } /// Returns the state-transition history for a job. pub fn job_history(&self, job_id: &str) -> Vec { self.jobs.history(job_id) } } fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse { ThreadResponse { thread_id: new.thread.id.clone(), status: status.to_string(), thread: Some(new.thread), threads: Vec::new(), model: Some(new.model), model_provider: Some(new.model_provider), cwd: Some(new.cwd), approval_policy: new.approval_policy, sandbox: new.sandbox, events: Vec::new(), data: json!({}), } } fn preview_from_initial_history(initial_history: &InitialHistory) -> String { match initial_history { InitialHistory::New => "New conversation".to_string(), InitialHistory::Forked(items) => truncate_preview( &items .first() .map(Value::to_string) .unwrap_or_else(|| "Forked conversation".to_string()), ), InitialHistory::Resumed { history, .. } => truncate_preview( &history .first() .map(Value::to_string) .unwrap_or_else(|| "Resumed conversation".to_string()), ), } } fn permission_path_for_call(call: &ToolCall) -> Option { match &call.payload { ToolPayload::Function { arguments } => serde_json::from_str::(arguments) .ok() .and_then(|value| { value .get("path") .and_then(Value::as_str) .map(str::to_string) }), ToolPayload::Mcp { raw_arguments, .. } => raw_arguments .get("path") .and_then(Value::as_str) .map(str::to_string), ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None, } } fn truncate_preview(value: &str) -> String { value.chars().take(120).collect() } fn to_protocol_thread(thread: ThreadMetadata) -> Thread { Thread { id: thread.id, preview: thread.preview, ephemeral: thread.ephemeral, model_provider: thread.model_provider, created_at: thread.created_at, updated_at: thread.updated_at, status: match thread.status { PersistedThreadStatus::Running => ThreadStatus::Running, PersistedThreadStatus::Idle => ThreadStatus::Idle, PersistedThreadStatus::Completed => ThreadStatus::Completed, PersistedThreadStatus::Failed => ThreadStatus::Failed, PersistedThreadStatus::Paused => ThreadStatus::Paused, PersistedThreadStatus::Archived => ThreadStatus::Archived, }, path: thread.path, cwd: thread.cwd, cli_version: thread.cli_version, source: match thread.source { SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive, SessionSource::Resume => codewhale_protocol::SessionSource::Resume, SessionSource::Fork => codewhale_protocol::SessionSource::Fork, SessionSource::Api => codewhale_protocol::SessionSource::Api, SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown, }, name: thread.name, } } fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus { match status { ThreadStatus::Running => PersistedThreadStatus::Running, ThreadStatus::Idle => PersistedThreadStatus::Idle, ThreadStatus::Completed => PersistedThreadStatus::Completed, ThreadStatus::Failed => PersistedThreadStatus::Failed, ThreadStatus::Paused => PersistedThreadStatus::Paused, ThreadStatus::Archived => PersistedThreadStatus::Archived, } } fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource { match source { codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive, codewhale_protocol::SessionSource::Resume => SessionSource::Resume, codewhale_protocol::SessionSource::Fork => SessionSource::Fork, codewhale_protocol::SessionSource::Api => SessionSource::Api, codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown, } } fn approval_request_frame( requirement: &ExecApprovalRequirement, matched_rule: Option<&str>, call_id: String, approval_id: String, turn_id: String, command: String, cwd: String, ) -> Option { let ExecApprovalRequirement::NeedsApproval { reason, proposed_execpolicy_amendment, proposed_network_policy_amendments, } = requirement else { return None; }; let mut available_decisions = vec![ ReviewDecision::Approved, ReviewDecision::ApprovedForSession, ReviewDecision::Denied, ReviewDecision::Abort, ]; if proposed_execpolicy_amendment .as_ref() .is_some_and(|amendment| !amendment.prefixes.is_empty()) { available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment); } available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map( |amendment| ReviewDecision::NetworkPolicyAmendment { host: amendment.host, action: amendment.action, }, )); Some(EventFrame::ExecApprovalRequest { request: ExecApprovalRequestEvent { call_id, approval_id, turn_id, command, cwd, reason: reason.clone(), matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()), network_approval_context: None, proposed_execpolicy_amendment: proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default(), proposed_network_policy_amendments: proposed_network_policy_amendments.clone(), additional_permissions: Vec::new(), available_decisions, }, }) } fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value { match requirement { ExecApprovalRequirement::Skip { bypass_sandbox, proposed_execpolicy_amendment, } => json!({ "type": "skip", "bypass_sandbox": bypass_sandbox, "reason": requirement.reason(), "proposed_execpolicy_amendment": proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default() }), ExecApprovalRequirement::NeedsApproval { reason, proposed_execpolicy_amendment, proposed_network_policy_amendments, } => json!({ "type": "needs_approval", "reason": reason, "proposed_execpolicy_amendment": proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default(), "proposed_network_policy_amendments": proposed_network_policy_amendments }), ExecApprovalRequirement::Forbidden { reason } => json!({ "type": "forbidden", "reason": reason }), } } fn policy_precheck_payload( decision: &ExecPolicyDecision, command: &str, cwd: &str, execution_kind: &str, ) -> Value { json!({ "execution_kind": execution_kind, "command": command, "cwd": cwd, "allow": decision.allow, "requires_approval": decision.requires_approval, "matched_rule": decision.matched_rule.clone(), "phase": decision.requirement.phase(), "reason": decision.reason(), "requirement": approval_requirement_payload(&decision.requirement) }) } fn tool_payload_value(payload: &ToolPayload) -> Value { serde_json::to_value(payload).unwrap_or_else( |_| json!({"type":"serialization_error","message":"tool payload unavailable"}), ) } fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value { serde_json::to_value(output).unwrap_or_else( |_| json!({"type":"serialization_error","message":"tool output unavailable"}), ) } fn event_frame_payload(frame: &EventFrame) -> Value { serde_json::to_value(frame) .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"})) } fn json_optional_string(value: &Value) -> Option { if value.is_null() { None } else { value.as_str().map(ToString::to_string) } } fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata { let Some(value) = value else { return JobRetryMetadata::default(); }; JobRetryMetadata { attempt: value .get("attempt") .and_then(Value::as_u64) .unwrap_or(0) .min(u32::MAX as u64) as u32, max_attempts: value .get("max_attempts") .and_then(Value::as_u64) .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64) .min(u32::MAX as u64) as u32, backoff_base_ms: value .get("backoff_base_ms") .and_then(Value::as_u64) .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS), next_backoff_ms: value .get("next_backoff_ms") .and_then(Value::as_u64) .unwrap_or(0), next_retry_at: value.get("next_retry_at").and_then(Value::as_i64), } } fn parse_history_entry(value: &Value) -> Option { let status = value .get("status") .and_then(Value::as_str) .and_then(job_status_from_str)?; Some(JobHistoryEntry { at: value.get("at").and_then(Value::as_i64).unwrap_or(0), phase: value .get("phase") .and_then(Value::as_str) .unwrap_or("unknown") .to_string(), status, progress: value .get("progress") .and_then(Value::as_u64) .map(|v| v.min(u8::MAX as u64) as u8), detail: value.get("detail").and_then(json_optional_string), retry: parse_retry_metadata(value.get("retry")), }) } fn job_status_to_str(status: JobStatus) -> &'static str { match status { JobStatus::Queued => "queued", JobStatus::Running => "running", JobStatus::Paused => "paused", JobStatus::Completed => "completed", JobStatus::Failed => "failed", JobStatus::Cancelled => "cancelled", } } fn job_status_from_str(value: &str) -> Option { match value { "queued" => Some(JobStatus::Queued), "running" => Some(JobStatus::Running), "paused" => Some(JobStatus::Paused), "completed" => Some(JobStatus::Completed), "failed" => Some(JobStatus::Failed), "cancelled" => Some(JobStatus::Cancelled), _ => None, } } fn job_retry_to_value(retry: &JobRetryMetadata) -> Value { json!({ "attempt": retry.attempt, "max_attempts": retry.max_attempts, "backoff_base_ms": retry.backoff_base_ms, "next_backoff_ms": retry.next_backoff_ms, "next_retry_at": retry.next_retry_at }) } fn job_history_to_value(entry: &JobHistoryEntry) -> Value { json!({ "at": entry.at, "phase": entry.phase.clone(), "status": job_status_to_str(entry.status), "progress": entry.progress, "detail": entry.detail.clone(), "retry": job_retry_to_value(&entry.retry) }) } fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus { match status { JobStatus::Queued => JobStateStatus::Queued, JobStatus::Running => JobStateStatus::Running, JobStatus::Paused => JobStateStatus::Running, JobStatus::Completed => JobStateStatus::Completed, JobStatus::Failed => JobStateStatus::Failed, JobStatus::Cancelled => JobStateStatus::Cancelled, } } fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus { match status { JobStateStatus::Queued => JobStatus::Queued, JobStateStatus::Running => JobStatus::Running, JobStateStatus::Completed => JobStatus::Completed, JobStateStatus::Failed => JobStatus::Failed, JobStateStatus::Cancelled => JobStatus::Cancelled, } } #[cfg(test)] mod tests { use super::*; use codewhale_tools::ToolCallSource; // ── JobManager: lifecycle ────────────────────────────────────────── #[test] fn permission_path_for_call_extracts_function_path_argument() { let call = ToolCall { name: "read_file".to_string(), payload: ToolPayload::Function { arguments: json!({ "path": "README.md" }).to_string(), }, source: ToolCallSource::Direct, raw_tool_call_id: None, }; assert_eq!( permission_path_for_call(&call).as_deref(), Some("README.md") ); } #[test] fn permission_path_for_call_extracts_mcp_path_argument() { let call = ToolCall { name: "mcp_fs_read".to_string(), payload: ToolPayload::Mcp { server: "fs".to_string(), tool: "read".to_string(), raw_arguments: json!({ "path": "secrets/token.txt" }), raw_tool_call_id: None, }, source: ToolCallSource::Direct, raw_tool_call_id: None, }; assert_eq!( permission_path_for_call(&call).as_deref(), Some("secrets/token.txt") ); } #[test] fn permission_path_for_call_ignores_shell_payload() { let call = ToolCall { name: "exec_shell".to_string(), payload: ToolPayload::LocalShell { params: codewhale_protocol::LocalShellParams { command: "cargo test".to_string(), cwd: None, timeout_ms: None, }, }, source: ToolCallSource::Direct, raw_tool_call_id: None, }; assert_eq!(permission_path_for_call(&call), None); } #[test] fn approval_request_frame_includes_matched_rule() { let requirement = ExecApprovalRequirement::NeedsApproval { reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval." .to_string(), proposed_execpolicy_amendment: None, proposed_network_policy_amendments: Vec::new(), }; let frame = approval_request_frame( &requirement, Some("tool=exec_shell command=cargo test"), "call-1".to_string(), "approval-1".to_string(), "turn-1".to_string(), "cargo test --workspace".to_string(), "/repo".to_string(), ) .expect("approval frame"); let EventFrame::ExecApprovalRequest { request } = frame else { panic!("expected exec approval request frame"); }; assert_eq!( request.matched_rule.as_deref(), Some("tool=exec_shell command=cargo test") ); assert_eq!(request.reason, requirement.reason()); } #[test] fn enqueue_creates_queued_job_with_zero_progress() { let mut jm = JobManager::default(); let job = jm.enqueue("build"); assert_eq!(job.name, "build"); assert_eq!(job.status, JobStatus::Queued); assert_eq!(job.progress, Some(0)); assert!(job.detail.is_none()); assert_eq!(job.history.len(), 1); assert_eq!(job.history[0].phase, "created"); } #[test] fn set_running_transitions_from_queued() { let mut jm = JobManager::default(); let job = jm.enqueue("deploy"); let id = job.id.clone(); jm.set_running(&id); let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.status, JobStatus::Running); assert_eq!(updated.history.last().unwrap().phase, "running"); } #[test] fn update_progress_clamps_to_100() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.update_progress(&id, 150, Some("over".to_string())); let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.progress, Some(100)); } #[test] fn complete_sets_progress_to_100() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.set_running(&id); jm.complete(&id); let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.status, JobStatus::Completed); assert_eq!(updated.progress, Some(100)); } #[test] fn fail_increments_attempt_and_sets_backoff() { let mut jm = JobManager::default(); let job = jm.enqueue("fragile"); let id = job.id.clone(); jm.set_running(&id); jm.fail(&id, "crashed"); let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.status, JobStatus::Failed); assert_eq!(updated.retry.attempt, 1); assert!(updated.retry.next_backoff_ms > 0); assert!(updated.retry.next_retry_at.is_some()); assert_eq!(updated.detail.as_deref(), Some("crashed")); } #[test] fn fail_clears_retry_after_max_attempts() { let mut jm = JobManager::default(); let job = jm.enqueue("fragile"); let id = job.id.clone(); for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS { jm.set_running(&id); jm.fail(&id, "boom"); } let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS); assert_eq!(updated.retry.next_backoff_ms, 0); assert!(updated.retry.next_retry_at.is_none()); } #[test] fn cancel_sets_status_and_clears_retry() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.cancel(&id); let jobs = jm.list(); let updated = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(updated.status, JobStatus::Cancelled); assert_eq!(updated.retry.next_backoff_ms, 0); } #[test] fn pause_and_resume_round_trip() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.set_running(&id); jm.pause(&id, Some("waiting".to_string())); let jobs = jm.list(); let paused = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(paused.status, JobStatus::Paused); assert_eq!(paused.detail.as_deref(), Some("waiting")); jm.resume(&id, None); let jobs = jm.list(); let resumed = jobs.iter().find(|j| j.id == id).unwrap(); assert_eq!(resumed.status, JobStatus::Running); assert_eq!(resumed.history.last().unwrap().phase, "resumed"); } #[test] fn list_returns_jobs_sorted_by_updated_at_desc() { let mut jm = JobManager::default(); jm.enqueue("first"); jm.enqueue("second"); jm.enqueue("third"); let jobs = jm.list(); assert_eq!(jobs.len(), 3); for window in jobs.windows(2) { assert!(window[0].updated_at >= window[1].updated_at); } } #[test] fn history_returns_entries_for_existing_job() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.set_running(&id); jm.complete(&id); let history = jm.history(&id); assert_eq!(history.len(), 3); // created, running, completed assert_eq!(history[0].phase, "created"); assert_eq!(history[1].phase, "running"); assert_eq!(history[2].phase, "completed"); } #[test] fn history_returns_empty_for_unknown_job() { let jm = JobManager::default(); assert!(jm.history("nonexistent").is_empty()); } #[test] fn resume_pending_requeues_running_and_queued() { let mut jm = JobManager::default(); let _j1 = jm.enqueue("queued_task"); let j2 = jm.enqueue("running_task"); let j3 = jm.enqueue("completed_task"); let id2 = j2.id.clone(); let id3 = j3.id.clone(); jm.set_running(&id2); jm.set_running(&id3); jm.complete(&id3); let resumed = jm.resume_pending(); assert_eq!(resumed.len(), 2); for job in &resumed { assert_eq!(job.status, JobStatus::Queued); } } // ── JobManager: backoff ──────────────────────────────────────────── #[test] fn deterministic_backoff_zero_on_first_attempt() { let retry = JobRetryMetadata { attempt: 0, ..Default::default() }; assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0); } #[test] fn deterministic_backoff_exponential_growth() { let base = DEFAULT_JOB_BACKOFF_BASE_MS; for attempt in 1..=5 { let retry = JobRetryMetadata { attempt, backoff_base_ms: base, ..Default::default() }; let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20)); assert_eq!( JobManager::deterministic_backoff_ms(&retry), expected, "attempt {attempt}" ); } } #[test] fn deterministic_backoff_saturates_at_high_exponent() { let retry = JobRetryMetadata { attempt: 63, backoff_base_ms: 1000, ..Default::default() }; // Should not panic; result saturates let _ = JobManager::deterministic_backoff_ms(&retry); } // ── JobManager: history truncation ───────────────────────────────── #[test] fn push_history_truncates_beyond_max() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); // Generate more history entries than the limit for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) { jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}"))); } let history = jm.history(&id); assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES); } // ── JobManager: persistence encoding/parsing ─────────────────────── #[test] fn encode_and_parse_persisted_detail_round_trip() { let mut jm = JobManager::default(); let job = jm.enqueue("task"); let id = job.id.clone(); jm.set_running(&id); jm.fail(&id, "oops"); let job = jm.list().into_iter().find(|j| j.id == id).unwrap(); let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap(); let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap(); assert_eq!(parsed.status, job.status); assert_eq!(parsed.detail, job.detail); assert_eq!(parsed.retry.attempt, job.retry.attempt); assert_eq!(parsed.history.len(), job.history.len()); } #[test] fn parse_persisted_detail_returns_none_for_none_input() { assert!(JobManager::parse_persisted_detail(None).is_none()); } #[test] fn parse_persisted_detail_returns_none_for_invalid_json() { assert!(JobManager::parse_persisted_detail(Some("not json")).is_none()); } // ── Helper functions ─────────────────────────────────────────────── #[test] fn job_status_round_trip_str() { let statuses = [ JobStatus::Queued, JobStatus::Running, JobStatus::Paused, JobStatus::Completed, JobStatus::Failed, JobStatus::Cancelled, ]; for status in &statuses { let s = job_status_to_str(*status); let parsed = job_status_from_str(s); assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}"); } } #[test] fn job_status_from_str_returns_none_for_unknown() { assert_eq!(job_status_from_str("unknown"), None); assert_eq!(job_status_from_str(""), None); } #[test] fn truncate_preview_limits_to_120_chars() { let long = "a".repeat(200); let truncated = truncate_preview(&long); assert_eq!(truncated.len(), 120); } #[test] fn truncate_preview_preserves_short_strings() { let short = "hello"; assert_eq!(truncate_preview(short), "hello"); } #[test] fn runtime_status_to_job_state_maps_correctly() { assert_eq!( runtime_status_to_job_state(JobStatus::Queued), JobStateStatus::Queued ); assert_eq!( runtime_status_to_job_state(JobStatus::Running), JobStateStatus::Running ); assert_eq!( runtime_status_to_job_state(JobStatus::Paused), JobStateStatus::Running ); assert_eq!( runtime_status_to_job_state(JobStatus::Completed), JobStateStatus::Completed ); assert_eq!( runtime_status_to_job_state(JobStatus::Failed), JobStateStatus::Failed ); assert_eq!( runtime_status_to_job_state(JobStatus::Cancelled), JobStateStatus::Cancelled ); } #[test] fn job_state_status_to_runtime_maps_correctly() { assert_eq!( job_state_status_to_runtime(JobStateStatus::Queued), JobStatus::Queued ); assert_eq!( job_state_status_to_runtime(JobStateStatus::Running), JobStatus::Running ); assert_eq!( job_state_status_to_runtime(JobStateStatus::Completed), JobStatus::Completed ); assert_eq!( job_state_status_to_runtime(JobStateStatus::Failed), JobStatus::Failed ); assert_eq!( job_state_status_to_runtime(JobStateStatus::Cancelled), JobStatus::Cancelled ); } #[test] fn preview_from_initial_history_new() { let preview = preview_from_initial_history(&InitialHistory::New); assert_eq!(preview, "New conversation"); } #[test] fn preview_from_initial_history_forked() { let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")])); assert!(preview.contains("hello")); } #[test] fn preview_from_initial_history_resumed() { let preview = preview_from_initial_history(&InitialHistory::Resumed { conversation_id: "test".to_string(), history: vec![json!("world")], rollout_path: PathBuf::from("/tmp/test"), }); assert!(preview.contains("world")); } #[test] fn json_optional_string_handles_null() { assert!(json_optional_string(&Value::Null).is_none()); } #[test] fn json_optional_string_handles_string() { assert_eq!( json_optional_string(&Value::String("hello".to_string())), Some("hello".to_string()) ); } #[test] fn json_optional_string_handles_non_string() { assert!(json_optional_string(&json!(42)).is_none()); } #[test] fn parse_retry_metadata_returns_default_for_none() { let retry = parse_retry_metadata(None); assert_eq!(retry.attempt, 0); assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS); assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS); } #[test] fn parse_retry_metadata_parses_fields() { let value = json!({ "attempt": 2, "max_attempts": 5, "backoff_base_ms": 1000, "next_backoff_ms": 2000, "next_retry_at": 1234567890i64 }); let retry = parse_retry_metadata(Some(&value)); assert_eq!(retry.attempt, 2); assert_eq!(retry.max_attempts, 5); assert_eq!(retry.backoff_base_ms, 1000); assert_eq!(retry.next_backoff_ms, 2000); assert_eq!(retry.next_retry_at, Some(1234567890)); } #[test] fn parse_history_entry_returns_none_without_status() { let value = json!({"at": 1, "phase": "test"}); assert!(parse_history_entry(&value).is_none()); } #[test] fn parse_history_entry_parses_valid_entry() { let value = json!({ "at": 100, "phase": "running", "status": "running", "progress": 50, "detail": "working", "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500} }); let entry = parse_history_entry(&value).unwrap(); assert_eq!(entry.at, 100); assert_eq!(entry.phase, "running"); assert_eq!(entry.status, JobStatus::Running); assert_eq!(entry.progress, Some(50)); assert_eq!(entry.detail.as_deref(), Some("working")); } }