From d102cbd0f9ed35b1ac7ad56031c155607c2cbab5 Mon Sep 17 00:00:00 2001 From: cyq <15000851237@163.com> Date: Wed, 27 May 2026 10:13:19 +0800 Subject: [PATCH] feat(protocol): add runtime event envelope --- Cargo.lock | 1 + crates/protocol/src/lib.rs | 30 +++++++ crates/protocol/tests/parity_protocol.rs | 101 ++++++++++++++++++++++- crates/tui/Cargo.toml | 1 + crates/tui/src/runtime_api.rs | 64 +++++++++++--- docs/RUNTIME_API.md | 20 ++++- 6 files changed, 203 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83777fd1..bc855429 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,6 +983,7 @@ dependencies = [ "clap", "clap_complete", "codewhale-config", + "codewhale-protocol", "codewhale-secrets", "codewhale-tools", "colored", diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 643710ea..84c73202 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -1,8 +1,38 @@ +use std::collections::BTreeMap; use std::path::PathBuf; use serde::{Deserialize, Serialize}; use serde_json::Value; +pub mod runtime { + use super::*; + + pub const RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION: u32 = 1; + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct RuntimeEventEnvelope { + #[serde(default = "default_runtime_event_envelope_schema_version")] + pub schema_version: u32, + pub seq: u64, + pub event: String, + pub kind: String, + pub thread_id: String, + pub turn_id: Option, + pub item_id: Option, + pub timestamp: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option, + pub payload: Value, + #[serde(default)] + #[serde(flatten)] + pub extra: BTreeMap, + } + + fn default_runtime_event_envelope_schema_version() -> u32 { + RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Envelope { pub request_id: String, diff --git a/crates/protocol/tests/parity_protocol.rs b/crates/protocol/tests/parity_protocol.rs index 89f3002e..0b6f082d 100644 --- a/crates/protocol/tests/parity_protocol.rs +++ b/crates/protocol/tests/parity_protocol.rs @@ -1,4 +1,8 @@ -use codewhale_protocol::{EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams}; +use codewhale_protocol::{ + EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams, + runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope}, +}; +use serde_json::{Value, json}; #[test] fn thread_resume_params_round_trip() { @@ -48,3 +52,98 @@ fn event_frame_serialization_contains_expected_tag() { let encoded = serde_json::to_string(&frame).expect("serialize frame"); assert!(encoded.contains("turn_complete")); } + +#[test] +fn runtime_event_envelope_roundtrip() { + let input = json!({ + "schema_version": 1, + "seq": 12, + "event": "item.delta", + "kind": "item.delta", + "thread_id": "thr_123", + "turn_id": "turn_456", + "item_id": "item_789", + "timestamp": "2026-02-11T20:18:49.123Z", + "payload": { "delta": "ok", "kind": "agent_message" }, + }); + let envelope: RuntimeEventEnvelope = + serde_json::from_value(input).expect("deserialize runtime event envelope"); + assert_eq!(envelope.schema_version, 1); + assert_eq!(envelope.seq, 12); + assert_eq!(envelope.event, "item.delta"); + assert_eq!(envelope.kind, "item.delta"); + assert_eq!(envelope.thread_id, "thr_123"); + + let encoded = serde_json::to_value(&envelope).expect("serialize runtime event envelope"); + assert_eq!(encoded["event"], encoded["kind"]); + assert_eq!(encoded["seq"], 12); +} + +#[test] +fn runtime_event_envelope_defaults_to_api_schema_version() { + let input = json!({ + "seq": 15, + "event": "thread.started", + "kind": "thread.started", + "thread_id": "thr_default_version", + "timestamp": "2026-02-11T20:18:49.123Z", + "payload": {}, + }); + let envelope: RuntimeEventEnvelope = serde_json::from_value(input) + .expect("deserialize runtime event envelope without schema version"); + + assert_eq!( + envelope.schema_version, + RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION + ); +} + +#[test] +fn runtime_event_envelope_thread_level_keeps_turn_and_item_ids() { + let input = json!({ + "schema_version": 1, + "seq": 14, + "event": "thread.started", + "kind": "thread.started", + "thread_id": "thr_thread", + "timestamp": "2026-02-11T20:18:49.123Z", + "payload": { "thread": { "id": "thr_thread" } }, + }); + let envelope: RuntimeEventEnvelope = serde_json::from_value(input) + .expect("deserialize runtime event envelope without thread-level turn/item ids"); + assert!(envelope.turn_id.is_none()); + assert!(envelope.item_id.is_none()); + + let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope"); + assert!(encoded.get("turn_id").is_some()); + assert!(encoded.get("item_id").is_some()); + assert!(encoded["turn_id"].is_null()); + assert!(encoded["item_id"].is_null()); +} + +#[test] +fn runtime_event_envelope_preserves_unknown_fields() { + let input: Value = json!({ + "schema_version": 1, + "seq": 13, + "event": "turn.completed", + "kind": "turn.completed", + "thread_id": "thr_unknown", + "timestamp": "2026-02-11T20:18:49.123Z", + "payload": {}, + "forward_compatibility_hint": "v2-ready", + }); + let envelope: RuntimeEventEnvelope = serde_json::from_value(input.clone()) + .expect("deserialize runtime event envelope with unknown field"); + assert!(envelope.extra.contains_key("forward_compatibility_hint")); + + let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope"); + assert_eq!(encoded["forward_compatibility_hint"], "v2-ready"); + assert_eq!(encoded["schema_version"], 1); + assert_eq!(encoded["seq"], 13); + assert_eq!(encoded["event"], "turn.completed"); + assert_eq!(encoded["kind"], "turn.completed"); + assert_eq!(encoded["thread_id"], "thr_unknown"); + assert!(encoded["turn_id"].is_null()); + assert!(encoded["item_id"].is_null()); +} diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index 6aa5486c..bfe2b699 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -28,6 +28,7 @@ path = "src/bin/deepseek_tui_legacy_shim.rs" anyhow = "1.0.100" arboard = "3.4" codewhale-config = { path = "../config", version = "0.8.46" } +codewhale-protocol = { path = "../protocol", version = "0.8.46" } codewhale-secrets = { path = "../secrets", version = "0.8.46" } codewhale-tools = { path = "../tools", version = "0.8.46" } schemaui = { version = "0.12.0", default-features = false, optional = true } diff --git a/crates/tui/src/runtime_api.rs b/crates/tui/src/runtime_api.rs index 20110cc4..4eb3aff5 100644 --- a/crates/tui/src/runtime_api.rs +++ b/crates/tui/src/runtime_api.rs @@ -19,6 +19,7 @@ use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{Json, Router}; use chrono::Utc; +use codewhale_protocol::runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use tokio::net::TcpListener; @@ -1469,15 +1470,23 @@ async fn stream_turn( } fn runtime_event_payload(event: crate::runtime_threads::RuntimeEventRecord) -> serde_json::Value { - json!({ - "seq": event.seq, - "timestamp": event.timestamp, - "thread_id": event.thread_id, - "turn_id": event.turn_id, - "item_id": event.item_id, - "event": event.event, - "payload": event.payload, - }) + let event_name = event.event.clone(); + let timestamp = event.timestamp.to_rfc3339(); + let schema_version = RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION; + let envelope = RuntimeEventEnvelope { + schema_version, + seq: event.seq, + event: event_name.clone(), + kind: event_name, + thread_id: event.thread_id, + turn_id: event.turn_id, + item_id: event.item_id, + timestamp: timestamp.clone(), + created_at: Some(timestamp), + payload: event.payload, + extra: Default::default(), + }; + serde_json::to_value(envelope).expect("serialize runtime event envelope") } fn map_compat_stream_event(event: &crate::runtime_threads::RuntimeEventRecord) -> Option { @@ -2622,6 +2631,30 @@ mod tests { chunk_text.contains("event:"), "expected SSE event chunk, got: {chunk_text}" ); + let (event_name, payload) = parse_sse_frame(&chunk_text)?; + assert_eq!(event_name, "thread.started"); + assert!( + event_name.starts_with("item.") + || event_name.starts_with("turn.") + || event_name.starts_with("thread.") + || event_name == "turn.completed" + || event_name == "turn.started" + || event_name == "thread.started", + "unexpected first event name: {event_name}" + ); + assert_eq!(payload["event"], payload["kind"]); + assert!(payload.get("turn_id").is_some()); + assert!(payload.get("item_id").is_some()); + assert!(payload["turn_id"].is_null()); + assert!(payload["item_id"].is_null()); + assert_eq!(payload["thread_id"], thread_id); + assert!( + payload["schema_version"] + .as_u64() + .is_some_and(|version| version >= 1) + ); + assert!(payload.get("seq").and_then(Value::as_u64).is_some()); + assert!(payload["payload"].is_object() || payload["payload"].is_array()); handle.abort(); Ok(()) @@ -2712,7 +2745,15 @@ mod tests { .await? .error_for_status()?; let frame_a = read_first_sse_frame(resp_a).await?; - let (_event_a, payload_a) = parse_sse_frame(&frame_a)?; + let (event_a, payload_a) = parse_sse_frame(&frame_a)?; + assert_eq!(event_a, "thread.started"); + assert!(payload_a.get("turn_id").is_some()); + assert!(payload_a.get("item_id").is_some()); + assert!(payload_a["turn_id"].is_null()); + assert!(payload_a["item_id"].is_null()); + assert!(payload_a.get("schema_version").is_some()); + assert_eq!(payload_a["event"], payload_a["kind"]); + assert_eq!(payload_a["thread_id"], thread_id); let seq_a = payload_a .get("seq") .and_then(Value::as_u64) @@ -2727,6 +2768,9 @@ mod tests { .error_for_status()?; let frame_b = read_first_sse_frame(resp_b).await?; let (_event_b, payload_b) = parse_sse_frame(&frame_b)?; + assert!(payload_b.get("schema_version").is_some()); + assert_eq!(payload_b["event"], payload_b["kind"]); + assert_eq!(payload_b["thread_id"], thread_id); let seq_b = payload_b .get("seq") .and_then(Value::as_u64) diff --git a/docs/RUNTIME_API.md b/docs/RUNTIME_API.md index 504154f0..8e5d5b2e 100644 --- a/docs/RUNTIME_API.md +++ b/docs/RUNTIME_API.md @@ -286,16 +286,19 @@ Events are append-only with a global monotonic `seq` for replay/resume. ### SSE event stream -The SSE event payload shape: +The SSE event payload shape for `/v1/threads/{id}/events`: ```json { + "schema_version": 1, "seq": 42, - "timestamp": "2026-02-11T20:18:49.123Z", + "event": "item.delta", + "kind": "item.delta", "thread_id": "thr_1234abcd", "turn_id": "turn_5678efgh", "item_id": "item_90ab12cd", - "event": "item.delta", + "timestamp": "2026-02-11T20:18:49.123Z", + "created_at": "2026-02-11T20:18:49.123Z", "payload": { "delta": "partial output", "kind": "agent_message" @@ -303,6 +306,17 @@ The SSE event payload shape: } ``` +Compatibility notes: + +- `schema_version` is the HTTP/SSE envelope schema version. It is independent of + the runtime store schema used for persisted thread/turn/event records. +- `event` remains the SSE event name in existing clients; it is preserved as-is. +- `kind` mirrors `event` in the stable envelope for typed clients. +- `thread.started`, `turn.started`, and `turn.completed` are emitted as SSE event + names exactly as before. +- `created_at` is currently a duplicate of `timestamp` (`timestamp` is the existing + canonical field). + Common event names: `thread.started`, `thread.forked`, `turn.started`, `turn.lifecycle`, `turn.steered`, `turn.interrupt_requested`, `turn.completed`, `item.started`, `item.delta`, `item.completed`,