feat: legacy SSE support for MCP server configurations
This commit is contained in:
+38
-12
@@ -1,3 +1,5 @@
|
||||
mod sse_transport;
|
||||
|
||||
use crate::config::AppConfig;
|
||||
use crate::config::paths;
|
||||
use crate::utils::{AbortSignal, abortable_run_with_spinner};
|
||||
@@ -5,6 +7,7 @@ use crate::vault::Vault;
|
||||
use crate::vault::interpolate_secrets;
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use futures_util::{StreamExt, TryStreamExt, stream};
|
||||
use http::{HeaderName, HeaderValue};
|
||||
use indoc::formatdoc;
|
||||
use rmcp::service::RunningService;
|
||||
use rmcp::transport::StreamableHttpClientTransport;
|
||||
@@ -12,12 +15,12 @@ use rmcp::transport::TokioChildProcess;
|
||||
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
|
||||
use rmcp::{RoleClient, ServiceExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sse_transport::LegacySseTransport;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::OpenOptions;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use http::{HeaderName, HeaderValue};
|
||||
use tokio::process::Command;
|
||||
|
||||
pub const MCP_INVOKE_META_FUNCTION_NAME_PREFIX: &str = "mcp_invoke";
|
||||
@@ -328,19 +331,26 @@ pub(crate) async fn spawn_mcp_server(
|
||||
spec: &McpServer,
|
||||
log_path: Option<&Path>,
|
||||
) -> Result<Arc<ConnectedServer>> {
|
||||
if spec.is_remote() {
|
||||
let url = spec.url.as_deref().expect("validated: remote spec has url");
|
||||
spawn_remote_mcp_server(url, spec.headers.as_ref()).await
|
||||
} else {
|
||||
let command = spec
|
||||
.command
|
||||
.as_deref()
|
||||
.expect("validated: stdio spec has command");
|
||||
spawn_stdio_mcp_server(command, spec, log_path).await
|
||||
match spec.transport_type {
|
||||
McpTransportType::Http => {
|
||||
let url = spec.url.as_deref().expect("validated: http spec has url");
|
||||
spawn_http_mcp_server(url, spec.headers.as_ref()).await
|
||||
}
|
||||
McpTransportType::Sse => {
|
||||
let url = spec.url.as_deref().expect("validated: sse spec has url");
|
||||
spawn_sse_mcp_server(url, spec.headers.as_ref()).await
|
||||
}
|
||||
McpTransportType::Stdio => {
|
||||
let command = spec
|
||||
.command
|
||||
.as_deref()
|
||||
.expect("validated: stdio spec has command");
|
||||
spawn_stdio_mcp_server(command, spec, log_path).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_remote_mcp_server(
|
||||
async fn spawn_http_mcp_server(
|
||||
url: &str,
|
||||
headers: Option<&HashMap<String, String>>,
|
||||
) -> Result<Arc<ConnectedServer>> {
|
||||
@@ -365,7 +375,23 @@ async fn spawn_remote_mcp_server(
|
||||
let service = Arc::new(
|
||||
().serve(transport)
|
||||
.await
|
||||
.with_context(|| format!("Failed to connect to remote MCP server: {url}"))?,
|
||||
.with_context(|| format!("Failed to connect to HTTP MCP server: {url}"))?,
|
||||
);
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
async fn spawn_sse_mcp_server(
|
||||
url: &str,
|
||||
headers: Option<&HashMap<String, String>>,
|
||||
) -> Result<Arc<ConnectedServer>> {
|
||||
let sse = LegacySseTransport::connect(url, headers)
|
||||
.await
|
||||
.with_context(|| format!("Failed to connect to SSE MCP server: {url}"))?;
|
||||
let (sink, stream) = sse.into_parts();
|
||||
let service = Arc::new(
|
||||
().serve((sink, stream))
|
||||
.await
|
||||
.with_context(|| format!("Failed to initialize SSE MCP server: {url}"))?,
|
||||
);
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user