use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::Result; use deepseek_agent::ModelRegistry; use deepseek_config::{CliRuntimeOverrides, ConfigToml, ProviderKind}; use deepseek_execpolicy::{ AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision, ExecPolicyEngine, }; use deepseek_hooks::{HookDispatcher, HookEvent}; use deepseek_mcp::{ McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus, }; use deepseek_protocol::{ AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus, ToolPayload, }; use deepseek_state::{ JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata, ThreadStatus as PersistedThreadStatus, }; use deepseek_tools::{ToolCall, ToolRegistry}; use serde_json::{Value, json}; use uuid::Uuid; #[derive(Debug, Clone)] pub enum InitialHistory { New, Forked(Vec), Resumed { conversation_id: String, history: Vec, rollout_path: PathBuf, }, } #[derive(Debug, Clone)] pub struct NewThread { pub thread: Thread, pub model: String, pub model_provider: String, pub cwd: PathBuf, pub approval_policy: Option, pub sandbox: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JobStatus { Queued, Running, Paused, Completed, Failed, Cancelled, } const JOB_DETAIL_SCHEMA_VERSION: u8 = 1; const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3; const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500; const MAX_JOB_HISTORY_ENTRIES: usize = 64; #[derive(Debug, Clone)] pub struct JobRetryMetadata { pub attempt: u32, pub max_attempts: u32, pub backoff_base_ms: u64, pub next_backoff_ms: u64, pub next_retry_at: Option, } impl Default for JobRetryMetadata { fn default() -> Self { Self { attempt: 0, max_attempts: DEFAULT_JOB_MAX_ATTEMPTS, backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS, next_backoff_ms: 0, next_retry_at: None, } } } #[derive(Debug, Clone)] pub struct JobHistoryEntry { pub at: i64, pub phase: String, pub status: JobStatus, pub progress: Option, pub detail: Option, pub retry: JobRetryMetadata, } #[derive(Debug, Clone)] struct PersistedJobDetail { pub status: JobStatus, pub detail: Option, pub retry: JobRetryMetadata, pub history: Vec, } #[derive(Debug, Clone)] pub struct JobRecord { pub id: String, pub name: String, pub status: JobStatus, pub progress: Option, pub detail: Option, pub retry: JobRetryMetadata, pub history: Vec, pub created_at: i64, pub updated_at: i64, } #[derive(Debug, Default)] pub struct JobManager { jobs: HashMap, } impl JobManager { fn now_ts() -> i64 { chrono::Utc::now().timestamp() } fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 { if retry.attempt == 0 { return 0; } let exponent = retry.attempt.saturating_sub(1).min(20); let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX); retry.backoff_base_ms.saturating_mul(multiplier) } fn clear_retry_schedule(retry: &mut JobRetryMetadata) { retry.next_backoff_ms = 0; retry.next_retry_at = None; } fn push_history(job: &mut JobRecord, phase: &str) { job.history.push(JobHistoryEntry { at: job.updated_at, phase: phase.to_string(), status: job.status, progress: job.progress, detail: job.detail.clone(), retry: job.retry.clone(), }); if job.history.len() > MAX_JOB_HISTORY_ENTRIES { let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES; job.history.drain(0..to_drain); } } fn parse_persisted_detail(raw: Option<&str>) -> Option { let raw = raw?; let parsed: Value = serde_json::from_str(raw).ok()?; let status = parsed .get("status") .and_then(Value::as_str) .and_then(job_status_from_str)?; let detail = parsed.get("detail").and_then(json_optional_string); let retry = parse_retry_metadata(parsed.get("retry")); let history = parsed .get("history") .and_then(Value::as_array) .map(|items| { items .iter() .filter_map(parse_history_entry) .collect::>() }) .unwrap_or_default(); Some(PersistedJobDetail { status, detail, retry, history, }) } fn encode_persisted_detail(job: &JobRecord) -> Result> { let encoded = json!({ "schema_version": JOB_DETAIL_SCHEMA_VERSION, "status": job_status_to_str(job.status), "detail": job.detail.clone(), "retry": job_retry_to_value(&job.retry), "history": job.history.iter().map(job_history_to_value).collect::>() }) .to_string(); Ok(Some(encoded)) } pub fn enqueue(&mut self, name: impl Into) -> JobRecord { let now = Self::now_ts(); let id = format!("job-{}", Uuid::new_v4()); let mut job = JobRecord { id: id.clone(), name: name.into(), status: JobStatus::Queued, progress: Some(0), detail: None, retry: JobRetryMetadata::default(), history: Vec::new(), created_at: now, updated_at: now, }; Self::push_history(&mut job, "created"); self.jobs.insert(id, job.clone()); job } pub fn set_running(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Running; Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "running"); } } pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.progress = Some(progress.min(100)); job.detail = detail; job.updated_at = Self::now_ts(); Self::push_history(job, "progress_updated"); } } pub fn complete(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Completed; job.progress = Some(100); Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "completed"); } } pub fn fail(&mut self, id: &str, detail: impl Into) { if let Some(job) = self.jobs.get_mut(id) { let now = Self::now_ts(); job.status = JobStatus::Failed; job.detail = Some(detail.into()); if job.retry.attempt < job.retry.max_attempts { job.retry.attempt += 1; job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry); let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000) .min(i64::MAX as u64) as i64; job.retry.next_retry_at = Some(now.saturating_add(delay_secs)); } else { Self::clear_retry_schedule(&mut job.retry); } job.updated_at = now; Self::push_history(job, "failed"); } } pub fn cancel(&mut self, id: &str) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Cancelled; Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "cancelled"); } } pub fn pause(&mut self, id: &str, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Paused; if detail.is_some() { job.detail = detail; } job.updated_at = Self::now_ts(); Self::push_history(job, "paused"); } } pub fn resume(&mut self, id: &str, detail: Option) { if let Some(job) = self.jobs.get_mut(id) { job.status = JobStatus::Running; if detail.is_some() { job.detail = detail; } Self::clear_retry_schedule(&mut job.retry); job.updated_at = Self::now_ts(); Self::push_history(job, "resumed"); } } pub fn list(&self) -> Vec { let mut out = self.jobs.values().cloned().collect::>(); out.sort_by_key(|job| -job.updated_at); out } pub fn history(&self, id: &str) -> Vec { self.jobs .get(id) .map(|job| job.history.clone()) .unwrap_or_default() } pub fn resume_pending(&mut self) -> Vec { let mut resumed = Vec::new(); for job in self.jobs.values_mut() { if matches!(job.status, JobStatus::Queued | JobStatus::Running) { job.status = JobStatus::Queued; job.updated_at = Self::now_ts(); Self::push_history(job, "queued_after_resume"); resumed.push(job.clone()); } } resumed } pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> { let persisted = store.list_jobs(Some(500))?; for job in persisted { let fallback_status = job_state_status_to_runtime(job.status.clone()); let parsed = Self::parse_persisted_detail(job.detail.as_deref()); let (status, detail, retry, history) = if let Some(detail_state) = parsed { ( detail_state.status, detail_state.detail, detail_state.retry, detail_state.history, ) } else { ( fallback_status, job.detail, JobRetryMetadata::default(), Vec::new(), ) }; self.jobs.insert( job.id.clone(), JobRecord { id: job.id, name: job.name, status, progress: job.progress, detail, retry, history, created_at: job.created_at, updated_at: job.updated_at, }, ); } Ok(()) } pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> { let Some(job) = self.jobs.get(id) else { return Ok(()); }; let encoded_detail = Self::encode_persisted_detail(job)?; store.upsert_job(&JobStateRecord { id: job.id.clone(), name: job.name.clone(), status: runtime_status_to_job_state(job.status), progress: job.progress, detail: encoded_detail, created_at: job.created_at, updated_at: job.updated_at, }) } pub fn persist_all(&self, store: &StateStore) -> Result<()> { for id in self.jobs.keys() { self.persist_job(store, id)?; } Ok(()) } } pub struct ThreadManager { store: StateStore, running_threads: HashMap, cli_version: String, } impl ThreadManager { pub fn new(store: StateStore) -> Self { Self { store, running_threads: HashMap::new(), cli_version: env!("CARGO_PKG_VERSION").to_string(), } } pub fn state_store(&self) -> &StateStore { &self.store } pub fn spawn_thread_with_history( &mut self, model_provider: String, cwd: PathBuf, initial_history: InitialHistory, persist_extended_history: bool, ) -> Result { let id = format!("thread-{}", Uuid::new_v4()); let now = chrono::Utc::now().timestamp(); let preview = preview_from_initial_history(&initial_history); let source = match initial_history { InitialHistory::New => SessionSource::Interactive, InitialHistory::Forked(_) => SessionSource::Fork, InitialHistory::Resumed { .. } => SessionSource::Resume, }; let thread = Thread { id: id.clone(), preview, ephemeral: !persist_extended_history, model_provider: model_provider.clone(), created_at: now, updated_at: now, status: ThreadStatus::Running, path: None, cwd: cwd.clone(), cli_version: self.cli_version.clone(), source: match source { SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive, SessionSource::Resume => deepseek_protocol::SessionSource::Resume, SessionSource::Fork => deepseek_protocol::SessionSource::Fork, SessionSource::Api => deepseek_protocol::SessionSource::Api, SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown, }, name: None, }; self.persist_thread(&thread, None)?; match &initial_history { InitialHistory::Forked(items) => { for item in items { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } InitialHistory::Resumed { history, .. } => { for item in history { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } InitialHistory::New => {} } self.running_threads .insert(thread.id.clone(), thread.clone()); Ok(NewThread { thread, model: "auto".to_string(), model_provider, cwd, approval_policy: None, sandbox: None, }) } pub fn resume_thread_with_history( &mut self, params: &ThreadResumeParams, fallback_cwd: &Path, model_provider: String, ) -> Result> { if params.history.is_none() && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned() { return Ok(Some(NewThread { model: params.model.clone().unwrap_or_else(|| "auto".to_string()), model_provider: params.model_provider.clone().unwrap_or(model_provider), cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()), approval_policy: params.approval_policy.clone(), sandbox: params.sandbox.clone(), thread, })); } let persisted = self.store.get_thread(¶ms.thread_id)?; let Some(metadata) = persisted else { return Ok(None); }; let mut thread = to_protocol_thread(metadata); thread.status = ThreadStatus::Running; thread.updated_at = chrono::Utc::now().timestamp(); thread.cwd = params .cwd .clone() .unwrap_or_else(|| fallback_cwd.to_path_buf()); self.persist_thread(&thread, None)?; self.running_threads .insert(thread.id.clone(), thread.clone()); if let Some(history) = params.history.as_ref() { for item in history { self.store.append_message( &thread.id, "history", &item.to_string(), Some(item.clone()), )?; } } Ok(Some(NewThread { model: params.model.clone().unwrap_or_else(|| "auto".to_string()), model_provider: params.model_provider.clone().unwrap_or(model_provider), cwd: thread.cwd.clone(), approval_policy: params.approval_policy.clone(), sandbox: params.sandbox.clone(), thread, })) } pub fn fork_thread( &mut self, params: &ThreadForkParams, fallback_cwd: &Path, ) -> Result> { let parent = self.store.get_thread(¶ms.thread_id)?; let Some(parent) = parent else { return Ok(None); }; let parent_thread = to_protocol_thread(parent); let new = self.spawn_thread_with_history( params .model_provider .clone() .unwrap_or_else(|| parent_thread.model_provider.clone()), params .cwd .clone() .unwrap_or_else(|| fallback_cwd.to_path_buf()), InitialHistory::Forked(vec![json!({ "type": "fork", "from_thread_id": parent_thread.id })]), params.persist_extended_history, )?; Ok(Some(new)) } pub fn list_threads(&self, params: &ThreadListParams) -> Result> { let list = self.store.list_threads(ThreadListFilters { include_archived: params.include_archived, limit: params.limit, })?; Ok(list.into_iter().map(to_protocol_thread).collect()) } pub fn read_thread(&self, params: &ThreadReadParams) -> Result> { Ok(self .store .get_thread(¶ms.thread_id)? .map(to_protocol_thread)) } pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result> { let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else { return Ok(None); }; metadata.name = Some(params.name.clone()); metadata.updated_at = chrono::Utc::now().timestamp(); self.store.upsert_thread(&metadata)?; let updated = to_protocol_thread(metadata); self.running_threads .insert(updated.id.clone(), updated.clone()); Ok(Some(updated)) } pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> { self.store.mark_archived(thread_id)?; if let Some(thread) = self.running_threads.get_mut(thread_id) { thread.status = ThreadStatus::Archived; } Ok(()) } pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> { self.store.mark_unarchived(thread_id)?; Ok(()) } pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> { let Some(mut metadata) = self.store.get_thread(thread_id)? else { return Ok(()); }; metadata.updated_at = chrono::Utc::now().timestamp(); metadata.preview = truncate_preview(input); metadata.status = PersistedThreadStatus::Running; self.store.upsert_thread(&metadata)?; if let Some(thread) = self.running_threads.get_mut(thread_id) { thread.updated_at = metadata.updated_at; thread.preview = metadata.preview; thread.status = ThreadStatus::Running; } let message_id = self.store.append_message(thread_id, "user", input, None)?; self.store.save_checkpoint( thread_id, "latest", &json!({ "reason": "thread_message", "message_id": message_id, "role": "user", "preview": truncate_preview(input), "updated_at": metadata.updated_at }), )?; Ok(()) } fn persist_thread(&self, thread: &Thread, rollout_path: Option) -> Result<()> { self.store.upsert_thread(&ThreadMetadata { id: thread.id.clone(), rollout_path, preview: thread.preview.clone(), ephemeral: thread.ephemeral, model_provider: thread.model_provider.clone(), created_at: thread.created_at, updated_at: thread.updated_at, status: to_persisted_status(&thread.status), path: thread.path.clone(), cwd: thread.cwd.clone(), cli_version: thread.cli_version.clone(), source: to_persisted_source(&thread.source), name: thread.name.clone(), sandbox_policy: None, approval_mode: None, archived: matches!(thread.status, ThreadStatus::Archived), archived_at: None, git_sha: None, git_branch: None, git_origin_url: None, memory_mode: None, }) } } pub struct Runtime { pub config: ConfigToml, pub model_registry: ModelRegistry, pub thread_manager: ThreadManager, pub tool_registry: Arc, pub mcp_manager: Arc, pub exec_policy: ExecPolicyEngine, pub hooks: HookDispatcher, pub jobs: JobManager, } impl Runtime { pub fn new( config: ConfigToml, model_registry: ModelRegistry, state: StateStore, tool_registry: Arc, mcp_manager: Arc, exec_policy: ExecPolicyEngine, hooks: HookDispatcher, ) -> Self { let mut jobs = JobManager::default(); let _ = jobs.load_from_store(&state); Self { config, model_registry, thread_manager: ThreadManager::new(state), tool_registry, mcp_manager, exec_policy, hooks, jobs, } } fn persisted_thread_data(&self, thread_id: &str) -> Result { let history = self .thread_manager .state_store() .list_messages(thread_id, Some(500))? .into_iter() .map(|message| { json!({ "id": message.id, "role": message.role, "content": message.content, "item": message.item, "created_at": message.created_at }) }) .collect::>(); let checkpoint = self .thread_manager .state_store() .load_checkpoint(thread_id, None)? .map(|record| { json!({ "checkpoint_id": record.checkpoint_id, "state": record.state, "created_at": record.created_at }) }); Ok(json!({ "history": history, "checkpoint": checkpoint })) } fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> { self.thread_manager.state_store().save_checkpoint( thread_id, "latest", &json!({ "reason": reason, "saved_at": chrono::Utc::now().timestamp(), "state": state }), ) } pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result { match req { ThreadRequest::Create { .. } => { let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); let new = self.thread_manager.spawn_thread_with_history( "deepseek".to_string(), cwd, InitialHistory::New, false, )?; let mut response = thread_response_from_new("created", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } ThreadRequest::Start(params) => { let cwd = params.cwd.clone().unwrap_or_else(|| { std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) }); let new = self.thread_manager.spawn_thread_with_history( params .model_provider .clone() .unwrap_or_else(|| "deepseek".to_string()), cwd, InitialHistory::New, params.persist_extended_history, )?; let mut response = thread_response_from_new("started", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } ThreadRequest::Resume(params) => { let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); if let Some(new) = self.thread_manager.resume_thread_with_history( ¶ms, &fallback_cwd, "deepseek".to_string(), )? { let mut response = thread_response_from_new("resumed", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } else { Ok(ThreadResponse { thread_id: params.thread_id, status: "missing".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: params.approval_policy, sandbox: params.sandbox, events: Vec::new(), data: json!({"error":"thread not found"}), }) } } ThreadRequest::Fork(params) => { let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? { let mut response = thread_response_from_new("forked", new); response.data = self.persisted_thread_data(&response.thread_id)?; Ok(response) } else { Ok(ThreadResponse { thread_id: params.thread_id, status: "missing".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: params.approval_policy, sandbox: params.sandbox, events: Vec::new(), data: json!({"error":"thread not found"}), }) } } ThreadRequest::List(params) => Ok(ThreadResponse { thread_id: "list".to_string(), status: "ok".to_string(), thread: None, threads: self.thread_manager.list_threads(¶ms)?, model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }), ThreadRequest::Read(params) => { let id = params.thread_id.clone(); let data = self.persisted_thread_data(&id)?; Ok(ThreadResponse { thread_id: id, status: "ok".to_string(), thread: self.thread_manager.read_thread(¶ms)?, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data, }) } ThreadRequest::SetName(params) => Ok(ThreadResponse { thread_id: params.thread_id.clone(), status: "ok".to_string(), thread: self.thread_manager.set_thread_name(¶ms)?, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }), ThreadRequest::Archive { thread_id } => { self.thread_manager.archive_thread(&thread_id)?; Ok(ThreadResponse { thread_id, status: "archived".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }) } ThreadRequest::Unarchive { thread_id } => { self.thread_manager.unarchive_thread(&thread_id)?; Ok(ThreadResponse { thread_id, status: "unarchived".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }) } ThreadRequest::Message { thread_id, input } => { self.thread_manager.touch_message(&thread_id, &input)?; let response_id = format!("{thread_id}:{}", input.len()); self.hooks .emit(HookEvent::ResponseStart { response_id: response_id.clone(), }) .await; self.hooks .emit(HookEvent::ResponseEnd { response_id: response_id.clone(), }) .await; Ok(ThreadResponse { thread_id, status: "accepted".to_string(), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: vec![ EventFrame::ResponseStart { response_id: response_id.clone(), }, EventFrame::ResponseDelta { response_id: response_id.clone(), delta: "queued".to_string(), }, EventFrame::ResponseEnd { response_id }, ], data: json!({}), }) } } } pub async fn handle_prompt( &mut self, req: PromptRequest, cli_overrides: &CliRuntimeOverrides, ) -> Result { let resolved = self.config.resolve_runtime_options(cli_overrides); let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone()); let selection = self .model_registry .resolve(Some(&requested_model), Some(resolved.provider)); let resolved_model = selection.resolved.id.clone(); let response_id = format!("resp-{}", Uuid::new_v4()); self.hooks .emit(HookEvent::ResponseStart { response_id: response_id.clone(), }) .await; self.hooks .emit(HookEvent::ResponseDelta { response_id: response_id.clone(), delta: "model-selected".to_string(), }) .await; self.hooks .emit(HookEvent::ResponseEnd { response_id: response_id.clone(), }) .await; let payload = json!({ "provider": resolved.provider.as_str(), "model": resolved_model.clone(), "prompt": req.prompt, "telemetry": resolved.telemetry, "base_url": resolved.base_url, "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()), "approval_policy": resolved.approval_policy, "sandbox_mode": resolved.sandbox_mode }); if let Some(thread_id) = req.thread_id.as_ref() { self.thread_manager.touch_message(thread_id, &req.prompt)?; let assistant_message_id = self.thread_manager.store.append_message( thread_id, "assistant", &payload.to_string(), Some(payload.clone()), )?; self.persist_latest_checkpoint( thread_id, "prompt_response", json!({ "response_id": response_id.clone(), "model": resolved_model.clone(), "provider": resolved.provider.as_str(), "assistant_message_id": assistant_message_id }), )?; } Ok(PromptResponse { output: payload.to_string(), model: resolved_model, events: vec![ EventFrame::ResponseStart { response_id: response_id.clone(), }, EventFrame::ResponseDelta { response_id: response_id.clone(), delta: "model-selected".to_string(), }, EventFrame::ResponseEnd { response_id }, ], }) } pub async fn invoke_tool( &self, call: ToolCall, approval_mode: AskForApproval, cwd: &Path, ) -> Result { let fallback_cwd = cwd.display().to_string(); let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd); let decision = self.exec_policy.check(ExecPolicyContext { command: &command, cwd: &policy_cwd, ask_for_approval: approval_mode, sandbox_mode: None, })?; let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind); let response_id = format!("tool-{}", Uuid::new_v4()); let call_id = call .raw_tool_call_id .clone() .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4())); self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name.clone(), phase: "precheck".to_string(), payload: precheck.clone(), }) .await; if !decision.allow { let reason = decision.reason().to_string(); let approval_id = format!("approval-{}", Uuid::new_v4()); let error_frame = EventFrame::Error { response_id: response_id.clone(), message: reason.clone(), }; self.hooks .emit(HookEvent::ApprovalLifecycle { approval_id, phase: "denied".to_string(), reason: Some(reason.clone()), }) .await; self.hooks .emit(HookEvent::GenericEventFrame { frame: error_frame.clone(), }) .await; return Ok(json!({ "ok": false, "status": "denied", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "error": reason, "events": [event_frame_payload(&error_frame)], })); } if decision.requires_approval { let approval_id = format!("approval-{}", Uuid::new_v4()); let reason = decision.reason().to_string(); let maybe_approval_frame = approval_request_frame( &decision.requirement, call_id, approval_id.clone(), response_id.clone(), command.clone(), policy_cwd.clone(), ); self.hooks .emit(HookEvent::ApprovalLifecycle { approval_id: approval_id.clone(), phase: "requested".to_string(), reason: Some(reason.clone()), }) .await; let mut events = Vec::new(); if let Some(frame) = maybe_approval_frame { self.hooks .emit(HookEvent::GenericEventFrame { frame: frame.clone(), }) .await; events.push(event_frame_payload(&frame)); } return Ok(json!({ "ok": false, "status": "approval_required", "execution_kind": execution_kind, "response_id": response_id, "approval_id": approval_id, "precheck": precheck, "error": reason, "events": events, })); } let start_frame = EventFrame::ToolCallStart { response_id: response_id.clone(), tool_name: call.name.clone(), arguments: tool_payload_value(&call.payload), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: start_frame.clone(), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name.clone(), phase: "dispatching".to_string(), payload: json!({ "call_id": call_id, "execution_kind": execution_kind }), }) .await; match self.tool_registry.dispatch(call.clone(), true).await { Ok(tool_output) => { let result_frame = EventFrame::ToolCallResult { response_id: response_id.clone(), tool_name: call.name.clone(), output: tool_output_value(&tool_output), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: result_frame.clone(), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name, phase: "completed".to_string(), payload: json!({ "ok": true }), }) .await; Ok(json!({ "ok": true, "status": "completed", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "output": tool_output, "events": [ event_frame_payload(&start_frame), event_frame_payload(&result_frame) ] })) } Err(err) => { let message = format!("{err:?}"); let error_frame = EventFrame::Error { response_id: response_id.clone(), message: message.clone(), }; self.hooks .emit(HookEvent::GenericEventFrame { frame: error_frame.clone(), }) .await; self.hooks .emit(HookEvent::ToolLifecycle { response_id: response_id.clone(), tool_name: call.name, phase: "failed".to_string(), payload: json!({ "error": message.clone() }), }) .await; Ok(json!({ "ok": false, "status": "failed", "execution_kind": execution_kind, "response_id": response_id, "precheck": precheck, "error": message, "events": [ event_frame_payload(&start_frame), event_frame_payload(&error_frame) ] })) } } } pub async fn mcp_startup(&self) -> McpStartupCompleteEvent { let mut updates = Vec::new(); let summary = self.mcp_manager.start_all(|update| { updates.push(update); }); for update in updates { let status = match update.status { McpManagerStartupStatus::Starting => deepseek_protocol::McpStartupStatus::Starting, McpManagerStartupStatus::Ready => deepseek_protocol::McpStartupStatus::Ready, McpManagerStartupStatus::Failed { error } => { deepseek_protocol::McpStartupStatus::Failed { error } } McpManagerStartupStatus::Cancelled => { deepseek_protocol::McpStartupStatus::Cancelled } }; self.hooks .emit(HookEvent::GenericEventFrame { frame: EventFrame::McpStartupUpdate { update: deepseek_protocol::McpStartupUpdateEvent { server_name: update.server_name, status, }, }, }) .await; } self.hooks .emit(HookEvent::GenericEventFrame { frame: EventFrame::McpStartupComplete { summary: deepseek_protocol::McpStartupCompleteEvent { ready: summary.ready.clone(), failed: summary .failed .iter() .map(|f| deepseek_protocol::McpStartupFailure { server_name: f.server_name.clone(), error: f.error.clone(), }) .collect(), cancelled: summary.cancelled.clone(), }, }, }) .await; summary } pub fn app_status(&self) -> AppResponse { let jobs = self.jobs.list(); let events = jobs .iter() .flat_map(|job| { job.history.iter().map(|entry| EventFrame::ResponseDelta { response_id: job.id.clone(), delta: json!({ "kind": "job_transition", "job_id": job.id.clone(), "phase": entry.phase.clone(), "status": job_status_to_str(entry.status), "progress": entry.progress, "detail": entry.detail.clone(), "retry": job_retry_to_value(&entry.retry), "at": entry.at }) .to_string(), }) }) .collect::>(); AppResponse { ok: true, data: json!({ "jobs": jobs.into_iter().map(|job| { json!({ "id": job.id, "name": job.name, "status": job_status_to_str(job.status), "progress": job.progress, "detail": job.detail, "retry": job_retry_to_value(&job.retry), "history": job.history.iter().map(job_history_to_value).collect::>() }) }).collect::>() }), events, } } pub fn provider_default(&self) -> ProviderKind { self.config.provider } pub fn save_thread_checkpoint( &self, thread_id: &str, checkpoint_id: &str, state: &Value, ) -> Result<()> { self.thread_manager .state_store() .save_checkpoint(thread_id, checkpoint_id, state) } pub fn load_thread_checkpoint( &self, thread_id: &str, checkpoint_id: Option<&str>, ) -> Result> { Ok(self .thread_manager .state_store() .load_checkpoint(thread_id, checkpoint_id)? .map(|checkpoint| checkpoint.state)) } pub fn enqueue_job(&mut self, name: impl Into) -> Result { let job = self.jobs.enqueue(name); self.jobs .persist_job(self.thread_manager.state_store(), &job.id)?; Ok(job) } pub fn set_job_running(&mut self, job_id: &str) -> Result<()> { self.jobs.set_running(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn update_job_progress( &mut self, job_id: &str, progress: u8, detail: Option, ) -> Result<()> { self.jobs.update_progress(job_id, progress, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn complete_job(&mut self, job_id: &str) -> Result<()> { self.jobs.complete(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn fail_job(&mut self, job_id: &str, detail: impl Into) -> Result<()> { self.jobs.fail(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn cancel_job(&mut self, job_id: &str) -> Result<()> { self.jobs.cancel(job_id); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn pause_job(&mut self, job_id: &str, detail: Option) -> Result<()> { self.jobs.pause(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn resume_job(&mut self, job_id: &str, detail: Option) -> Result<()> { self.jobs.resume(job_id, detail); self.jobs .persist_job(self.thread_manager.state_store(), job_id) } pub fn job_history(&self, job_id: &str) -> Vec { self.jobs.history(job_id) } } fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse { ThreadResponse { thread_id: new.thread.id.clone(), status: status.to_string(), thread: Some(new.thread), threads: Vec::new(), model: Some(new.model), model_provider: Some(new.model_provider), cwd: Some(new.cwd), approval_policy: new.approval_policy, sandbox: new.sandbox, events: Vec::new(), data: json!({}), } } fn preview_from_initial_history(initial_history: &InitialHistory) -> String { match initial_history { InitialHistory::New => "New conversation".to_string(), InitialHistory::Forked(items) => truncate_preview( &items .first() .map(Value::to_string) .unwrap_or_else(|| "Forked conversation".to_string()), ), InitialHistory::Resumed { history, .. } => truncate_preview( &history .first() .map(Value::to_string) .unwrap_or_else(|| "Resumed conversation".to_string()), ), } } fn truncate_preview(value: &str) -> String { value.chars().take(120).collect() } fn to_protocol_thread(thread: ThreadMetadata) -> Thread { Thread { id: thread.id, preview: thread.preview, ephemeral: thread.ephemeral, model_provider: thread.model_provider, created_at: thread.created_at, updated_at: thread.updated_at, status: match thread.status { PersistedThreadStatus::Running => ThreadStatus::Running, PersistedThreadStatus::Idle => ThreadStatus::Idle, PersistedThreadStatus::Completed => ThreadStatus::Completed, PersistedThreadStatus::Failed => ThreadStatus::Failed, PersistedThreadStatus::Paused => ThreadStatus::Paused, PersistedThreadStatus::Archived => ThreadStatus::Archived, }, path: thread.path, cwd: thread.cwd, cli_version: thread.cli_version, source: match thread.source { SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive, SessionSource::Resume => deepseek_protocol::SessionSource::Resume, SessionSource::Fork => deepseek_protocol::SessionSource::Fork, SessionSource::Api => deepseek_protocol::SessionSource::Api, SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown, }, name: thread.name, } } fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus { match status { ThreadStatus::Running => PersistedThreadStatus::Running, ThreadStatus::Idle => PersistedThreadStatus::Idle, ThreadStatus::Completed => PersistedThreadStatus::Completed, ThreadStatus::Failed => PersistedThreadStatus::Failed, ThreadStatus::Paused => PersistedThreadStatus::Paused, ThreadStatus::Archived => PersistedThreadStatus::Archived, } } fn to_persisted_source(source: &deepseek_protocol::SessionSource) -> SessionSource { match source { deepseek_protocol::SessionSource::Interactive => SessionSource::Interactive, deepseek_protocol::SessionSource::Resume => SessionSource::Resume, deepseek_protocol::SessionSource::Fork => SessionSource::Fork, deepseek_protocol::SessionSource::Api => SessionSource::Api, deepseek_protocol::SessionSource::Unknown => SessionSource::Unknown, } } fn approval_request_frame( requirement: &ExecApprovalRequirement, call_id: String, approval_id: String, turn_id: String, command: String, cwd: String, ) -> Option { let ExecApprovalRequirement::NeedsApproval { reason, proposed_execpolicy_amendment, proposed_network_policy_amendments, } = requirement else { return None; }; let mut available_decisions = vec![ ReviewDecision::Approved, ReviewDecision::ApprovedForSession, ReviewDecision::Denied, ReviewDecision::Abort, ]; if proposed_execpolicy_amendment .as_ref() .is_some_and(|amendment| !amendment.prefixes.is_empty()) { available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment); } available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map( |amendment| ReviewDecision::NetworkPolicyAmendment { host: amendment.host, action: amendment.action, }, )); Some(EventFrame::ExecApprovalRequest { request: ExecApprovalRequestEvent { call_id, approval_id, turn_id, command, cwd, reason: reason.clone(), network_approval_context: None, proposed_execpolicy_amendment: proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default(), proposed_network_policy_amendments: proposed_network_policy_amendments.clone(), additional_permissions: Vec::new(), available_decisions, }, }) } fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value { match requirement { ExecApprovalRequirement::Skip { bypass_sandbox, proposed_execpolicy_amendment, } => json!({ "type": "skip", "bypass_sandbox": bypass_sandbox, "reason": requirement.reason(), "proposed_execpolicy_amendment": proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default() }), ExecApprovalRequirement::NeedsApproval { reason, proposed_execpolicy_amendment, proposed_network_policy_amendments, } => json!({ "type": "needs_approval", "reason": reason, "proposed_execpolicy_amendment": proposed_execpolicy_amendment .as_ref() .map(|amendment| amendment.prefixes.clone()) .unwrap_or_default(), "proposed_network_policy_amendments": proposed_network_policy_amendments }), ExecApprovalRequirement::Forbidden { reason } => json!({ "type": "forbidden", "reason": reason }), } } fn policy_precheck_payload( decision: &ExecPolicyDecision, command: &str, cwd: &str, execution_kind: &str, ) -> Value { json!({ "execution_kind": execution_kind, "command": command, "cwd": cwd, "allow": decision.allow, "requires_approval": decision.requires_approval, "matched_rule": decision.matched_rule.clone(), "phase": decision.requirement.phase(), "reason": decision.reason(), "requirement": approval_requirement_payload(&decision.requirement) }) } fn tool_payload_value(payload: &ToolPayload) -> Value { serde_json::to_value(payload).unwrap_or_else( |_| json!({"type":"serialization_error","message":"tool payload unavailable"}), ) } fn tool_output_value(output: &deepseek_protocol::ToolOutput) -> Value { serde_json::to_value(output).unwrap_or_else( |_| json!({"type":"serialization_error","message":"tool output unavailable"}), ) } fn event_frame_payload(frame: &EventFrame) -> Value { serde_json::to_value(frame) .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"})) } fn json_optional_string(value: &Value) -> Option { if value.is_null() { None } else { value.as_str().map(ToString::to_string) } } fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata { let Some(value) = value else { return JobRetryMetadata::default(); }; JobRetryMetadata { attempt: value .get("attempt") .and_then(Value::as_u64) .unwrap_or(0) .min(u32::MAX as u64) as u32, max_attempts: value .get("max_attempts") .and_then(Value::as_u64) .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64) .min(u32::MAX as u64) as u32, backoff_base_ms: value .get("backoff_base_ms") .and_then(Value::as_u64) .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS), next_backoff_ms: value .get("next_backoff_ms") .and_then(Value::as_u64) .unwrap_or(0), next_retry_at: value.get("next_retry_at").and_then(Value::as_i64), } } fn parse_history_entry(value: &Value) -> Option { let status = value .get("status") .and_then(Value::as_str) .and_then(job_status_from_str)?; Some(JobHistoryEntry { at: value.get("at").and_then(Value::as_i64).unwrap_or(0), phase: value .get("phase") .and_then(Value::as_str) .unwrap_or("unknown") .to_string(), status, progress: value .get("progress") .and_then(Value::as_u64) .map(|v| v.min(u8::MAX as u64) as u8), detail: value.get("detail").and_then(json_optional_string), retry: parse_retry_metadata(value.get("retry")), }) } fn job_status_to_str(status: JobStatus) -> &'static str { match status { JobStatus::Queued => "queued", JobStatus::Running => "running", JobStatus::Paused => "paused", JobStatus::Completed => "completed", JobStatus::Failed => "failed", JobStatus::Cancelled => "cancelled", } } fn job_status_from_str(value: &str) -> Option { match value { "queued" => Some(JobStatus::Queued), "running" => Some(JobStatus::Running), "paused" => Some(JobStatus::Paused), "completed" => Some(JobStatus::Completed), "failed" => Some(JobStatus::Failed), "cancelled" => Some(JobStatus::Cancelled), _ => None, } } fn job_retry_to_value(retry: &JobRetryMetadata) -> Value { json!({ "attempt": retry.attempt, "max_attempts": retry.max_attempts, "backoff_base_ms": retry.backoff_base_ms, "next_backoff_ms": retry.next_backoff_ms, "next_retry_at": retry.next_retry_at }) } fn job_history_to_value(entry: &JobHistoryEntry) -> Value { json!({ "at": entry.at, "phase": entry.phase.clone(), "status": job_status_to_str(entry.status), "progress": entry.progress, "detail": entry.detail.clone(), "retry": job_retry_to_value(&entry.retry) }) } fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus { match status { JobStatus::Queued => JobStateStatus::Queued, JobStatus::Running => JobStateStatus::Running, JobStatus::Paused => JobStateStatus::Running, JobStatus::Completed => JobStateStatus::Completed, JobStatus::Failed => JobStateStatus::Failed, JobStatus::Cancelled => JobStateStatus::Cancelled, } } fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus { match status { JobStateStatus::Queued => JobStatus::Queued, JobStateStatus::Running => JobStatus::Running, JobStateStatus::Completed => JobStatus::Completed, JobStateStatus::Failed => JobStatus::Failed, JobStateStatus::Cancelled => JobStatus::Cancelled, } }