Improve MCP SSE error diagnostics

This commit is contained in:
zhuang biaowei
2026-05-28 16:39:01 +08:00
committed by Hunter Bown
parent 29417b3b37
commit 58c57cb798
+136 -2
View File
@@ -1145,8 +1145,15 @@ impl McpTransport for SseTransport {
.body(msg)
.send()
.await?;
if !response.status().is_success() {
anyhow::bail!("Failed to send message via SSE POST: {}", response.status());
let status = response.status();
if !status.is_success() {
let body_excerpt = bounded_body_excerpt(response, ERROR_BODY_PREVIEW_BYTES).await;
anyhow::bail!(
"MCP SSE POST rejected (transport=sse endpoint={} status={}): {}",
mask_url_secrets(endpoint),
status,
body_excerpt
);
}
Ok(())
}
@@ -3450,6 +3457,49 @@ mod tests {
);
}
#[tokio::test]
async fn mcp_pool_call_tool_preserves_tool_names_with_dashes() {
let sent = Arc::new(Mutex::new(Vec::new()));
let transport = ScriptedValueTransport {
sent: Arc::clone(&sent),
responses: VecDeque::from([json_frame(serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": {"ok": true}
}))]),
};
let mut conn = test_connection(Box::new(transport));
conn.name = "dephy".to_string();
conn.tools = vec![McpTool {
name: "company--search".to_string(),
description: None,
input_schema: serde_json::json!({}),
}];
let mut pool = McpPool::new(McpConfig {
timeouts: McpTimeouts::default(),
servers: HashMap::new(),
});
pool.connections.insert("dephy".to_string(), conn);
let result = pool
.call_tool(
"mcp_dephy_company--search",
serde_json::json!({"query": "dephy"}),
)
.await
.unwrap();
assert_eq!(result, serde_json::json!({"ok": true}));
let sent = sent.lock().unwrap();
assert_eq!(sent[0]["method"], "tools/call");
assert_eq!(sent[0]["params"]["name"], "company--search");
assert_eq!(
sent[0]["params"]["arguments"],
serde_json::json!({"query": "dephy"})
);
}
/// #1244: when an MCP stdio server fails to spawn, the underlying OS
/// error (e.g. ENOENT for a missing binary) must reach the user via the
/// snapshot.error string. Regression test for `err.to_string()` dropping
@@ -4143,6 +4193,90 @@ mod tests {
server.abort();
}
#[tokio::test]
async fn sse_post_error_includes_response_body_excerpt() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let cancel_token = tokio_util::sync::CancellationToken::new();
let server_cancel = cancel_token.clone();
let server = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
break;
};
let server_cancel = server_cancel.clone();
tokio::spawn(async move {
let mut request = Vec::new();
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await.unwrap();
if n == 0 {
return;
}
request.extend_from_slice(&buf[..n]);
if request.windows(4).any(|window| window == b"\r\n\r\n") {
break;
}
}
let request = String::from_utf8_lossy(&request);
if request.starts_with("GET /sse ") {
socket
.write_all(
b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\n\r\n",
)
.await
.unwrap();
socket
.write_all(b"event: endpoint\ndata: /messages\n\n")
.await
.unwrap();
server_cancel.cancelled().await;
} else if request.starts_with("POST /messages ") {
socket
.write_all(
b"HTTP/1.1 400 Bad Request\r\nContent-Type: application/json\r\nContent-Length: 25\r\n\r\n{\"error\":\"missing query\"}",
)
.await
.unwrap();
}
});
}
});
let client = reqwest::Client::new();
let url = format!("http://{addr}/sse");
let mut transport = SseTransport::connect(
client,
url,
HashMap::new(),
cancel_token.clone(),
Duration::from_secs(2),
)
.await
.unwrap();
let err = transport
.send(json_frame(serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize"
})))
.await
.expect_err("POST rejection should be returned");
let err = format!("{err:#}");
assert!(
err.contains("400 Bad Request") && err.contains("missing query"),
"SSE POST error should include status and body, got: {err}"
);
cancel_token.cancel();
server.abort();
}
#[test]
fn session_id_starts_none() {
let transport = StreamableHttpTransport::new(