diff --git a/Cargo.lock b/Cargo.lock index 62acbb4..b9c03cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2038,7 +2038,7 @@ dependencies = [ "once_cell", "prost", "prost-types", - "reqwest", + "reqwest 0.12.28", "secret-vault-value", "serde", "serde_json", @@ -3098,7 +3098,7 @@ dependencies = [ "rand 0.10.0", "rayon", "reedline", - "reqwest", + "reqwest 0.12.28", "reqwest-eventsource", "rmcp", "rust-embed", @@ -4448,11 +4448,45 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.4.2", "web-sys", "webpki-roots", ] +[[package]] +name = "reqwest" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.9.0", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "sync_wrapper", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams 0.5.0", + "web-sys", +] + [[package]] name = "reqwest-eventsource" version = "0.6.0" @@ -4465,7 +4499,7 @@ dependencies = [ "mime", "nom 7.1.3", "pin-project-lite", - "reqwest", + "reqwest 0.12.28", "thiserror 1.0.69", ] @@ -4502,13 +4536,16 @@ dependencies = [ "base64", "chrono", "futures", + "http 1.4.0", "pastey", "pin-project-lite", "process-wrap", + "reqwest 0.13.2", "rmcp-macros", "schemars 1.2.1", "serde", "serde_json", + "sse-stream", "thiserror 2.0.18", "tokio", "tokio-stream", @@ -5193,6 +5230,19 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "sse-stream" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c5e6deb40826033bd7b11c7ef25ef71193fabd71f680f40dd16538a2704d2f4" +dependencies = [ + "bytes", + "futures-util", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -5954,7 +6004,7 @@ dependencies = [ "getrandom 0.3.4", "pin-project", "rand 0.9.2", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "time", @@ -6294,6 +6344,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/Cargo.toml b/Cargo.toml index 5d15c52..d0b7585 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,7 @@ duct = "1.0.0" argc = "1.23.0" strum_macros = "0.27.2" indoc = "2.0.6" -rmcp = { version = "0.16.0", features = ["client", "transport-child-process"] } +rmcp = { version = "0.16.0", features = ["client", "transport-child-process", "transport-streamable-http-client-reqwest"] } num_cpus = "1.17.0" tree-sitter = "0.26.8" tree-sitter-language = "0.1" diff --git a/assets/functions/mcp.json b/assets/functions/mcp.json index d3794f6..0ae8f79 100644 --- a/assets/functions/mcp.json +++ b/assets/functions/mcp.json @@ -1,6 +1,7 @@ { "mcpServers": { "github": { + "type": "stdio", "command": "docker", "args": [ "run", @@ -15,14 +16,17 @@ } }, "atlassian": { + "type": "stdio", "command": "npx", "args": ["-y", "mcp-remote@0.1.13", "https://mcp.atlassian.com/v1/mcp"] }, "docker": { + "type": "stdio", "command": "uvx", "args": ["mcp-server-docker"] }, "ddg-search": { + "type": "stdio", "command": "uvx", "args": ["duckduckgo-mcp-server"] } diff --git a/docs/function-calling/MCP-SERVERS.md b/docs/function-calling/MCP-SERVERS.md index 56a3121..94cd614 100644 --- a/docs/function-calling/MCP-SERVERS.md +++ b/docs/function-calling/MCP-SERVERS.md @@ -37,9 +37,98 @@ this directory using the following command: loki --info | grep functions_dir | awk '{print $2}' ``` -The syntax for the `functions/mcp.json` file is identical to the syntax for MCP server configurations for Claude Desktop. +The syntax for the `functions/mcp.json` file is compatible with MCP server configurations for Claude Desktop. So any time you're looking to add a new server, look at the docs for it and find the configuration example for -Claude desktop. You should be able to use the exact same configuration in your `functions/mcp.json` file. +Claude Desktop. You should be able to use the exact same configuration in your `functions/mcp.json` file. + +Every server entry **must** include a `"type"` field set to one of: `"stdio"`, `"http"`, or `"sse"`. + +### Transport Types + +Loki supports three MCP transport types: + +| Type | Use Case | +|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `stdio` | Spawns a local subprocess and communicates over stdin/stdout | +| `http` | Connects to a remote server via [Streamable HTTP](https://modelcontextprotocol.io/docs/concepts/transports#streamable-http) | +| `sse` | Connects to a remote server via the legacy [HTTP+SSE](https://modelcontextprotocol.io/docs/concepts/transports#http-with-sse) transport (Claude Desktop format) | + +### Stdio Servers + +Stdio is the standard transport for locally-installed MCP servers. Loki spawns the process and communicates +over stdin/stdout: + +```json +{ + "mcpServers": { + "github": { + "type": "stdio", + "command": "docker", + "args": ["run", "-i", "--rm", "ghcr.io/github/github-mcp-server"], + "env": { + "GITHUB_PERSONAL_ACCESS_TOKEN": "YOUR_GITHUB_TOKEN" + } + } + } +} +``` + +| Field | Required | Description | +|-----------|----------|------------------------------------------| +| `type` | yes | Must be `"stdio"` | +| `command` | yes | The executable to spawn | +| `args` | no | Arguments passed to the command | +| `env` | no | Environment variables for the subprocess | +| `cwd` | no | Working directory for the subprocess | + +### HTTP (Streamable HTTP) Servers + +For remote MCP servers that support the Streamable HTTP transport: + +```json +{ + "mcpServers": { + "datadog": { + "type": "http", + "url": "https://mcp.datadoghq.com/api/unstable/mcp-server/mcp" + } + } +} +``` + +| Field | Required | Description | +|-----------|----------|--------------------------------------------------------| +| `type` | yes | Must be `"http"` | +| `url` | yes | The server endpoint URL | +| `headers` | no | Custom HTTP headers to include with every request | + +### SSE Servers + +For remote MCP servers that use the legacy HTTP+SSE transport (the format used by Claude Desktop): + +```json +{ + "mcpServers": { + "my-sse-server": { + "type": "sse", + "url": "http://127.0.0.1:64342/sse", + "headers": { + "Authorization": "Bearer my-token" + } + } + } +} +``` + +| Field | Required | Description | +|-----------|----------|--------------------------------------------------------| +| `type` | yes | Must be `"sse"` | +| `url` | yes | The server SSE endpoint URL | +| `headers` | no | Custom HTTP headers to include with every request | + +**Note:** Both `http` and `sse` types use the same underlying transport, which auto-negotiates the +protocol with the server. The `type` field primarily serves as documentation of which protocol the +server speaks. Neither type supports `command`, `args`, or `cwd` fields. ### Secret Injection As mentioned in the [Loki Vault documentation](../VAULT.md), you can use Loki Vault to inject secrets into your MCP configuration file. diff --git a/src/config/mcp_factory.rs b/src/config/mcp_factory.rs index 74dcd57..ec461f6 100644 --- a/src/config/mcp_factory.rs +++ b/src/config/mcp_factory.rs @@ -24,13 +24,14 @@ //! construction / test round-trips. //! //! The key type `McpServerKey` hashes the server name plus its full -//! command/args/env so that two scopes requesting an identically- -//! configured server share an `Arc`, while two scopes requesting -//! differently-configured servers (e.g., different API tokens) get -//! independent subprocesses. This is the sharing-vs-isolation property -//! described in `docs/REST-API-ARCHITECTURE.md` section 5. +//! transport config (command/args/env for stdio; url/headers for +//! http/sse) so that two scopes requesting an identically-configured +//! server share an `Arc`, while two scopes requesting differently- +//! configured servers (e.g., different API tokens) get independent +//! connections. This is the sharing-vs-isolation property described +//! in `docs/REST-API-ARCHITECTURE.md` section 5. -use crate::mcp::{ConnectedServer, JsonField, McpServer, spawn_mcp_server}; +use crate::mcp::{ConnectedServer, JsonField, McpServer, McpTransportType, spawn_mcp_server}; use anyhow::Result; use parking_lot::Mutex; @@ -41,50 +42,66 @@ use std::sync::{Arc, Weak}; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct McpServerKey { pub name: String, - pub command: String, - pub args: Vec, - pub env: Vec<(String, String)>, + pub transport: McpTransportKey, +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub enum McpTransportKey { + Stdio { + command: String, + args: Vec, + env: Vec<(String, String)>, + }, + Remote { + transport_type: McpTransportType, + url: String, + headers: Vec<(String, String)>, + }, } impl McpServerKey { - pub fn new( - name: impl Into, - command: impl Into, - args: impl IntoIterator, - env: impl IntoIterator, - ) -> Self { - let mut args: Vec = args.into_iter().collect(); - args.sort(); - let mut env: Vec<(String, String)> = env.into_iter().collect(); - env.sort(); + pub fn from_spec(name: &str, spec: &McpServer) -> Self { + let transport = if spec.is_remote() { + let url = spec.url.clone().unwrap_or_default(); + let mut headers: Vec<(String, String)> = spec + .headers + .as_ref() + .map(|h| h.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) + .unwrap_or_default(); + headers.sort(); + McpTransportKey::Remote { + transport_type: spec.transport_type.clone(), + url, + headers, + } + } else { + let command = spec.command.clone().unwrap_or_default(); + let mut args = spec.args.clone().unwrap_or_default(); + args.sort(); + let mut env: Vec<(String, String)> = spec + .env + .as_ref() + .map(|e| { + e.iter() + .map(|(k, v)| { + let v_str = match v { + JsonField::Str(s) => s.clone(), + JsonField::Bool(b) => b.to_string(), + JsonField::Int(i) => i.to_string(), + }; + (k.clone(), v_str) + }) + .collect() + }) + .unwrap_or_default(); + env.sort(); + McpTransportKey::Stdio { command, args, env } + }; Self { name: name.into(), - command: command.into(), - args, - env, + transport, } } - - pub fn from_spec(name: &str, spec: &McpServer) -> Self { - let args = spec.args.clone().unwrap_or_default(); - let env: Vec<(String, String)> = spec - .env - .as_ref() - .map(|e| { - e.iter() - .map(|(k, v)| { - let v_str = match v { - JsonField::Str(s) => s.clone(), - JsonField::Bool(b) => b.to_string(), - JsonField::Int(i) => i.to_string(), - }; - (k.clone(), v_str) - }) - .collect() - }) - .unwrap_or_default(); - Self::new(name, &spec.command, args, env) - } } #[derive(Default)] diff --git a/src/mcp/mod.rs b/src/mcp/mod.rs index 6dc22c7..79921d0 100644 --- a/src/mcp/mod.rs +++ b/src/mcp/mod.rs @@ -7,7 +7,9 @@ use anyhow::{Context, Result, anyhow}; use futures_util::{StreamExt, TryStreamExt, stream}; use indoc::formatdoc; use rmcp::service::RunningService; +use rmcp::transport::StreamableHttpClientTransport; use rmcp::transport::TokioChildProcess; +use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig; use rmcp::{RoleClient, ServiceExt}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -15,6 +17,7 @@ use std::fs::OpenOptions; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::Arc; +use http::{HeaderName, HeaderValue}; use tokio::process::Command; pub const MCP_INVOKE_META_FUNCTION_NAME_PREFIX: &str = "mcp_invoke"; @@ -50,11 +53,67 @@ pub(crate) struct McpServersConfig { } #[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] pub(crate) struct McpServer { - pub command: String, + #[serde(rename = "type")] + pub transport_type: McpTransportType, + pub command: Option, pub args: Option>, pub env: Option>, pub cwd: Option, + pub url: Option, + pub headers: Option>, +} + +impl McpServer { + pub fn is_remote(&self) -> bool { + matches!( + self.transport_type, + McpTransportType::Http | McpTransportType::Sse + ) + } + + pub fn validate(&self, name: &str) -> Result<()> { + if self.is_remote() { + let type_label = match self.transport_type { + McpTransportType::Http => "http", + McpTransportType::Sse => "sse", + _ => unreachable!(), + }; + if self.url.is_none() { + return Err(anyhow!( + "MCP server '{name}' has type \"{type_label}\" but is missing a \"url\" field" + )); + } + if self.command.is_some() || self.args.is_some() || self.cwd.is_some() { + return Err(anyhow!( + "MCP server '{name}' has type \"{type_label}\" but also specifies stdio fields \ + (command/args/cwd). Remove the stdio fields or change the type to \"stdio\"." + )); + } + } else { + if self.command.is_none() { + return Err(anyhow!( + "MCP server '{name}' is missing a \"command\" field (required for stdio transport)" + )); + } + if self.url.is_some() || self.headers.is_some() { + return Err(anyhow!( + "MCP server '{name}' has type \"stdio\" but also specifies remote fields \ + (url/headers). Remove the remote fields or change the type to \"http\" or \"sse\"." + )); + } + } + Ok(()) + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "lowercase")] +pub(crate) enum McpTransportType { + Stdio, + Http, + Sse, } #[derive(Debug, Clone, Deserialize)] @@ -126,6 +185,11 @@ impl McpRegistry { let mcp_servers_config: McpServersConfig = serde_json::from_str(&parsed_content).with_context(err)?; + + for (name, spec) in &mcp_servers_config.mcp_servers { + spec.validate(name)?; + } + registry.config = Some(mcp_servers_config); if start_mcp_servers && app_config.mcp_server_support { @@ -264,7 +328,54 @@ pub(crate) async fn spawn_mcp_server( spec: &McpServer, log_path: Option<&Path>, ) -> Result> { - let mut cmd = Command::new(&spec.command); + if spec.is_remote() { + let url = spec.url.as_deref().expect("validated: remote spec has url"); + spawn_remote_mcp_server(url, spec.headers.as_ref()).await + } else { + let command = spec + .command + .as_deref() + .expect("validated: stdio spec has command"); + spawn_stdio_mcp_server(command, spec, log_path).await + } +} + +async fn spawn_remote_mcp_server( + url: &str, + headers: Option<&HashMap>, +) -> Result> { + let transport = if let Some(hdrs) = headers + && !hdrs.is_empty() + { + let mut custom = HashMap::new(); + for (k, v) in hdrs { + let name = k + .parse::() + .with_context(|| format!("Invalid header name: {k}"))?; + let value = v + .parse::() + .with_context(|| format!("Invalid header value for {k}"))?; + custom.insert(name, value); + } + let config = StreamableHttpClientTransportConfig::with_uri(url).custom_headers(custom); + StreamableHttpClientTransport::from_config(config) + } else { + StreamableHttpClientTransport::from_uri(url) + }; + let service = Arc::new( + ().serve(transport) + .await + .with_context(|| format!("Failed to connect to remote MCP server: {url}"))?, + ); + Ok(service) +} + +async fn spawn_stdio_mcp_server( + command: &str, + spec: &McpServer, + log_path: Option<&Path>, +) -> Result> { + let mut cmd = Command::new(command); if let Some(args) = &spec.args { cmd.args(args); } @@ -299,7 +410,7 @@ pub(crate) async fn spawn_mcp_server( let service = Arc::new( ().serve(transport) .await - .with_context(|| format!("Failed to start MCP server: {}", &spec.command))?, + .with_context(|| format!("Failed to start MCP server: {command}"))?, ); Ok(service) }