use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use axum::extract::State; use axum::routing::{get, post}; use axum::{Json, Router}; use codewhale_agent::ModelRegistry; use codewhale_config::{CliRuntimeOverrides, ConfigStore}; use codewhale_core::Runtime; use codewhale_execpolicy::ExecPolicyEngine; use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink}; use codewhale_mcp::McpManager; use codewhale_protocol::{ AppRequest, AppResponse, PromptRequest, PromptResponse, ThreadRequest, ThreadResponse, }; use codewhale_state::StateStore; use codewhale_tools::{ToolCall, ToolRegistry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{Mutex, RwLock}; use tower_http::cors::CorsLayer; #[derive(Debug, Clone)] pub struct AppServerOptions { pub listen: SocketAddr, pub config_path: Option, } #[derive(Clone)] struct AppState { config_path: Option, config: Arc>, runtime: Arc>, registry: ModelRegistry, } #[derive(Debug, Clone, Serialize, Deserialize)] struct ToolCallRequest { call: ToolCall, #[serde(default)] cwd: Option, } #[derive(Debug, Deserialize)] struct JsonRpcRequest { #[serde(default)] jsonrpc: Option, #[serde(default)] id: Option, method: String, #[serde(default)] params: Value, } #[derive(Debug)] struct JsonRpcError { code: i64, message: String, data: Option, } #[derive(Debug)] struct StdioDispatchResult { result: Value, should_exit: bool, } #[derive(Debug, Deserialize)] struct ConfigGetParams { key: String, } #[derive(Debug, Deserialize)] struct ConfigSetParams { key: String, value: String, } #[derive(Debug, Deserialize)] struct ThreadIdParams { thread_id: String, } #[derive(Debug, Deserialize)] struct ThreadMessageParams { thread_id: String, input: String, } pub async fn run(options: AppServerOptions) -> Result<()> { let state = build_state(options.config_path.clone())?; let app = Router::new() .route("/healthz", get(healthz)) .route("/thread", post(thread_handler)) .route("/app", post(app_handler)) .route("/prompt", post(prompt_handler)) .route("/tool", post(tool_handler)) .route("/jobs", get(jobs_handler)) .route("/mcp/startup", post(mcp_startup_handler)) .layer(CorsLayer::permissive()) .with_state(state); let listener = tokio::net::TcpListener::bind(options.listen).await?; axum::serve(listener, app).await?; Ok(()) } pub async fn run_stdio(config_path: Option) -> Result<()> { let state = build_state(config_path)?; let stdin = tokio::io::stdin(); let stdout = tokio::io::stdout(); let mut reader = BufReader::new(stdin).lines(); let mut writer = tokio::io::BufWriter::new(stdout); while let Some(line) = reader.next_line().await? { if line.trim().is_empty() { continue; } let request: JsonRpcRequest = match serde_json::from_str(&line) { Ok(value) => value, Err(err) => { let response = jsonrpc_error( None, JsonRpcError::parse_error(format!("invalid json: {err}")), ); writer.write_all(response.to_string().as_bytes()).await?; writer.write_all(b"\n").await?; writer.flush().await?; continue; } }; if request .jsonrpc .as_deref() .is_some_and(|version| version != "2.0") { let response = jsonrpc_error( request.id, JsonRpcError::invalid_request("jsonrpc version must be 2.0"), ); writer.write_all(response.to_string().as_bytes()).await?; writer.write_all(b"\n").await?; writer.flush().await?; continue; } let response = match dispatch_stdio_request(&state, &request.method, request.params).await { Ok(dispatch) => { let encoded = jsonrpc_result(request.id, dispatch.result); writer.write_all(encoded.to_string().as_bytes()).await?; writer.write_all(b"\n").await?; writer.flush().await?; if dispatch.should_exit { break; } continue; } Err(err) => jsonrpc_error(request.id, err), }; writer.write_all(response.to_string().as_bytes()).await?; writer.write_all(b"\n").await?; writer.flush().await?; } Ok(()) } async fn healthz() -> Json { Json(json!({ "status": "ok", "protocol": "v2", "service": "deepseek-app-server" })) } async fn thread_handler( State(state): State, Json(req): Json, ) -> Json { let mut runtime = state.runtime.lock().await; match runtime.handle_thread(req).await { Ok(res) => Json(res), Err(err) => Json(ThreadResponse { thread_id: "error".to_string(), status: format!("error:{err}"), thread: None, threads: Vec::new(), model: None, model_provider: None, cwd: None, approval_policy: None, sandbox: None, events: Vec::new(), data: json!({}), }), } } async fn prompt_handler( State(state): State, Json(req): Json, ) -> Json { let mut runtime = state.runtime.lock().await; let overrides = CliRuntimeOverrides::default(); match runtime.handle_prompt(req, &overrides).await { Ok(res) => Json(res), Err(err) => Json(PromptResponse { output: err.to_string(), model: "unknown".to_string(), events: Vec::new(), }), } } async fn tool_handler( State(state): State, Json(req): Json, ) -> Json { let runtime = state.runtime.lock().await; let cwd = req .cwd .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))); match runtime .invoke_tool( req.call, codewhale_execpolicy::AskForApproval::OnRequest, &cwd, ) .await { Ok(value) => Json(value), Err(err) => Json(json!({ "ok": false, "error": err.to_string() })), } } async fn jobs_handler(State(state): State) -> Json { let runtime = state.runtime.lock().await; Json(runtime.app_status()) } async fn mcp_startup_handler(State(state): State) -> Json { let runtime = state.runtime.lock().await; let summary = runtime.mcp_startup().await; Json(json!({ "ok": true, "summary": summary })) } async fn app_handler( State(state): State, Json(req): Json, ) -> Json { Json(process_app_request(&state, req).await) } fn build_state(config_path: Option) -> Result { let store = ConfigStore::load(config_path.clone())?; let config = store.config.clone(); let registry = ModelRegistry::default(); let state_db_path = config_path .as_ref() .and_then(|p| p.parent().map(|parent| parent.join("state.db"))); let state_store = StateStore::open(state_db_path)?; let mut hooks = HookDispatcher::default(); hooks.add_sink(Arc::new(StdoutHookSink)); let hook_log_path = config_path .as_ref() .and_then(|p| p.parent().map(|parent| parent.join("events.jsonl"))) .unwrap_or_else(|| PathBuf::from(".deepseek/events.jsonl")); hooks.add_sink(Arc::new(JsonlHookSink::new(hook_log_path))); let runtime = Runtime::new( config.clone(), registry.clone(), state_store, Arc::new(ToolRegistry::default()), Arc::new(McpManager::default()), ExecPolicyEngine::new(Vec::new(), Vec::new()), hooks, ); Ok(AppState { config_path, config: Arc::new(RwLock::new(config)), runtime: Arc::new(Mutex::new(runtime)), registry, }) } fn params_or_object(params: Value) -> Value { if params.is_null() { json!({}) } else { params } } fn parse_params(params: Value) -> std::result::Result { serde_json::from_value(params).map_err(|err| JsonRpcError::invalid_params(err.to_string())) } fn jsonrpc_result(id: Option, result: Value) -> Value { json!({ "jsonrpc": "2.0", "id": id.unwrap_or(Value::Null), "result": result }) } fn jsonrpc_error(id: Option, err: JsonRpcError) -> Value { json!({ "jsonrpc": "2.0", "id": id.unwrap_or(Value::Null), "error": { "code": err.code, "message": err.message, "data": err.data } }) } impl JsonRpcError { fn parse_error(message: impl Into) -> Self { Self { code: -32700, message: message.into(), data: None, } } fn invalid_request(message: impl Into) -> Self { Self { code: -32600, message: message.into(), data: None, } } fn method_not_found(method: &str) -> Self { Self { code: -32601, message: format!("unsupported method: {method}"), data: None, } } fn invalid_params(message: impl Into) -> Self { Self { code: -32602, message: message.into(), data: None, } } fn internal(message: impl Into) -> Self { Self { code: -32603, message: message.into(), data: None, } } } async fn handle_thread_request( state: &AppState, req: ThreadRequest, ) -> std::result::Result { let mut runtime = state.runtime.lock().await; runtime .handle_thread(req) .await .map_err(|err| JsonRpcError::internal(err.to_string())) } async fn handle_prompt_request( state: &AppState, req: PromptRequest, ) -> std::result::Result { let mut runtime = state.runtime.lock().await; runtime .handle_prompt(req, &CliRuntimeOverrides::default()) .await .map_err(|err| JsonRpcError::internal(err.to_string())) } async fn dispatch_stdio_request( state: &AppState, method: &str, params: Value, ) -> std::result::Result { let outcome = match method { "healthz" | "app/healthz" => StdioDispatchResult { result: json!({ "status": "ok", "service": "deepseek-app-server", "transport": "stdio" }), should_exit: false, }, "capabilities" => StdioDispatchResult { result: json!({ "transport": "stdio", "families": ["thread/*", "app/*", "prompt/*"], "methods": [ "healthz", "thread/capabilities", "thread/request", "thread/create", "thread/start", "thread/resume", "thread/fork", "thread/list", "thread/read", "thread/set_name", "thread/archive", "thread/unarchive", "thread/message", "app/capabilities", "app/request", "app/config/get", "app/config/set", "app/config/unset", "app/config/list", "app/models", "app/thread_loaded_list", "prompt/capabilities", "prompt/request", "prompt/run", "shutdown" ] }), should_exit: false, }, "thread/capabilities" => StdioDispatchResult { result: json!({ "methods": [ "thread/request", "thread/create", "thread/start", "thread/resume", "thread/fork", "thread/list", "thread/read", "thread/set_name", "thread/archive", "thread/unarchive", "thread/message" ] }), should_exit: false, }, "thread/request" => { let request: ThreadRequest = parse_params(params)?; let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/create" => { #[derive(Debug, Deserialize)] struct CreateParams { #[serde(default)] metadata: Value, } let parsed: CreateParams = parse_params(params_or_object(params))?; let response = handle_thread_request( state, ThreadRequest::Create { metadata: parsed.metadata, }, ) .await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/start" => { let request = ThreadRequest::Start(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/resume" => { let request = ThreadRequest::Resume(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/fork" => { let request = ThreadRequest::Fork(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/list" => { let request = ThreadRequest::List(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/read" => { let request = ThreadRequest::Read(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/set_name" | "thread/set-name" => { let request = ThreadRequest::SetName(parse_params(params_or_object(params))?); let response = handle_thread_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/archive" => { let parsed: ThreadIdParams = parse_params(params_or_object(params))?; let response = handle_thread_request( state, ThreadRequest::Archive { thread_id: parsed.thread_id, }, ) .await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/unarchive" => { let parsed: ThreadIdParams = parse_params(params_or_object(params))?; let response = handle_thread_request( state, ThreadRequest::Unarchive { thread_id: parsed.thread_id, }, ) .await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "thread/message" => { let parsed: ThreadMessageParams = parse_params(params_or_object(params))?; let response = handle_thread_request( state, ThreadRequest::Message { thread_id: parsed.thread_id, input: parsed.input, }, ) .await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/capabilities" => { let response = process_app_request(state, AppRequest::Capabilities).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/request" => { let request: AppRequest = parse_params(params)?; let response = process_app_request(state, request).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/config/get" => { let parsed: ConfigGetParams = parse_params(params_or_object(params))?; let response = process_app_request(state, AppRequest::ConfigGet { key: parsed.key }).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/config/set" => { let parsed: ConfigSetParams = parse_params(params_or_object(params))?; let response = process_app_request( state, AppRequest::ConfigSet { key: parsed.key, value: parsed.value, }, ) .await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/config/unset" => { let parsed: ConfigGetParams = parse_params(params_or_object(params))?; let response = process_app_request(state, AppRequest::ConfigUnset { key: parsed.key }).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/config/list" => { let response = process_app_request(state, AppRequest::ConfigList).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/models" => { let response = process_app_request(state, AppRequest::Models).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "app/thread_loaded_list" | "app/thread-loaded-list" => { let response = process_app_request(state, AppRequest::ThreadLoadedList).await; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "prompt/capabilities" => StdioDispatchResult { result: json!({ "methods": ["prompt/request", "prompt/run"] }), should_exit: false, }, "prompt/request" | "prompt/run" => { let request: PromptRequest = parse_params(params)?; let response = handle_prompt_request(state, request).await?; StdioDispatchResult { result: serde_json::to_value(response) .map_err(|err| JsonRpcError::internal(err.to_string()))?, should_exit: false, } } "shutdown" => StdioDispatchResult { result: json!({"ok": true, "status": "stopped"}), should_exit: true, }, _ => return Err(JsonRpcError::method_not_found(method)), }; Ok(outcome) } async fn process_app_request(state: &AppState, req: AppRequest) -> AppResponse { match req { AppRequest::Capabilities => AppResponse { ok: true, data: json!({ "routes": ["/thread", "/app", "/prompt", "/tool", "/jobs", "/mcp/startup"], "config": ["get", "set", "unset", "list"], "events": ["response_start", "response_delta", "response_end", "tool_call_start", "tool_call_result", "mcp_startup_update", "mcp_startup_complete"], "transport": "stdio+http", "config_path": state.config_path.as_ref().map(|p| p.display().to_string()), }), events: Vec::new(), }, AppRequest::ConfigGet { key } => { let cfg = state.config.read().await; AppResponse { ok: true, data: json!({ "key": key, "value": cfg.get_value(&key) }), events: Vec::new(), } } AppRequest::ConfigSet { key, value } => { let mut cfg = state.config.write().await; let result = cfg.set_value(&key, &value); let ok = result.is_ok(); let message = result.err().map(|e| e.to_string()); let snapshot = cfg.clone(); drop(cfg); let _ = persist_config(state, snapshot).await; AppResponse { ok, data: json!({ "key": key, "value": value, "error": message }), events: Vec::new(), } } AppRequest::ConfigUnset { key } => { let mut cfg = state.config.write().await; let result = cfg.unset_value(&key); let ok = result.is_ok(); let message = result.err().map(|e| e.to_string()); let snapshot = cfg.clone(); drop(cfg); let _ = persist_config(state, snapshot).await; AppResponse { ok, data: json!({ "key": key, "error": message }), events: Vec::new(), } } AppRequest::ConfigList => { let cfg = state.config.read().await; AppResponse { ok: true, data: json!({ "values": cfg.list_values() }), events: Vec::new(), } } AppRequest::Models => AppResponse { ok: true, data: json!({ "models": state.registry.list() }), events: Vec::new(), }, AppRequest::ThreadLoadedList => { let mut runtime = state.runtime.lock().await; let response = runtime .handle_thread(codewhale_protocol::ThreadRequest::List( codewhale_protocol::ThreadListParams { include_archived: false, limit: Some(50), }, )) .await; match response { Ok(thread_resp) => AppResponse { ok: true, data: json!({ "threads": thread_resp.threads }), events: thread_resp.events, }, Err(err) => AppResponse { ok: false, data: json!({ "error": err.to_string() }), events: Vec::new(), }, } } } } async fn persist_config(state: &AppState, config: codewhale_config::ConfigToml) -> Result<()> { if state.config_path.is_none() { return Ok(()); } let mut store = ConfigStore::load(state.config_path.clone())?; store.config = config; store.save() }