Merge pull request #2430 from Hmbown/codex/harvest-2333-unix-socket

feat(hooks): add opt-in Unix socket event sink
This commit is contained in:
Hunter Bown
2026-05-31 04:27:11 -07:00
committed by GitHub
3 changed files with 251 additions and 1 deletions
+10 -1
View File
@@ -13,7 +13,7 @@ use codewhale_agent::ModelRegistry;
use codewhale_config::{CliRuntimeOverrides, ConfigStore};
use codewhale_core::Runtime;
use codewhale_execpolicy::ExecPolicyEngine;
use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink};
use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink, UnixSocketHookSink};
use codewhale_mcp::McpManager;
use codewhale_protocol::{
AppRequest, AppResponse, PromptRequest, PromptResponse, ThreadRequest, ThreadResponse,
@@ -314,6 +314,15 @@ fn build_state(config_path: Option<PathBuf>, auth_token: Option<String>) -> Resu
.unwrap_or_else(|| PathBuf::from(".deepseek/events.jsonl"));
hooks.add_sink(Arc::new(JsonlHookSink::new(hook_log_path)));
if let Some(socket_path) = config
.hook_sinks
.as_ref()
.and_then(|sinks| sinks.unix_socket_path.as_ref())
.filter(|path| !path.as_os_str().is_empty())
{
hooks.add_sink(Arc::new(UnixSocketHookSink::new(socket_path.clone())));
}
let runtime = Runtime::new(
config.clone(),
registry.clone(),
+122
View File
@@ -272,10 +272,25 @@ pub struct ConfigToml {
/// applies the defaults documented in [`LspConfigToml`].
#[serde(default)]
pub lsp: Option<LspConfigToml>,
/// App-server hook sink configuration. Kept separate from the TUI
/// lifecycle `[hooks]` table so config rewrites preserve existing hooks.
#[serde(default)]
pub hook_sinks: Option<HookSinksToml>,
#[serde(flatten)]
pub extras: BTreeMap<String, toml::Value>,
}
/// On-disk schema for the `[hook_sinks]` table.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HookSinksToml {
/// Unix domain socket path used by the app-server event sink.
///
/// When unset, no Unix socket sink is registered. There is deliberately no
/// shared `/tmp` default because socket ownership should be explicit.
#[serde(default)]
pub unix_socket_path: Option<PathBuf>,
}
/// On-disk schema for the `[skills]` table (#140). See `config.example.toml`
/// for documentation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
@@ -470,6 +485,11 @@ impl ConfigToml {
"approval_policy" => self.approval_policy.clone(),
"sandbox_mode" => self.sandbox_mode.clone(),
"tools.always_load" => self.tools.as_ref().map(|tools| tools.always_load.join(",")),
"hook_sinks.unix_socket_path" => self
.hook_sinks
.as_ref()
.and_then(|sinks| sinks.unix_socket_path.as_ref())
.map(|path| path.display().to_string()),
"providers.deepseek.api_key" => self.providers.deepseek.api_key.clone(),
"providers.deepseek.base_url" => self.providers.deepseek.base_url.clone(),
"providers.deepseek.model" => self.providers.deepseek.model.clone(),
@@ -592,6 +612,11 @@ impl ConfigToml {
}
"approval_policy" => self.approval_policy = Some(value.to_string()),
"sandbox_mode" => self.sandbox_mode = Some(value.to_string()),
"hook_sinks.unix_socket_path" => {
self.hook_sinks
.get_or_insert_with(HookSinksToml::default)
.unix_socket_path = Some(PathBuf::from(value));
}
"providers.deepseek.api_key" => {
let value = value.to_string();
self.providers.deepseek.api_key = Some(value.clone());
@@ -796,6 +821,11 @@ impl ConfigToml {
"telemetry" => self.telemetry = None,
"approval_policy" => self.approval_policy = None,
"sandbox_mode" => self.sandbox_mode = None,
"hook_sinks.unix_socket_path" => {
if let Some(sinks) = self.hook_sinks.as_mut() {
sinks.unix_socket_path = None;
}
}
"providers.deepseek.api_key" => {
self.providers.deepseek.api_key = None;
self.api_key = None;
@@ -919,6 +949,16 @@ impl ConfigToml {
if let Some(v) = self.sandbox_mode.as_ref() {
out.insert("sandbox_mode".to_string(), v.clone());
}
if let Some(v) = self
.hook_sinks
.as_ref()
.and_then(|sinks| sinks.unix_socket_path.as_ref())
{
out.insert(
"hook_sinks.unix_socket_path".to_string(),
v.display().to_string(),
);
}
if let Some(v) = self.providers.deepseek.api_key.as_ref() {
out.insert("providers.deepseek.api_key".to_string(), redact_secret(v));
}
@@ -2730,6 +2770,88 @@ mod tests {
);
}
#[test]
fn hook_sinks_config_uses_separate_table_from_lifecycle_hooks() -> Result<()> {
let raw = r#"
[hooks]
enabled = true
default_timeout_secs = 20
[[hooks.hooks]]
event = "message_submit"
command = "echo ok"
[hook_sinks]
unix_socket_path = "/tmp/cw-hooks.sock"
"#;
let config: ConfigToml = toml::from_str(raw)?;
assert_eq!(
config.get_value("hook_sinks.unix_socket_path").as_deref(),
Some("/tmp/cw-hooks.sock")
);
assert!(
config.extras.contains_key("hooks"),
"legacy lifecycle hooks table must remain an opaque extra"
);
let serialized = toml::to_string_pretty(&config)?;
let round_tripped: ConfigToml = toml::from_str(&serialized)?;
let hooks = round_tripped
.extras
.get("hooks")
.and_then(toml::Value::as_table)
.expect("hooks table preserved");
assert_eq!(
hooks.get("enabled").and_then(toml::Value::as_bool),
Some(true)
);
assert_eq!(
hooks
.get("default_timeout_secs")
.and_then(toml::Value::as_integer),
Some(20)
);
assert!(
hooks.get("hooks").and_then(toml::Value::as_array).is_some(),
"nested lifecycle hooks array must survive config rewrites"
);
assert_eq!(
round_tripped
.get_value("hook_sinks.unix_socket_path")
.as_deref(),
Some("/tmp/cw-hooks.sock")
);
Ok(())
}
#[test]
fn hook_sinks_unix_socket_path_round_trips_through_key_value_api() -> Result<()> {
let mut config = ConfigToml::default();
config.set_value("hook_sinks.unix_socket_path", "/tmp/cw-events.sock")?;
assert_eq!(
config.get_value("hook_sinks.unix_socket_path").as_deref(),
Some("/tmp/cw-events.sock")
);
assert_eq!(
config
.list_values()
.get("hook_sinks.unix_socket_path")
.map(String::as_str),
Some("/tmp/cw-events.sock")
);
config.unset_value("hook_sinks.unix_socket_path")?;
assert_eq!(config.get_value("hook_sinks.unix_socket_path"), None);
Ok(())
}
/// End-to-end smoke for the preferred Kimi Code setup path:
/// 1. Start from a fresh root config that uses DeepSeek defaults.
/// 2. Mutate it through the same key-value setters the
+119
View File
@@ -152,6 +152,63 @@ impl HookSink for WebhookHookSink {
}
}
/// A [`HookSink`] that sends events over a Unix domain socket.
///
/// Each event is serialized as a single JSON line (`{"at": "...", "event": {...}}\n`)
/// and written to the socket. If the socket is not available (listener not running),
/// the event is silently dropped - hook sinks are best-effort observability, not
/// control flow.
///
/// On non-Unix platforms this struct exists but its [`HookSink::emit`] is a no-op.
#[derive(Debug, Clone)]
pub struct UnixSocketHookSink {
#[cfg(unix)]
path: PathBuf,
}
impl UnixSocketHookSink {
/// Create a sink that connects to the Unix domain socket at `path`.
pub fn new(path: PathBuf) -> Self {
#[cfg(unix)]
{
Self { path }
}
#[cfg(not(unix))]
{
let _ = path;
Self {}
}
}
}
#[async_trait]
impl HookSink for UnixSocketHookSink {
#[cfg(unix)]
async fn emit(&self, event: &HookEvent) -> Result<()> {
let mut stream = match tokio::net::UnixStream::connect(&self.path).await {
Ok(s) => s,
Err(_) => return Ok(()), // listener not running, skip silently
};
let payload = json!({
"at": Utc::now().to_rfc3339(),
"event": event
});
let mut line = serde_json::to_string(&payload).context("failed to encode hook event")?;
line.push('\n');
stream
.write_all(line.as_bytes())
.await
.context("failed to write to unix socket")?;
Ok(())
}
#[cfg(not(unix))]
async fn emit(&self, _event: &HookEvent) -> Result<()> {
// Unix sockets are not available on this platform.
Ok(())
}
}
#[derive(Default, Clone)]
pub struct HookDispatcher {
sinks: Vec<Arc<dyn HookSink>>,
@@ -255,6 +312,57 @@ mod tests {
assert_eq!(second.events(), first.events());
}
#[cfg(unix)]
#[tokio::test]
async fn unix_socket_sink_skips_when_listener_absent() {
let (root, socket_path) = unique_short_socket_path("missing");
let sink = UnixSocketHookSink::new(socket_path);
let result = sink
.emit(&HookEvent::ResponseStart {
response_id: "resp-1".to_string(),
})
.await;
assert!(result.is_ok());
let _ = std::fs::remove_dir_all(root);
}
#[cfg(unix)]
#[tokio::test]
async fn unix_socket_sink_sends_event_to_listener() {
use tokio::io::AsyncBufReadExt;
use tokio::net::UnixListener;
let (root, socket_path) = unique_short_socket_path("send");
std::fs::create_dir_all(&root).expect("mkdir");
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path).expect("bind");
let sink = UnixSocketHookSink::new(socket_path.clone());
let handle = tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let mut reader = tokio::io::BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line).await.expect("read_line");
line
});
sink.emit(&HookEvent::ResponseStart {
response_id: "resp-42".to_string(),
})
.await
.expect("emit");
let received = handle.await.expect("join");
let parsed: Value = serde_json::from_str(&received).expect("parse");
assert_eq!(parsed["event"]["type"], "response_start");
assert_eq!(parsed["event"]["response_id"], "resp-42");
assert!(parsed["at"].as_str().is_some());
let _ = std::fs::remove_file(&socket_path);
let _ = std::fs::remove_dir_all(root);
}
#[derive(Default)]
struct RecordingSink {
events: Mutex<Vec<Value>>,
@@ -293,4 +401,15 @@ mod tests {
std::process::id()
))
}
#[cfg(unix)]
fn unique_short_socket_path(label: &str) -> (PathBuf, PathBuf) {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
let root = PathBuf::from("/tmp").join(format!("cw-hk-{}-{nanos}", std::process::id()));
let path = root.join(format!("{label}.sock"));
(root, path)
}
}