feat(runtime-api): daemon API quartet for whalescale (#561 #562 #563 #564) (#567)

Bridge work that unblocks whalescale-desktop's Settings, Composer, and
Archived-chats flows, so the UI no longer needs a daemon recompile for each
dev port or client-side usage aggregation.

#561 / whalescale#255 — configurable CORS allow-list
* Add `[runtime_api] cors_origins` config field, `--cors-origin URL`
  (repeatable) flag on `deepseek serve --http`, and `DEEPSEEK_CORS_ORIGINS`
  env var. User entries stack on top of the built-in defaults
  (localhost:3000, localhost:1420, tauri://localhost). Resolution preserves
  first-seen order and drops empty/duplicate values; invalid HeaderValues
  log a warning and are skipped.
* Refactor `cors_layer()` to read merged origins from `RuntimeApiState`.
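
A minimal sketch of the merge rule described above, assuming a single
hypothetical helper (`merged_origins`) that takes the env var value as a
parameter instead of reading the process environment. In the commit itself
the work is split between `resolve_cors_origins` and `cors_layer`; this
only illustrates the trim, skip-empty, dedupe, and first-seen-order
behavior.

```rust
/// Built-in defaults listed in the config comments of this commit.
const DEFAULTS: &[&str] = &[
    "http://localhost:3000",
    "http://127.0.0.1:3000",
    "http://localhost:1420",
    "http://127.0.0.1:1420",
    "tauri://localhost",
];

/// Hypothetical helper (not part of the commit): defaults first, then flag,
/// env, and config entries, keeping only the first occurrence of each origin.
fn merged_origins(flags: &[&str], env_value: Option<&str>, config: &[&str]) -> Vec<String> {
    let mut out: Vec<String> = DEFAULTS.iter().map(|s| s.to_string()).collect();
    // The env var is a comma-separated list; a missing var contributes nothing.
    let env_pieces = env_value.unwrap_or("").split(',');
    let candidates = flags
        .iter()
        .copied()
        .chain(env_pieces)
        .chain(config.iter().copied());
    for raw in candidates {
        let trimmed = raw.trim();
        // Empty entries are dropped; duplicates keep their first position.
        if !trimmed.is_empty() && !out.iter().any(|o| o == trimmed) {
            out.push(trimmed.to_string());
        }
    }
    out
}
```

Calling `merged_origins(&["http://localhost:5173"], None, &[])` would
return the five defaults followed by the Vite origin.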

#562 / whalescale#256 — `PATCH /v1/threads/{id}` accepts the full editable
field set
* Extend `UpdateThreadRequest` with `allow_shell`, `trust_mode`,
  `auto_approve`, `model`, `mode`, `title`, `system_prompt`. Each is
  optional; missing means no change. Empty-string clears `title`/
  `system_prompt`. Empty `model`/`mode` rejected with 400.
* Add `title: Option<String>` to `ThreadRecord` (additive, no schema bump
  per documented criteria — old readers ignore the field without
  misinterpretation). `list_threads_summary` now returns the user-set title
  when present, falling back to the derived input-summary title.
* `thread.updated` event payload now carries a `changes` map with only the
  fields that actually changed.
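
The PATCH semantics above can be sketched as follows. This is a reduced
illustration with hypothetical `Thread`, `Patch`, and `apply_patch` types
covering only `model` and `title`; the real `UpdateThreadRequest` carries
more fields, and whether the daemon trims the title before the emptiness
check is an assumption here.

```rust
use std::collections::BTreeMap;

/// Hypothetical, reduced stand-in for the persisted thread record.
#[derive(Debug, Default)]
struct Thread {
    model: String,
    title: Option<String>,
}

/// Hypothetical, reduced stand-in for the PATCH body.
#[derive(Debug, Default)]
struct Patch {
    model: Option<String>,
    title: Option<String>,
}

/// Applies the patch and returns only the fields that actually changed.
fn apply_patch(
    thread: &mut Thread,
    patch: Patch,
) -> Result<BTreeMap<&'static str, Option<String>>, String> {
    if patch.model.is_none() && patch.title.is_none() {
        return Err("At least one thread field is required".to_string());
    }
    if let Some(model) = &patch.model {
        if model.trim().is_empty() {
            return Err("model must not be empty".to_string());
        }
    }
    let mut changes = BTreeMap::new();
    if let Some(model) = patch.model {
        if thread.model != model {
            thread.model = model.clone();
            changes.insert("model", Some(model));
        }
    }
    if let Some(title) = patch.title {
        // Assumption: an empty string clears the title back to None.
        let next = if title.is_empty() { None } else { Some(title) };
        if thread.title != next {
            thread.title = next.clone();
            changes.insert("title", next);
        }
    }
    Ok(changes)
}
```

A patch that repeats the current `model` and sets a new `title` would
report a single `title` entry in the returned map.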

#563 / whalescale#260 — list-archived-only filter
* New `archived_only=true` query param on `GET /v1/threads` and
  `GET /v1/threads/summary`. Backed by a new `ThreadListFilter` enum
  (`ActiveOnly` | `IncludeArchived` | `ArchivedOnly`). `archived_only`
  takes precedence over `include_archived`. Default behavior unchanged.
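
As a rough illustration of the precedence rule, the sketch below maps the
two query params to a filter and applies it to sample data. The enum
mirrors the variants named above; `resolve_filter`, `visible_ids`, and the
`(id, archived)` tuples are hypothetical stand-ins, not the daemon's types.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadListFilter {
    ActiveOnly,
    IncludeArchived,
    ArchivedOnly,
}

/// `archived_only` wins over `include_archived`; both default to false.
fn resolve_filter(include_archived: Option<bool>, archived_only: Option<bool>) -> ThreadListFilter {
    if archived_only.unwrap_or(false) {
        ThreadListFilter::ArchivedOnly
    } else if include_archived.unwrap_or(false) {
        ThreadListFilter::IncludeArchived
    } else {
        ThreadListFilter::ActiveOnly
    }
}

/// Hypothetical helper: each thread is modeled as an `(id, archived)` pair.
fn visible_ids(threads: &[(&'static str, bool)], filter: ThreadListFilter) -> Vec<&'static str> {
    threads
        .iter()
        .filter(|(_, archived)| match filter {
            ThreadListFilter::ActiveOnly => !*archived,
            ThreadListFilter::ArchivedOnly => *archived,
            ThreadListFilter::IncludeArchived => true,
        })
        .map(|(id, _)| *id)
        .collect()
}
```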

#564 / whalescale#261 — `GET /v1/usage` aggregation
* New `RuntimeThreadManager::aggregate_usage` walks all threads/turns,
  filters by inclusive `since`/`until` RFC 3339 bounds, accumulates token
  totals + cost (via `pricing::calculate_turn_cost_from_usage`), and
  groups by `day` (default), `model`, `provider`, or `thread`.
* New `GET /v1/usage` route. `since`/`until`/`group_by` query params,
  `since > until` and unknown `group_by` rejected with 400. Empty time
  ranges yield empty `buckets` (never 404).
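
A simplified sketch of the aggregation, assuming Unix-second timestamps
and a hypothetical `Turn` record in place of the daemon's RFC 3339 values
and stored turns. It groups by model only and omits cost; the point is the
inclusive bounds, the `since > until` rejection, and the sorted buckets.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified turn record (timestamps are Unix seconds here).
struct Turn {
    created_at: u64,
    model: &'static str,
    input_tokens: u64,
    output_tokens: u64,
}

#[derive(Debug, Default, PartialEq)]
struct Bucket {
    input_tokens: u64,
    output_tokens: u64,
    turns: u64,
}

/// Groups token totals by model for turns inside `[since, until]`.
fn aggregate_by_model(
    turns: &[Turn],
    since: Option<u64>,
    until: Option<u64>,
) -> Result<BTreeMap<String, Bucket>, String> {
    if let (Some(s), Some(u)) = (since, until) {
        if s > u {
            return Err("since must be <= until".to_string());
        }
    }
    // BTreeMap keeps buckets sorted by key for deterministic output.
    let mut buckets: BTreeMap<String, Bucket> = BTreeMap::new();
    for turn in turns {
        let too_early = since.map_or(false, |s| turn.created_at < s);
        let too_late = until.map_or(false, |u| turn.created_at > u);
        if too_early || too_late {
            continue;
        }
        let bucket = buckets.entry(turn.model.to_string()).or_default();
        bucket.input_tokens += turn.input_tokens;
        bucket.output_tokens += turn.output_tokens;
        bucket.turns += 1;
    }
    Ok(buckets)
}
```

A range that matches no turns returns `Ok` with an empty map, which
corresponds to the empty `buckets` response rather than an error.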

Five new tests cover preflight Allow-Origin echoing for both default and
extra origins, skipping of invalid origins, the extended PATCH field set +
clear-by-empty + 400 paths, the archived_only filter on list + summary
endpoints, and the /v1/usage envelope + validation errors. The 13 existing
runtime_api tests continue to pass; the parity gates and full workspace test
suite are clean.

`docs/RUNTIME_API.md` and `config.example.toml` updated to document the
new params, body shape, endpoint, and CORS knob.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hunter Bown
2026-05-04 02:18:19 -05:00
committed by GitHub
parent 3e56f3526e
commit 0047b3225b
6 changed files with 917 additions and 35 deletions
+21
@@ -360,6 +360,27 @@ default_text_model = "deepseek-ai/deepseek-v4-pro"
# event = "session_start"
# command = "echo 'DeepSeek TUI session started'"
# ─────────────────────────────────────────────────────────────────────────────────
# Runtime API (`deepseek serve --http`) (#561)
# ─────────────────────────────────────────────────────────────────────────────────
# Tuning knobs for the local HTTP/SSE daemon. The server binds to 127.0.0.1
# by default and is intended for local UIs (whalescale-desktop, dashboards,
# automation scripts). Today this section only controls the CORS allow-list;
# host/port/workers stay on `--host`, `--port`, and `--workers` flags.
#
# Built-in defaults always include:
# http://localhost:3000 http://127.0.0.1:3000
# http://localhost:1420 http://127.0.0.1:1420
# tauri://localhost
#
# Use `cors_origins` to add extra dev origins (e.g. Vite's default `:5173`).
# User entries STACK on top of the defaults — they do not replace them. The
# CLI flag `--cors-origin URL` (repeatable) and env var
# `DEEPSEEK_CORS_ORIGINS=url1,url2` resolve to the same merged list.
#
# [runtime_api]
# cors_origins = ["http://localhost:5173", "http://127.0.0.1:5173"]
# ─────────────────────────────────────────────────────────────────────────────────
# Requirements (admin constraints) example file
# ─────────────────────────────────────────────────────────────────────────────────
+22
@@ -771,6 +771,27 @@ pub struct Config {
/// Sub-agent model overrides.
#[serde(default)]
pub subagents: Option<SubagentsConfig>,
/// Runtime API server tuning (`deepseek serve --http`). Currently only
/// hosts the CORS allow-list extension (whalescale#255 / #561). When the
/// table is absent, the daemon ships with localhost:3000 / localhost:1420
/// / tauri://localhost as the only allowed dev origins.
#[serde(default)]
pub runtime_api: Option<RuntimeApiConfig>,
}
/// `[runtime_api]` table — knobs for the local HTTP/SSE daemon.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct RuntimeApiConfig {
/// Additional CORS origins to allow on top of the built-in defaults
/// (`http://localhost:{3000,1420}`, `http://127.0.0.1:{3000,1420}`,
/// `tauri://localhost`). Useful when developing a UI against a non-default
/// dev server port (e.g. Vite's default `:5173`).
///
/// Merge order (all sources are combined, none overrides another; duplicates
/// keep their first-seen position): `--cors-origin` CLI flag,
/// `DEEPSEEK_CORS_ORIGINS` env var (comma-separated), this field. Whalescale#255 / #561.
#[serde(default)]
pub cors_origins: Option<Vec<String>>,
}
/// `[skills]` table — knobs for the community-skill installer.
@@ -2004,6 +2025,7 @@ fn merge_config(base: Config, override_cfg: Config) -> Config {
per_model: override_cfg.context.per_model.or(base.context.per_model),
},
subagents: override_cfg.subagents.or(base.subagents),
runtime_api: override_cfg.runtime_api.or(base.runtime_api),
}
}
+48
@@ -398,6 +398,12 @@ struct ServeArgs {
/// Background task worker count (1-8)
#[arg(long, default_value_t = 2)]
workers: usize,
/// Additional CORS origin to allow (repeatable). Stacks on top of the
/// built-in defaults (localhost:3000, localhost:1420, tauri://localhost).
/// Also reads `DEEPSEEK_CORS_ORIGINS` (comma-separated) and
/// `[runtime_api] cors_origins` from `config.toml`. Whalescale#255.
#[arg(long = "cors-origin", value_name = "URL")]
cors_origin: Vec<String>,
}
#[derive(Subcommand, Debug, Clone)]
@@ -692,6 +698,7 @@ async fn main() -> Result<()> {
mcp_server::run_mcp_server(workspace)
} else if args.http {
let config = load_config_from_cli(&cli)?;
let cors_origins = resolve_cors_origins(&config, &args.cors_origin);
runtime_api::run_http_server(
config,
workspace,
@@ -699,6 +706,7 @@ async fn main() -> Result<()> {
host: args.host,
port: args.port,
workers: args.workers.clamp(1, 8),
cors_origins,
},
)
.await
@@ -1001,6 +1009,46 @@ fn init_plugins_dir(
Ok((readme_path, example_path, readme_status, example_status))
}
/// Resolve the user-supplied CORS origins for `deepseek serve --http`.
///
/// Sources, in merge order (each source appends to the ones before it):
/// 1. `--cors-origin URL` flags (repeatable)
/// 2. `DEEPSEEK_CORS_ORIGINS` env var (comma-separated)
/// 3. `[runtime_api] cors_origins = [...]` in `config.toml`
///
/// The runtime API always allows the built-in dev defaults
/// (localhost:3000, localhost:1420, tauri://localhost). User entries are
/// appended on top — empty strings are skipped, and duplicates are deduped
/// while preserving first-seen order. Whalescale#255 / #561.
fn resolve_cors_origins(config: &Config, flag_origins: &[String]) -> Vec<String> {
let mut out: Vec<String> = Vec::new();
let mut push = |raw: &str| {
let trimmed = raw.trim();
if trimmed.is_empty() {
return;
}
if !out.iter().any(|existing| existing == trimmed) {
out.push(trimmed.to_string());
}
};
for o in flag_origins {
push(o);
}
if let Ok(env_value) = std::env::var("DEEPSEEK_CORS_ORIGINS") {
for piece in env_value.split(',') {
push(piece);
}
}
if let Some(rt) = &config.runtime_api
&& let Some(list) = &rt.cors_origins
{
for o in list {
push(o);
}
}
out
}
fn deepseek_home_dir() -> PathBuf {
dirs::home_dir().map_or_else(|| PathBuf::from(".deepseek"), |h| h.join(".deepseek"))
}
+491 -22
@@ -33,8 +33,8 @@ use crate::config::{Config, DEFAULT_TEXT_MODEL};
use crate::mcp::{McpConfig, McpPool};
use crate::runtime_threads::{
CompactThreadRequest, CreateThreadRequest, RuntimeThreadManager, RuntimeThreadManagerConfig,
SharedRuntimeThreadManager, StartTurnRequest, SteerTurnRequest, ThreadDetail, ThreadRecord,
TurnItemKind, TurnRecord, UpdateThreadRequest,
SharedRuntimeThreadManager, StartTurnRequest, SteerTurnRequest, ThreadDetail, ThreadListFilter,
ThreadRecord, TurnItemKind, TurnRecord, UpdateThreadRequest, UsageGroupBy,
};
use crate::session_manager::{SavedSession, SessionManager, SessionMetadata, default_sessions_dir};
use crate::skills::SkillRegistry;
@@ -48,6 +48,7 @@ pub struct RuntimeApiState {
workspace: PathBuf,
task_manager: SharedTaskManager,
runtime_threads: SharedRuntimeThreadManager,
cors_origins: Vec<String>,
sessions_dir: PathBuf,
mcp_config_path: PathBuf,
automations: SharedAutomationManager,
@@ -58,6 +59,23 @@ pub struct RuntimeApiOptions {
pub host: String,
pub port: u16,
pub workers: usize,
/// Additional CORS origins to allow on top of the built-in defaults
/// (`http://localhost:{3000,1420}`, `http://127.0.0.1:{3000,1420}`,
/// `tauri://localhost`). Populated by `--cors-origin` (repeatable),
/// `DEEPSEEK_CORS_ORIGINS` (comma-separated), and `[runtime_api]
/// cors_origins` in `config.toml`. Whalescale#255 / #561.
pub cors_origins: Vec<String>,
}
impl Default for RuntimeApiOptions {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 7878,
workers: 2,
cors_origins: Vec::new(),
}
}
}
#[derive(Debug, Deserialize)]
@@ -125,6 +143,9 @@ struct TasksQuery {
struct ThreadsQuery {
limit: Option<usize>,
include_archived: Option<bool>,
/// When `true`, returns archived threads only (overrides `include_archived`).
/// Whalescale#260 / #563.
archived_only: Option<bool>,
}
#[derive(Debug, Deserialize)]
@@ -132,6 +153,22 @@ struct ThreadSummaryQuery {
limit: Option<usize>,
search: Option<String>,
include_archived: Option<bool>,
/// When `true`, returns archived threads only (overrides `include_archived`).
/// Whalescale#260 / #563.
archived_only: Option<bool>,
}
fn resolve_thread_filter(
include_archived: Option<bool>,
archived_only: Option<bool>,
) -> ThreadListFilter {
if archived_only.unwrap_or(false) {
ThreadListFilter::ArchivedOnly
} else if include_archived.unwrap_or(false) {
ThreadListFilter::IncludeArchived
} else {
ThreadListFilter::ActiveOnly
}
}
#[derive(Debug, Serialize)]
@@ -269,6 +306,7 @@ pub async fn run_http_server(
workspace,
task_manager,
runtime_threads,
cors_origins: options.cors_origins.clone(),
sessions_dir,
mcp_config_path: config.mcp_config_path(),
automations,
@@ -339,7 +377,8 @@ pub fn build_router(state: RuntimeApiState) -> Router {
.route("/v1/automations/{id}/pause", post(pause_automation))
.route("/v1/automations/{id}/resume", post(resume_automation))
.route("/v1/automations/{id}/runs", get(list_automation_runs))
.layer(cors_layer())
.route("/v1/usage", get(get_usage))
.layer(cors_layer(&state.cors_origins))
.with_state(state)
}
@@ -557,9 +596,10 @@ async fn list_threads(
State(state): State<RuntimeApiState>,
Query(query): Query<ThreadsQuery>,
) -> Result<Json<Vec<ThreadRecord>>, ApiError> {
let filter = resolve_thread_filter(query.include_archived, query.archived_only);
let threads = state
.runtime_threads
.list_threads(query.include_archived.unwrap_or(false), query.limit)
.list_threads(filter, query.limit)
.await
.map_err(|e| ApiError::internal(e.to_string()))?;
Ok(Json(threads))
@@ -571,9 +611,10 @@ async fn list_threads_summary(
) -> Result<Json<Vec<ThreadSummary>>, ApiError> {
let limit = query.limit.unwrap_or(50).clamp(1, 500);
let search = query.search.as_deref().map(str::to_ascii_lowercase);
let filter = resolve_thread_filter(query.include_archived, query.archived_only);
let threads = state
.runtime_threads
.list_threads(query.include_archived.unwrap_or(false), Some(limit))
.list_threads(filter, Some(limit))
.await
.map_err(|e| ApiError::internal(e.to_string()))?;
@@ -588,15 +629,23 @@ async fn list_threads_summary(
let latest_status =
latest_turn.map(|turn| format!("{:?}", turn.status).to_ascii_lowercase());
let title = latest_turn
.map(|turn| {
if turn.input_summary.trim().is_empty() {
"New Thread".to_string()
} else {
truncate_text(&turn.input_summary, 72)
}
})
.unwrap_or_else(|| "New Thread".to_string());
let title = thread
.title
.as_deref()
.map(str::trim)
.filter(|t| !t.is_empty())
.map(|t| truncate_text(t, 72))
.unwrap_or_else(|| {
latest_turn
.map(|turn| {
if turn.input_summary.trim().is_empty() {
"New Thread".to_string()
} else {
truncate_text(&turn.input_summary, 72)
}
})
.unwrap_or_else(|| "New Thread".to_string())
});
let preview = detail
.items
@@ -1366,15 +1415,88 @@ fn load_mcp_config_or_default(path: &std::path::Path) -> Result<McpConfig, ApiEr
})
}
fn cors_layer() -> CorsLayer {
#[derive(Debug, Deserialize)]
struct UsageQuery {
/// RFC 3339 lower bound (inclusive). When omitted, no lower bound.
since: Option<String>,
/// RFC 3339 upper bound (inclusive). When omitted, no upper bound.
until: Option<String>,
/// Bucket key. One of `day` (default), `model`, `provider`, `thread`.
group_by: Option<String>,
}
fn parse_iso8601(raw: &str, field: &str) -> Result<chrono::DateTime<Utc>, ApiError> {
chrono::DateTime::parse_from_rfc3339(raw)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| ApiError::bad_request(format!("Invalid {field} (expected RFC 3339): {e}")))
}
async fn get_usage(
State(state): State<RuntimeApiState>,
Query(query): Query<UsageQuery>,
) -> Result<Json<Value>, ApiError> {
let since = match query.since.as_deref() {
Some(raw) => Some(parse_iso8601(raw, "since")?),
None => None,
};
let until = match query.until.as_deref() {
Some(raw) => Some(parse_iso8601(raw, "until")?),
None => None,
};
if let (Some(s), Some(u)) = (since, until)
&& s > u
{
return Err(ApiError::bad_request("since must be <= until".to_string()));
}
let group_by = match query.group_by.as_deref().unwrap_or("day") {
"day" => UsageGroupBy::Day,
"model" => UsageGroupBy::Model,
"provider" => UsageGroupBy::Provider,
"thread" => UsageGroupBy::Thread,
other => {
return Err(ApiError::bad_request(format!(
"Unsupported group_by '{other}': expected one of day, model, provider, thread"
)));
}
};
let aggregation = state
.runtime_threads
.aggregate_usage(since, until, group_by)
.await
.map_err(|e| ApiError::internal(e.to_string()))?;
Ok(Json(json!(aggregation)))
}
/// Built-in dev origins always allowed by the runtime API (whalescale#255).
const DEFAULT_CORS_ORIGINS: &[&str] = &[
"http://localhost:3000",
"http://127.0.0.1:3000",
"http://localhost:1420",
"http://127.0.0.1:1420",
"tauri://localhost",
];
fn cors_layer(extra_origins: &[String]) -> CorsLayer {
let mut origins: Vec<HeaderValue> = DEFAULT_CORS_ORIGINS
.iter()
.filter_map(|o| HeaderValue::from_str(o).ok())
.collect();
for raw in extra_origins {
let trimmed = raw.trim();
if trimmed.is_empty() {
continue;
}
match HeaderValue::from_str(trimmed) {
Ok(value) if !origins.contains(&value) => origins.push(value),
Ok(_) => {}
Err(err) => tracing::warn!(
"Ignoring invalid CORS origin '{trimmed}': {err}; expected scheme://host[:port]"
),
}
}
CorsLayer::new()
.allow_origin([
HeaderValue::from_static("http://localhost:3000"),
HeaderValue::from_static("http://127.0.0.1:3000"),
HeaderValue::from_static("http://localhost:1420"),
HeaderValue::from_static("http://127.0.0.1:1420"),
HeaderValue::from_static("tauri://localhost"),
])
.allow_origin(origins)
.allow_methods([
Method::GET,
Method::POST,
@@ -1569,6 +1691,7 @@ mod tests {
workspace: PathBuf::from("."),
task_manager: manager,
runtime_threads: runtime_threads.clone(),
cors_origins: Vec::new(),
sessions_dir,
mcp_config_path: root.join("mcp.json"),
automations,
@@ -2726,4 +2849,350 @@ mod tests {
handle.abort();
Ok(())
}
/// #561 / whalescale#255 — extra CORS origins from `RuntimeApiOptions`
/// are added on top of the built-in defaults and propagate through to the
/// `Access-Control-Allow-Origin` response header for preflight requests.
/// Built-in defaults must keep working unchanged.
#[tokio::test]
async fn cors_layer_appends_extra_origins_and_keeps_defaults() -> Result<()> {
// The cors_layer fn is the layer factory — exercise it through a
// Router with a single trivial route so we can issue OPTIONS preflights
// and observe the response headers.
let extra = vec!["http://localhost:5173".to_string()];
let layer = cors_layer(&extra);
let router: Router = Router::new()
.route("/probe", get(|| async { "ok" }))
.layer(layer);
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => return Ok(()),
Err(err) => return Err(err.into()),
};
let addr = listener.local_addr()?;
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, router).await;
});
let client = reqwest::Client::new();
// The user-supplied origin is allowed.
let resp = client
.request(reqwest::Method::OPTIONS, format!("http://{addr}/probe"))
.header("Origin", "http://localhost:5173")
.header("Access-Control-Request-Method", "GET")
.send()
.await?;
assert_eq!(
resp.headers()
.get("access-control-allow-origin")
.and_then(|v| v.to_str().ok()),
Some("http://localhost:5173")
);
// A built-in default origin still works.
let resp = client
.request(reqwest::Method::OPTIONS, format!("http://{addr}/probe"))
.header("Origin", "http://localhost:1420")
.header("Access-Control-Request-Method", "GET")
.send()
.await?;
assert_eq!(
resp.headers()
.get("access-control-allow-origin")
.and_then(|v| v.to_str().ok()),
Some("http://localhost:1420")
);
// An origin that's neither configured nor a default is rejected
// (CorsLayer omits the Allow-Origin header on mismatch).
let resp = client
.request(reqwest::Method::OPTIONS, format!("http://{addr}/probe"))
.header("Origin", "http://malicious.example")
.header("Access-Control-Request-Method", "GET")
.send()
.await?;
assert!(
resp.headers().get("access-control-allow-origin").is_none(),
"non-allowed origin must not be echoed back"
);
handle.abort();
Ok(())
}
/// #561 — invalid origins (non-ASCII, etc.) are skipped without aborting
/// the layer build.
#[test]
fn cors_layer_skips_invalid_origins() {
let extras = vec![
"http://valid.example".to_string(),
// Embedded NUL char makes `HeaderValue::from_str` fail.
"http://invalid.example\0".to_string(),
" ".to_string(), // whitespace-only is dropped
];
// Should not panic.
let _ = cors_layer(&extras);
}
/// #562 / whalescale#256 — `PATCH /v1/threads/{id}` accepts the new
/// fields (allow_shell, trust_mode, auto_approve, model, mode, title,
/// system_prompt). Each is independently optional; an empty string clears
/// `title` / `system_prompt` back to None.
#[tokio::test]
async fn patch_thread_accepts_extended_field_set() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({
"model": "deepseek-v4-flash",
"mode": "agent"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
// Patch every new field at once.
let patched: serde_json::Value = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({
"allow_shell": true,
"trust_mode": true,
"auto_approve": true,
"model": "deepseek-v4-pro",
"mode": "yolo",
"title": "Whalescale UI test thread",
"system_prompt": "You are a useful assistant."
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(patched["allow_shell"], true);
assert_eq!(patched["trust_mode"], true);
assert_eq!(patched["auto_approve"], true);
assert_eq!(patched["model"], "deepseek-v4-pro");
assert_eq!(patched["mode"], "yolo");
assert_eq!(patched["title"], "Whalescale UI test thread");
assert_eq!(patched["system_prompt"], "You are a useful assistant.");
// Empty string clears title back to None.
let cleared: serde_json::Value = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({ "title": "" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
cleared["title"].is_null() || !cleared.as_object().unwrap().contains_key("title"),
"empty title must serialize as None: {cleared:?}"
);
// Empty patch (no fields) is still rejected.
let empty = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({}))
.send()
.await?;
assert_eq!(empty.status(), StatusCode::BAD_REQUEST);
// Empty model is rejected (validation).
let bad_model = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({ "model": " " }))
.send()
.await?;
assert_eq!(bad_model.status(), StatusCode::BAD_REQUEST);
handle.abort();
Ok(())
}
/// #563 / whalescale#260 — `archived_only=true` returns archived-only
/// (no active threads), distinct from `include_archived=true` which
/// returns both.
#[tokio::test]
async fn list_threads_archived_only_filter_matches_only_archived() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
// Two threads — keep one active, archive the other.
let active: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let active_id = active["id"].as_str().unwrap().to_string();
let archived: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let archived_id = archived["id"].as_str().unwrap().to_string();
client
.patch(format!("http://{addr}/v1/threads/{archived_id}"))
.json(&json!({ "archived": true }))
.send()
.await?
.error_for_status()?;
// Default (active only) → only the unarchived one.
let active_list: serde_json::Value = client
.get(format!("http://{addr}/v1/threads"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let ids: Vec<&str> = active_list
.as_array()
.unwrap()
.iter()
.filter_map(|t| t["id"].as_str())
.collect();
assert!(ids.contains(&active_id.as_str()));
assert!(!ids.contains(&archived_id.as_str()));
// archived_only=true → only the archived one.
let archived_list: serde_json::Value = client
.get(format!("http://{addr}/v1/threads?archived_only=true"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let ids: Vec<&str> = archived_list
.as_array()
.unwrap()
.iter()
.filter_map(|t| t["id"].as_str())
.collect();
assert_eq!(ids, vec![archived_id.as_str()]);
// archived_only=true takes precedence over include_archived=true.
let archived_list: serde_json::Value = client
.get(format!(
"http://{addr}/v1/threads?include_archived=true&archived_only=true"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
let ids: Vec<&str> = archived_list
.as_array()
.unwrap()
.iter()
.filter_map(|t| t["id"].as_str())
.collect();
assert_eq!(ids, vec![archived_id.as_str()]);
// Same filter works on the summary endpoint.
let summary: serde_json::Value = client
.get(format!(
"http://{addr}/v1/threads/summary?archived_only=true&limit=10"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
let summary_ids: Vec<&str> = summary
.as_array()
.unwrap()
.iter()
.filter_map(|t| t["id"].as_str())
.collect();
assert_eq!(summary_ids, vec![archived_id.as_str()]);
handle.abort();
Ok(())
}
/// #564 / whalescale#261 — `GET /v1/usage` aggregates per-turn token +
/// cost data. With no threads the response is well-formed and totals are
/// zero with empty buckets (never a 404).
#[tokio::test]
async fn usage_endpoint_returns_empty_aggregation_for_fresh_store() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let body: serde_json::Value = client
.get(format!("http://{addr}/v1/usage"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(body["group_by"], "day");
assert_eq!(body["totals"]["input_tokens"], 0);
assert_eq!(body["totals"]["output_tokens"], 0);
assert_eq!(body["totals"]["turns"], 0);
assert!(
body["buckets"].as_array().unwrap().is_empty(),
"buckets must be empty when no turns exist: {body}"
);
// group_by query options are validated.
let bad_group = client
.get(format!("http://{addr}/v1/usage?group_by=galaxy"))
.send()
.await?;
assert_eq!(bad_group.status(), StatusCode::BAD_REQUEST);
// Each accepted group_by value succeeds.
for gb in ["day", "model", "provider", "thread"] {
let resp = client
.get(format!("http://{addr}/v1/usage?group_by={gb}"))
.send()
.await?;
assert!(resp.status().is_success(), "group_by={gb} failed: {resp:?}");
}
// A malformed RFC 3339 timestamp is rejected.
let bad_since = client
.get(format!("http://{addr}/v1/usage?since=not-a-date"))
.send()
.await?;
assert_eq!(bad_since.status(), StatusCode::BAD_REQUEST);
// since > until rejected.
let inverted = client
.get(format!(
"http://{addr}/v1/usage?since=2030-01-02T00:00:00Z&until=2030-01-01T00:00:00Z"
))
.send()
.await?;
assert_eq!(inverted.status(), StatusCode::BAD_REQUEST);
handle.abort();
Ok(())
}
}
+256 -10
@@ -106,6 +106,13 @@ pub struct ThreadRecord {
pub system_prompt: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// User-set title for the thread. When `None`, consumers fall back to a
/// derived title (typically the latest turn's input summary). Added in
/// v0.8.10 (#562); old runtime records simply have no `title` and behave
/// as before. Schema version is not bumped because this field is purely
/// additive metadata — older readers ignore it without misinterpretation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
#[serde(default)]
pub coherence_state: CoherenceState,
}
@@ -502,6 +509,20 @@ impl RuntimeThreadManagerConfig {
}
}
/// Visibility filter for `list_threads`. Default is `ActiveOnly`. The runtime
/// API exposes this as the combination of `include_archived` and
/// `archived_only` query params (see `runtime_api.rs`); whalescale#260 / #563.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ThreadListFilter {
/// Only `archived = false` threads. The original default.
#[default]
ActiveOnly,
/// Active and archived threads, sorted as the store returns them.
IncludeArchived,
/// Only `archived = true` threads.
ArchivedOnly,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateThreadRequest {
pub model: Option<String>,
@@ -518,9 +539,21 @@ pub struct CreateThreadRequest {
pub task_id: Option<String>,
}
/// Mutable fields accepted by `PATCH /v1/threads/{id}`.
///
/// Each field is optional — missing means "no change". Extended in v0.8.10
/// (#562, whalescale#256) so the UI can flip persistent thread state without
/// having to recreate a thread or pass per-turn overrides on every send.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpdateThreadRequest {
pub archived: Option<bool>,
pub allow_shell: Option<bool>,
pub trust_mode: Option<bool>,
pub auto_approve: Option<bool>,
pub model: Option<String>,
pub mode: Option<String>,
pub title: Option<String>,
pub system_prompt: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -554,6 +587,60 @@ pub struct ThreadDetail {
pub latest_seq: u64,
}
/// Aggregation key for `aggregate_usage`. Whalescale#261 / #564.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UsageGroupBy {
Day,
Model,
Provider,
Thread,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct UsageTotals {
pub input_tokens: u64,
pub output_tokens: u64,
pub cached_tokens: u64,
pub reasoning_tokens: u64,
pub cost_usd: f64,
pub turns: u64,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct UsageBucket {
pub key: String,
pub input_tokens: u64,
pub output_tokens: u64,
pub cached_tokens: u64,
pub reasoning_tokens: u64,
pub cost_usd: f64,
pub turns: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct UsageAggregation {
pub since: Option<DateTime<Utc>>,
pub until: Option<DateTime<Utc>>,
pub group_by: String,
pub totals: UsageTotals,
pub buckets: Vec<UsageBucket>,
}
/// Best-effort provider classification from a model name. Used as a grouping
/// key for `/v1/usage?group_by=provider`. Cost-tracking already runs the
/// model→pricing→cost path; this only labels the bucket.
fn provider_label_for_model(model: &str) -> &'static str {
if model.starts_with("deepseek-ai/") {
"nvidia-nim"
} else if model.starts_with("deepseek-") {
"deepseek"
} else if model.starts_with("openai/") || model.starts_with("anthropic/") {
"openrouter"
} else {
"unknown"
}
}
#[derive(Debug, Clone)]
struct ActiveTurnState {
turn_id: String,
@@ -719,6 +806,7 @@ impl RuntimeThreadManager {
archived: req.archived,
system_prompt: req.system_prompt,
task_id: req.task_id,
title: None,
coherence_state: CoherenceState::default(),
};
self.store.save_thread(&thread)?;
@@ -735,12 +823,14 @@ impl RuntimeThreadManager {
pub async fn list_threads(
&self,
include_archived: bool,
filter: ThreadListFilter,
limit: Option<usize>,
) -> Result<Vec<ThreadRecord>> {
let mut threads = self.store.list_threads()?;
if !include_archived {
threads.retain(|t| !t.archived);
match filter {
ThreadListFilter::ActiveOnly => threads.retain(|t| !t.archived),
ThreadListFilter::ArchivedOnly => threads.retain(|t| t.archived),
ThreadListFilter::IncludeArchived => {}
}
if let Some(limit) = limit {
threads.truncate(limit);
@@ -748,6 +838,90 @@ impl RuntimeThreadManager {
Ok(threads)
}
/// Aggregate token + cost usage across all threads/turns inside the time
/// range `[since, until]`. Each turn's cost is computed via
/// `pricing::calculate_turn_cost_from_usage` using the *thread*'s model
/// (turns inherit it). Whalescale#261 / #564.
///
/// Buckets are sorted by ascending key for deterministic output. Empty
/// ranges produce empty `buckets` (never an error).
pub async fn aggregate_usage(
&self,
since: Option<DateTime<Utc>>,
until: Option<DateTime<Utc>>,
group_by: UsageGroupBy,
) -> Result<UsageAggregation> {
use std::collections::BTreeMap;
let mut buckets: BTreeMap<String, UsageBucket> = BTreeMap::new();
let mut totals = UsageTotals::default();
for thread in self.store.list_threads()? {
let turns = self.store.list_turns_for_thread(&thread.id)?;
for turn in turns {
if let Some(s) = since
&& turn.created_at < s
{
continue;
}
if let Some(u) = until
&& turn.created_at > u
{
continue;
}
let Some(usage) = turn.usage.as_ref() else {
continue;
};
let cached = usage.prompt_cache_hit_tokens.unwrap_or(0) as u64;
let reasoning = usage.reasoning_tokens.unwrap_or(0) as u64;
let input = usage.input_tokens as u64;
let output = usage.output_tokens as u64;
let cost = crate::pricing::calculate_turn_cost_from_usage(&thread.model, usage)
.unwrap_or(0.0);
totals.input_tokens += input;
totals.output_tokens += output;
totals.cached_tokens += cached;
totals.reasoning_tokens += reasoning;
totals.cost_usd += cost;
totals.turns += 1;
let key = match group_by {
UsageGroupBy::Day => turn.created_at.format("%Y-%m-%d").to_string(),
UsageGroupBy::Model => thread.model.clone(),
UsageGroupBy::Provider => provider_label_for_model(&thread.model).to_string(),
UsageGroupBy::Thread => thread.id.clone(),
};
let bucket = buckets.entry(key.clone()).or_insert_with(|| UsageBucket {
key,
..UsageBucket::default()
});
bucket.input_tokens += input;
bucket.output_tokens += output;
bucket.cached_tokens += cached;
bucket.reasoning_tokens += reasoning;
bucket.cost_usd += cost;
bucket.turns += 1;
}
}
let group_by_str = match group_by {
UsageGroupBy::Day => "day",
UsageGroupBy::Model => "model",
UsageGroupBy::Provider => "provider",
UsageGroupBy::Thread => "thread",
}
.to_string();
Ok(UsageAggregation {
since,
until,
group_by: group_by_str,
totals,
buckets: buckets.into_values().collect(),
})
}
pub async fn get_thread(&self, id: &str) -> Result<ThreadRecord> {
self.store
.load_thread(id)
@@ -755,21 +929,93 @@ impl RuntimeThreadManager {
}
pub async fn update_thread(&self, id: &str, req: UpdateThreadRequest) -> Result<ThreadRecord> {
-if req.archived.is_none() {
+if req.archived.is_none()
&& req.allow_shell.is_none()
&& req.trust_mode.is_none()
&& req.auto_approve.is_none()
&& req.model.is_none()
&& req.mode.is_none()
&& req.title.is_none()
&& req.system_prompt.is_none()
{
bail!("At least one thread field is required");
}
if let Some(model) = req.model.as_ref()
&& model.trim().is_empty()
{
bail!("model must not be empty");
}
if let Some(mode) = req.mode.as_ref()
&& mode.trim().is_empty()
{
bail!("mode must not be empty");
}
let mut thread = self.get_thread(id).await?;
-let mut changed = false;
+let mut changes = serde_json::Map::new();
if let Some(archived) = req.archived
&& thread.archived != archived
{
thread.archived = archived;
-changed = true;
+changes.insert("archived".to_string(), json!(archived));
}
if let Some(allow_shell) = req.allow_shell
&& thread.allow_shell != allow_shell
{
thread.allow_shell = allow_shell;
changes.insert("allow_shell".to_string(), json!(allow_shell));
}
if let Some(trust_mode) = req.trust_mode
&& thread.trust_mode != trust_mode
{
thread.trust_mode = trust_mode;
changes.insert("trust_mode".to_string(), json!(trust_mode));
}
if let Some(auto_approve) = req.auto_approve
&& thread.auto_approve != auto_approve
{
thread.auto_approve = auto_approve;
changes.insert("auto_approve".to_string(), json!(auto_approve));
}
if let Some(model) = req.model
&& thread.model != model
{
thread.model = model.clone();
changes.insert("model".to_string(), json!(model));
}
if let Some(mode) = req.mode
&& thread.mode != mode
{
thread.mode = mode.clone();
changes.insert("mode".to_string(), json!(mode));
}
if let Some(title) = req.title {
// Empty string clears a previously-set title and reverts to derived.
let new_title = if title.trim().is_empty() {
None
} else {
Some(title)
};
if thread.title != new_title {
thread.title = new_title.clone();
changes.insert("title".to_string(), json!(new_title));
}
}
if let Some(system_prompt) = req.system_prompt {
let new_sys = if system_prompt.trim().is_empty() {
None
} else {
Some(system_prompt)
};
if thread.system_prompt != new_sys {
thread.system_prompt = new_sys.clone();
changes.insert("system_prompt".to_string(), json!(new_sys));
}
}
-if changed {
+if !changes.is_empty() {
thread.updated_at = Utc::now();
self.store.save_thread(&thread)?;
self.emit_event(
@@ -779,9 +1025,7 @@ impl RuntimeThreadManager {
"thread.updated",
json!({
"thread": thread.clone(),
-"changes": {
-"archived": thread.archived
-}
+"changes": Value::Object(changes),
}),
)
.await?;
@@ -2696,6 +2940,7 @@ mod tests {
archived: false,
system_prompt: None,
task_id: None,
title: None,
coherence_state: CoherenceState::default(),
}
}
@@ -3991,6 +4236,7 @@ mod tests {
archived: false,
system_prompt: None,
task_id: None,
title: None,
coherence_state: CoherenceState::default(),
};
manager.store.save_thread(&thread)?;
+79 -3
@@ -114,14 +114,35 @@ there is no `[app_server]` config section.
- `POST /v1/sessions/{id}/resume-thread`
**Threads** (durable runtime data model)
-- `GET /v1/threads?limit=50&include_archived=false`
-- `GET /v1/threads/summary?limit=50&search=<optional>&include_archived=false`
+- `GET /v1/threads?limit=50&include_archived=false&archived_only=false`
+- `GET /v1/threads/summary?limit=50&search=<optional>&include_archived=false&archived_only=false`
- `POST /v1/threads`
- `GET /v1/threads/{id}`
-- `PATCH /v1/threads/{id}` (currently supports `{ "archived": true|false }`)
+- `PATCH /v1/threads/{id}` (see body shape below)
- `POST /v1/threads/{id}/resume`
- `POST /v1/threads/{id}/fork`
`archived_only=true` returns archived threads only and takes precedence over
`include_archived`. Default behavior is unchanged: with `include_archived=false`
and `archived_only=false`, only active threads are returned. Added in v0.8.10 (#563).
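The precedence between the two flags can be sketched as a small resolver. This is an illustration only: the `ThreadListFilter` variants are the ones named in #563, while `resolve_filter` and its signature are hypothetical and may not match the daemon's handler code.

```rust
/// Filter variants as named in #563.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadListFilter {
    ActiveOnly,
    IncludeArchived,
    ArchivedOnly,
}

/// Hypothetical helper: maps the two query flags to a single filter.
/// `archived_only` is checked first, so it wins when both flags are set.
fn resolve_filter(include_archived: bool, archived_only: bool) -> ThreadListFilter {
    if archived_only {
        ThreadListFilter::ArchivedOnly
    } else if include_archived {
        ThreadListFilter::IncludeArchived
    } else {
        ThreadListFilter::ActiveOnly
    }
}
```

Under this sketch, `resolve_filter(true, true)` yields `ArchivedOnly`, and leaving both flags at `false` keeps the pre-existing active-only listing.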
`PATCH /v1/threads/{id}` body: every field is optional, and a missing field means
"no change". At least one field must be present. `title` and `system_prompt`
accept an empty string to clear a previously set value; an empty `model` or
`mode` is rejected with 400. Added in v0.8.10 (#562):
```json
{
"archived": true,
"allow_shell": false,
"trust_mode": false,
"auto_approve": false,
"model": "deepseek-v4-pro",
"mode": "agent",
"title": "User-set thread title",
"system_prompt": "You are a useful assistant."
}
```
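The "missing means no change, empty string clears" rule for `title` and `system_prompt` can be sketched as follows. The `FieldUpdate` enum and `apply_clearable` helper are hypothetical names introduced for illustration; the sketch also assumes a whitespace-only string counts as empty.

```rust
/// Hypothetical outcome of applying one clearable text field from a PATCH body.
#[derive(Debug, PartialEq, Eq)]
enum FieldUpdate {
    Unchanged,
    Set(String),
    Cleared,
}

/// Hypothetical helper. `requested` is the field as it arrived in the body:
/// `None` means the field was absent, `Some("")` means "clear the stored value".
fn apply_clearable(current: &mut Option<String>, requested: Option<String>) -> FieldUpdate {
    // Absent field: leave the stored value alone.
    let Some(value) = requested else {
        return FieldUpdate::Unchanged;
    };
    // Assumption: whitespace-only input is treated like an empty string.
    let new_value = if value.trim().is_empty() { None } else { Some(value) };
    // Writing the same value again is not reported as a change.
    if *current == new_value {
        return FieldUpdate::Unchanged;
    }
    *current = new_value.clone();
    match new_value {
        Some(v) => FieldUpdate::Set(v),
        None => FieldUpdate::Cleared,
    }
}
```

Reporting `Unchanged` for a no-op write mirrors the idea that only fields that actually changed should be reported back to clients.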
**Turns** (within a thread)
- `POST /v1/threads/{id}/turns`
- `POST /v1/threads/{id}/turns/{turn_id}/steer`
@@ -157,6 +178,42 @@ there is no `[app_server]` config section.
- `GET /v1/apps/mcp/servers`
- `GET /v1/apps/mcp/tools?server=<optional>`
**Usage** (token/cost aggregation across threads)
- `GET /v1/usage?since=<rfc3339>&until=<rfc3339>&group_by=<day|model|provider|thread>`
`since` / `until` are inclusive RFC 3339 timestamps; either may be omitted (no
bound on that side). `group_by` defaults to `day`. Buckets are sorted by ascending key.
Empty time ranges produce empty `buckets` (never a 404). Turns with no recorded
usage are skipped. Cost is computed via the model→pricing map; turns whose model
has no pricing entry contribute tokens but `0.0` cost. Added in v0.8.10 (#564).
```json
{
"since": "2026-04-01T00:00:00Z",
"until": "2026-04-30T23:59:59Z",
"group_by": "day",
"totals": {
"input_tokens": 12345,
"output_tokens": 6789,
"cached_tokens": 0,
"reasoning_tokens": 0,
"cost_usd": 0.012,
"turns": 42
},
"buckets": [
{
"key": "2026-04-30",
"input_tokens": 1234,
"output_tokens": 678,
"cached_tokens": 0,
"reasoning_tokens": 0,
"cost_usd": 0.001,
"turns": 3
}
]
}
```
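The inclusive bounds and the ascending bucket order can be illustrated with a deliberately simplified sketch. `TurnSample`, `bucket_by_day`, and the use of integer timestamps are assumptions made for brevity; the real endpoint works on RFC 3339 timestamps and tracks more counters per bucket.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a turn (hypothetical type for this sketch).
struct TurnSample {
    created_at: u64,   // seconds since epoch, an assumption for the sketch
    day: &'static str, // precomputed `YYYY-MM-DD` bucket key
    input_tokens: u64,
}

/// Sums input tokens per day for turns inside the inclusive [since, until] range.
/// A `BTreeMap` keeps keys sorted, so buckets come out in ascending key order.
fn bucket_by_day(
    turns: &[TurnSample],
    since: Option<u64>,
    until: Option<u64>,
) -> Vec<(String, u64)> {
    let mut buckets: BTreeMap<String, u64> = BTreeMap::new();
    for turn in turns {
        // Bounds are inclusive: only strictly-outside turns are skipped.
        if since.is_some_and(|s| turn.created_at < s)
            || until.is_some_and(|u| turn.created_at > u)
        {
            continue;
        }
        *buckets.entry(turn.day.to_string()).or_insert(0) += turn.input_tokens;
    }
    buckets.into_iter().collect()
}
```

A range that matches no turns simply returns an empty vector, which corresponds to the empty `buckets` array described above.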
## Runtime data model
The runtime uses a durable Thread/Turn/Item lifecycle.
@@ -226,6 +283,25 @@ Common event names: `thread.started`, `thread.forked`, `turn.started`,
- **Capability responses** never leak secrets, file contents, or session
message bodies. They report *metadata*: presence, counts, status flags.
### CORS allow-list
The runtime API ships with a built-in dev-origin allow-list:
`http://localhost:3000`, `http://127.0.0.1:3000`, `http://localhost:1420`,
`http://127.0.0.1:1420`, `tauri://localhost`. To allow additional origins (e.g.
when developing a UI on Vite's default `:5173`), use any of:
- CLI flag (repeatable): `deepseek serve --http --cors-origin http://localhost:5173`
- Env var (comma-separated): `DEEPSEEK_CORS_ORIGINS="http://localhost:5173,http://localhost:8080"`
- Config (`~/.deepseek/config.toml`):
```toml
[runtime_api]
cors_origins = ["http://localhost:5173"]
```
User-supplied origins **stack on top of** the built-in defaults; they do not
replace them. Wildcard origins are not supported — the explicit allow-list
model is preserved. Added in v0.8.10 (#561).
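The stacking behavior can be sketched as an order-preserving merge. This is a minimal illustration, assuming the defaults are listed first and that empty and duplicate entries are dropped as described in #561; `merge_origins` is a hypothetical name, and the sketch does not cover the header-value validation the daemon performs.

```rust
use std::collections::HashSet;

/// Built-in dev origins as listed in this section.
const DEFAULT_ORIGINS: [&str; 5] = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
    "http://localhost:1420",
    "http://127.0.0.1:1420",
    "tauri://localhost",
];

/// Hypothetical helper: defaults first, then user entries, keeping first-seen
/// order and skipping empty or duplicate values.
fn merge_origins(user: &[&str]) -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut merged: Vec<String> = Vec::new();
    let defaults = DEFAULT_ORIGINS.iter().map(|o| o.to_string());
    let extras = user.iter().map(|o| o.trim().to_string());
    for origin in defaults.chain(extras) {
        if origin.is_empty() {
            continue;
        }
        // `insert` returns false when the origin was already present.
        if seen.insert(origin.clone()) {
            merged.push(origin);
        }
    }
    merged
}
```

With this sketch, passing `http://localhost:5173` twice alongside a default such as `http://localhost:3000` produces the five defaults followed by a single `http://localhost:5173` entry.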
## Session lifecycle (native UI supervision)
| Operation | Endpoint |