feat(protocol): add runtime event envelope

This commit is contained in:
cyq
2026-05-27 10:13:19 +08:00
parent 54151a4bc9
commit d102cbd0f9
6 changed files with 203 additions and 14 deletions
Generated
+1
View File
@@ -983,6 +983,7 @@ dependencies = [
"clap",
"clap_complete",
"codewhale-config",
"codewhale-protocol",
"codewhale-secrets",
"codewhale-tools",
"colored",
+30
View File
@@ -1,8 +1,38 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub mod runtime {
use super::*;
pub const RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeEventEnvelope {
#[serde(default = "default_runtime_event_envelope_schema_version")]
pub schema_version: u32,
pub seq: u64,
pub event: String,
pub kind: String,
pub thread_id: String,
pub turn_id: Option<String>,
pub item_id: Option<String>,
pub timestamp: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at: Option<String>,
pub payload: Value,
#[serde(default)]
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
fn default_runtime_event_envelope_schema_version() -> u32 {
RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope<T> {
pub request_id: String,
+100 -1
View File
@@ -1,4 +1,8 @@
use codewhale_protocol::{EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams};
use codewhale_protocol::{
EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams,
runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope},
};
use serde_json::{Value, json};
#[test]
fn thread_resume_params_round_trip() {
@@ -48,3 +52,98 @@ fn event_frame_serialization_contains_expected_tag() {
let encoded = serde_json::to_string(&frame).expect("serialize frame");
assert!(encoded.contains("turn_complete"));
}
#[test]
fn runtime_event_envelope_roundtrip() {
let input = json!({
"schema_version": 1,
"seq": 12,
"event": "item.delta",
"kind": "item.delta",
"thread_id": "thr_123",
"turn_id": "turn_456",
"item_id": "item_789",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": { "delta": "ok", "kind": "agent_message" },
});
let envelope: RuntimeEventEnvelope =
serde_json::from_value(input).expect("deserialize runtime event envelope");
assert_eq!(envelope.schema_version, 1);
assert_eq!(envelope.seq, 12);
assert_eq!(envelope.event, "item.delta");
assert_eq!(envelope.kind, "item.delta");
assert_eq!(envelope.thread_id, "thr_123");
let encoded = serde_json::to_value(&envelope).expect("serialize runtime event envelope");
assert_eq!(encoded["event"], encoded["kind"]);
assert_eq!(encoded["seq"], 12);
}
#[test]
fn runtime_event_envelope_defaults_to_api_schema_version() {
let input = json!({
"seq": 15,
"event": "thread.started",
"kind": "thread.started",
"thread_id": "thr_default_version",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": {},
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input)
.expect("deserialize runtime event envelope without schema version");
assert_eq!(
envelope.schema_version,
RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION
);
}
#[test]
fn runtime_event_envelope_thread_level_keeps_turn_and_item_ids() {
let input = json!({
"schema_version": 1,
"seq": 14,
"event": "thread.started",
"kind": "thread.started",
"thread_id": "thr_thread",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": { "thread": { "id": "thr_thread" } },
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input)
.expect("deserialize runtime event envelope without thread-level turn/item ids");
assert!(envelope.turn_id.is_none());
assert!(envelope.item_id.is_none());
let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope");
assert!(encoded.get("turn_id").is_some());
assert!(encoded.get("item_id").is_some());
assert!(encoded["turn_id"].is_null());
assert!(encoded["item_id"].is_null());
}
#[test]
fn runtime_event_envelope_preserves_unknown_fields() {
let input: Value = json!({
"schema_version": 1,
"seq": 13,
"event": "turn.completed",
"kind": "turn.completed",
"thread_id": "thr_unknown",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": {},
"forward_compatibility_hint": "v2-ready",
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input.clone())
.expect("deserialize runtime event envelope with unknown field");
assert!(envelope.extra.contains_key("forward_compatibility_hint"));
let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope");
assert_eq!(encoded["forward_compatibility_hint"], "v2-ready");
assert_eq!(encoded["schema_version"], 1);
assert_eq!(encoded["seq"], 13);
assert_eq!(encoded["event"], "turn.completed");
assert_eq!(encoded["kind"], "turn.completed");
assert_eq!(encoded["thread_id"], "thr_unknown");
assert!(encoded["turn_id"].is_null());
assert!(encoded["item_id"].is_null());
}
+1
View File
@@ -28,6 +28,7 @@ path = "src/bin/deepseek_tui_legacy_shim.rs"
anyhow = "1.0.100"
arboard = "3.4"
codewhale-config = { path = "../config", version = "0.8.46" }
codewhale-protocol = { path = "../protocol", version = "0.8.46" }
codewhale-secrets = { path = "../secrets", version = "0.8.46" }
codewhale-tools = { path = "../tools", version = "0.8.46" }
schemaui = { version = "0.12.0", default-features = false, optional = true }
+54 -10
View File
@@ -19,6 +19,7 @@ use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::Utc;
use codewhale_protocol::runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::net::TcpListener;
@@ -1469,15 +1470,23 @@ async fn stream_turn(
}
fn runtime_event_payload(event: crate::runtime_threads::RuntimeEventRecord) -> serde_json::Value {
json!({
"seq": event.seq,
"timestamp": event.timestamp,
"thread_id": event.thread_id,
"turn_id": event.turn_id,
"item_id": event.item_id,
"event": event.event,
"payload": event.payload,
})
let event_name = event.event.clone();
let timestamp = event.timestamp.to_rfc3339();
let schema_version = RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION;
let envelope = RuntimeEventEnvelope {
schema_version,
seq: event.seq,
event: event_name.clone(),
kind: event_name,
thread_id: event.thread_id,
turn_id: event.turn_id,
item_id: event.item_id,
timestamp: timestamp.clone(),
created_at: Some(timestamp),
payload: event.payload,
extra: Default::default(),
};
serde_json::to_value(envelope).expect("serialize runtime event envelope")
}
fn map_compat_stream_event(event: &crate::runtime_threads::RuntimeEventRecord) -> Option<SseEvent> {
@@ -2622,6 +2631,30 @@ mod tests {
chunk_text.contains("event:"),
"expected SSE event chunk, got: {chunk_text}"
);
let (event_name, payload) = parse_sse_frame(&chunk_text)?;
assert_eq!(event_name, "thread.started");
assert!(
event_name.starts_with("item.")
|| event_name.starts_with("turn.")
|| event_name.starts_with("thread.")
|| event_name == "turn.completed"
|| event_name == "turn.started"
|| event_name == "thread.started",
"unexpected first event name: {event_name}"
);
assert_eq!(payload["event"], payload["kind"]);
assert!(payload.get("turn_id").is_some());
assert!(payload.get("item_id").is_some());
assert!(payload["turn_id"].is_null());
assert!(payload["item_id"].is_null());
assert_eq!(payload["thread_id"], thread_id);
assert!(
payload["schema_version"]
.as_u64()
.is_some_and(|version| version >= 1)
);
assert!(payload.get("seq").and_then(Value::as_u64).is_some());
assert!(payload["payload"].is_object() || payload["payload"].is_array());
handle.abort();
Ok(())
@@ -2712,7 +2745,15 @@ mod tests {
.await?
.error_for_status()?;
let frame_a = read_first_sse_frame(resp_a).await?;
let (_event_a, payload_a) = parse_sse_frame(&frame_a)?;
let (event_a, payload_a) = parse_sse_frame(&frame_a)?;
assert_eq!(event_a, "thread.started");
assert!(payload_a.get("turn_id").is_some());
assert!(payload_a.get("item_id").is_some());
assert!(payload_a["turn_id"].is_null());
assert!(payload_a["item_id"].is_null());
assert!(payload_a.get("schema_version").is_some());
assert_eq!(payload_a["event"], payload_a["kind"]);
assert_eq!(payload_a["thread_id"], thread_id);
let seq_a = payload_a
.get("seq")
.and_then(Value::as_u64)
@@ -2727,6 +2768,9 @@ mod tests {
.error_for_status()?;
let frame_b = read_first_sse_frame(resp_b).await?;
let (_event_b, payload_b) = parse_sse_frame(&frame_b)?;
assert!(payload_b.get("schema_version").is_some());
assert_eq!(payload_b["event"], payload_b["kind"]);
assert_eq!(payload_b["thread_id"], thread_id);
let seq_b = payload_b
.get("seq")
.and_then(Value::as_u64)
+17 -3
View File
@@ -286,16 +286,19 @@ Events are append-only with a global monotonic `seq` for replay/resume.
### SSE event stream
The SSE event payload shape:
The SSE event payload shape for `/v1/threads/{id}/events`:
```json
{
"schema_version": 1,
"seq": 42,
"timestamp": "2026-02-11T20:18:49.123Z",
"event": "item.delta",
"kind": "item.delta",
"thread_id": "thr_1234abcd",
"turn_id": "turn_5678efgh",
"item_id": "item_90ab12cd",
"event": "item.delta",
"timestamp": "2026-02-11T20:18:49.123Z",
"created_at": "2026-02-11T20:18:49.123Z",
"payload": {
"delta": "partial output",
"kind": "agent_message"
@@ -303,6 +306,17 @@ The SSE event payload shape:
}
```
Compatibility notes:
- `schema_version` is the HTTP/SSE envelope schema version. It is independent of
the runtime store schema used for persisted thread/turn/event records.
- `event` remains the SSE event name in existing clients; it is preserved as-is.
- `kind` mirrors `event` in the stable envelope for typed clients.
- `thread.started`, `turn.started`, and `turn.completed` are emitted as SSE event
names exactly as before.
- `created_at` is currently a duplicate of `timestamp` (`timestamp` is the existing
canonical field).
Common event names: `thread.started`, `thread.forked`, `turn.started`,
`turn.lifecycle`, `turn.steered`, `turn.interrupt_requested`,
`turn.completed`, `item.started`, `item.delta`, `item.completed`,