diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a534f41..dd7f2b77 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,3 +86,5 @@ jobs: run: pnpm deepseek-app:typecheck - name: Test run: pnpm deepseek-app:test + - name: Build web app + run: pnpm deepseek-app:web:build diff --git a/docs/RUNTIME_API.md b/docs/RUNTIME_API.md index 833c110c..4a5e83fc 100644 --- a/docs/RUNTIME_API.md +++ b/docs/RUNTIME_API.md @@ -24,6 +24,7 @@ The runtime uses a durable Thread/Turn/Item lifecycle. - `ThreadRecord` - `id`, `created_at`, `updated_at` - `model`, `workspace`, `mode` + - `system_prompt` (optional text) - `latest_turn_id`, `latest_response_bookmark`, `archived` - `TurnRecord` - `id`, `thread_id` @@ -36,17 +37,43 @@ The runtime uses a durable Thread/Turn/Item lifecycle. The event log is append-only with global monotonic `seq` for replay/resume. +Session resume note: +- Saved session `system_prompt` currently round-trips as plain text. Structured `SystemPrompt::Blocks` metadata is not preserved when resuming into runtime threads. + ## Endpoints ### Health and Session - `GET /health` - `GET /v1/sessions?limit=50&search=` +- `GET /v1/sessions/{id}` +- `DELETE /v1/sessions/{id}` +- `POST /v1/sessions/{id}/resume-thread` - `GET /v1/workspace/status` - `GET /v1/skills` - `GET /v1/apps/mcp/servers` - `GET /v1/apps/mcp/tools?server=` +Resume session request body (all fields optional): + +```json +{ + "model": "deepseek-chat", + "mode": "agent" +} +``` + +Resume session response: + +```json +{ + "thread_id": "thr_1234abcd", + "session_id": "sess_5678efgh", + "message_count": 24, + "summary": "Resumed session 'Refactor plan' (24 messages) into thread thr_1234abcd" +} +``` + ### Compatibility Stream (Single Turn) - `POST /v1/stream` diff --git a/src/runtime_api.rs b/src/runtime_api.rs index 39b1435a..708aa99d 100644 --- a/src/runtime_api.rs +++ b/src/runtime_api.rs @@ -376,15 +376,9 @@ async fn get_session( ) -> Result, ApiError> { let manager = SessionManager::new(state.sessions_dir.clone()) .map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?; - let session = manager.load_session(&id).map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ApiError::not_found(format!("Session '{id}' not found")) - } else if e.kind() == std::io::ErrorKind::InvalidData { - ApiError::bad_request(format!("Failed to parse session '{id}': {e}")) - } else { - ApiError::not_found(format!("Session '{id}' not found: {e}")) - } - })?; + let session = manager + .load_session(&id) + .map_err(|e| map_session_err(&id, e, "read"))?; Ok(Json(session_to_detail(session))) } @@ -395,21 +389,17 @@ async fn resume_session_thread( ) -> Result<(StatusCode, Json), ApiError> { let manager = SessionManager::new(state.sessions_dir.clone()) .map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?; - let session = manager.load_session(&id).map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ApiError::not_found(format!("Session '{id}' not found")) - } else if e.kind() == std::io::ErrorKind::InvalidData { - ApiError::bad_request(format!("Failed to parse session '{id}': {e}")) - } else { - ApiError::not_found(format!("Session '{id}' not found: {e}")) - } - })?; + let session = manager + .load_session(&id) + .map_err(|e| map_session_err(&id, e, "read"))?; - let model = req.model.unwrap_or_else(|| { - session.metadata.model.clone() - }); + let model = req.model.unwrap_or_else(|| session.metadata.model.clone()); let mode = req.mode.unwrap_or_else(|| { - session.metadata.mode.clone().unwrap_or_else(|| "agent".to_string()) + session + .metadata + .mode + .clone() + .unwrap_or_else(|| "agent".to_string()) }); let thread = state @@ -456,13 +446,9 @@ async fn delete_session( ) -> Result { let manager = SessionManager::new(state.sessions_dir.clone()) .map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?; - manager.delete_session(&id).map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ApiError::not_found(format!("Session '{id}' not found")) - } else { - ApiError::internal(format!("Failed to delete session '{id}': {e}")) - } - })?; + manager + .delete_session(&id) + .map_err(|e| map_session_err(&id, e, "delete"))?; Ok(StatusCode::NO_CONTENT) } @@ -497,6 +483,19 @@ fn session_to_detail(session: SavedSession) -> SessionDetailResponse { } } +fn map_session_err(id: &str, err: std::io::Error, action: &str) -> ApiError { + match err.kind() { + std::io::ErrorKind::NotFound => ApiError::not_found(format!("Session '{id}' not found")), + std::io::ErrorKind::InvalidData => { + ApiError::bad_request(format!("Failed to parse session '{id}': {err}")) + } + std::io::ErrorKind::InvalidInput => { + ApiError::bad_request(format!("Invalid session id '{id}'")) + } + _ => ApiError::internal(format!("Failed to {action} session '{id}': {err}")), + } +} + async fn create_task( State(state): State, Json(mut req): Json, @@ -2567,6 +2566,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn session_endpoints_reject_invalid_id() -> Result<()> { + let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { + return Ok(()); + }; + let client = reqwest::Client::new(); + + let get_resp = client + .get(format!("http://{addr}/v1/sessions/invalid%20id")) + .send() + .await?; + assert_eq!(get_resp.status(), StatusCode::BAD_REQUEST); + + let resume_resp = client + .post(format!( + "http://{addr}/v1/sessions/invalid%20id/resume-thread" + )) + .json(&json!({})) + .send() + .await?; + assert_eq!(resume_resp.status(), StatusCode::BAD_REQUEST); + + let delete_resp = client + .delete(format!("http://{addr}/v1/sessions/invalid%20id")) + .send() + .await?; + assert_eq!(delete_resp.status(), StatusCode::BAD_REQUEST); + + handle.abort(); + Ok(()) + } + #[tokio::test] async fn session_resume_thread_returns_404_for_missing_session() -> Result<()> { let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else { diff --git a/src/runtime_threads.rs b/src/runtime_threads.rs index 74a49d6b..e45cfd7a 100644 --- a/src/runtime_threads.rs +++ b/src/runtime_threads.rs @@ -837,13 +837,13 @@ impl RuntimeThreadManager { let user_text = if user_buf.is_empty() { String::new() } else { - user_buf.drain(..).collect::>().join("\n") + std::mem::take(&mut user_buf).join("\n") }; pending_pairs.push((user_text, Some(text))); } } if !user_buf.is_empty() { - let user_text = user_buf.drain(..).collect::>().join("\n"); + let user_text = std::mem::take(&mut user_buf).join("\n"); pending_pairs.push((user_text, None)); } @@ -875,10 +875,7 @@ impl RuntimeThreadManager { if let Some(assistant_text) = assistant_text { let asst_summary = if assistant_text.len() > SUMMARY_LIMIT { - format!( - "{}...", - &assistant_text[..SUMMARY_LIMIT.saturating_sub(3)] - ) + format!("{}...", &assistant_text[..SUMMARY_LIMIT.saturating_sub(3)]) } else { assistant_text.clone() }; @@ -2017,7 +2014,7 @@ fn enforce_lru_capacity( max_active_threads: usize, ) -> Vec { let mut evicted = Vec::new(); - if active.engines.len() < max_active_threads { + if max_active_threads == 0 || active.engines.len() < max_active_threads { return evicted; } let protected = active @@ -2032,7 +2029,8 @@ fn enforce_lru_capacity( }) .collect::>(); - while active.engines.len() >= max_active_threads { + let scan_limit = active.lru.len(); + for _ in 0..scan_limit { let Some(candidate) = active.lru.pop_front() else { break; }; @@ -2192,6 +2190,45 @@ mod tests { } } + #[test] + fn enforce_lru_capacity_does_not_loop_when_all_threads_are_active() { + let mut active = ActiveThreads::default(); + let harness_a = mock_engine_handle(); + let harness_b = mock_engine_handle(); + + active.engines.insert( + "thr_a".to_string(), + ActiveThreadState { + engine: harness_a.handle, + active_turn: Some(ActiveTurnState { + turn_id: "turn_a".to_string(), + interrupt_requested: false, + auto_approve: true, + trust_mode: false, + }), + }, + ); + active.engines.insert( + "thr_b".to_string(), + ActiveThreadState { + engine: harness_b.handle, + active_turn: Some(ActiveTurnState { + turn_id: "turn_b".to_string(), + interrupt_requested: false, + auto_approve: true, + trust_mode: false, + }), + }, + ); + active.lru.push_back("thr_a".to_string()); + active.lru.push_back("thr_b".to_string()); + + let evicted = enforce_lru_capacity(&mut active, 2); + assert!(evicted.is_empty(), "no idle threads should be evicted"); + assert_eq!(active.engines.len(), 2); + assert_eq!(active.lru.len(), 2); + } + #[tokio::test] async fn thread_lifecycle_persists_across_restart() -> Result<()> { let runtime_dir = test_runtime_dir(); diff --git a/src/session_manager.rs b/src/session_manager.rs index 8a9d9bf3..989444f8 100644 --- a/src/session_manager.rs +++ b/src/session_manager.rs @@ -102,6 +102,26 @@ pub struct SessionManager { } impl SessionManager { + fn validated_session_path(&self, id: &str) -> std::io::Result { + let trimmed = id.trim(); + if trimmed.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Session id cannot be empty", + )); + } + if !trimmed + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') + { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid session id '{id}'"), + )); + } + Ok(self.sessions_dir.join(format!("{trimmed}.json"))) + } + /// Create a new `SessionManager` with the specified sessions directory pub fn new(sessions_dir: PathBuf) -> std::io::Result { // Ensure the sessions directory exists @@ -116,14 +136,13 @@ impl SessionManager { /// Save a session to disk using atomic write (temp file + rename). pub fn save_session(&self, session: &SavedSession) -> std::io::Result { - let filename = format!("{}.json", session.metadata.id); - let path = self.sessions_dir.join(&filename); + let path = self.validated_session_path(&session.metadata.id)?; let content = serde_json::to_string_pretty(session) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; // Atomic write: write to temp file then rename to avoid corruption - let tmp_filename = format!(".{}.tmp", session.metadata.id); + let tmp_filename = format!(".{}.tmp", session.metadata.id.trim()); let tmp_path = self.sessions_dir.join(&tmp_filename); fs::write(&tmp_path, &content)?; fs::rename(&tmp_path, &path)?; @@ -228,8 +247,7 @@ impl SessionManager { /// Load a session by ID pub fn load_session(&self, id: &str) -> std::io::Result { - let filename = format!("{id}.json"); - let path = self.sessions_dir.join(&filename); + let path = self.validated_session_path(id)?; let content = fs::read_to_string(&path)?; let session: SavedSession = serde_json::from_str(&content) @@ -309,8 +327,7 @@ impl SessionManager { /// Delete a session by ID pub fn delete_session(&self, id: &str) -> std::io::Result<()> { - let filename = format!("{id}.json"); - let path = self.sessions_dir.join(&filename); + let path = self.validated_session_path(id)?; fs::remove_file(path) } @@ -583,6 +600,22 @@ mod tests { assert!(manager.load_session(&session_id).is_err()); } + #[test] + fn test_session_id_rejects_invalid_characters() { + let tmp = tempdir().expect("tempdir"); + let manager = SessionManager::new(tmp.path().join("sessions")).expect("new"); + + let err = manager + .load_session("../outside") + .expect_err("invalid id should fail"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + + let err = manager + .delete_session("sess bad") + .expect_err("invalid id should fail"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } + #[test] fn test_truncate_title() { assert_eq!(truncate_title("Short", 50), "Short");