feat: Added MCP config merging support for remote asset installations

This commit is contained in:
2026-05-22 16:30:45 -06:00
parent 34dc4b0dce
commit e9e46158e7
5 changed files with 391 additions and 31 deletions
+358 -11
View File
@@ -1,4 +1,5 @@
use anyhow::{Context, Result, bail};
use indexmap::IndexMap;
use inquire::Select;
use std::ffi::{OsStr, OsString};
use std::fs;
@@ -6,8 +7,10 @@ use std::path::{Path, PathBuf};
use crate::config::{InstallFilter, paths};
use crate::function::Language;
use crate::mcp::{McpServer, McpServersConfig};
use crate::utils;
use crate::utils::IS_STDOUT_TERMINAL;
use crate::vault::{Vault, interpolate_secrets};
pub fn install_remote(git_url: &str, filter: Option<InstallFilter>, force: bool) -> Result<()> {
let (url, reference) = parse_url_with_ref(git_url)?;
@@ -32,11 +35,10 @@ pub fn install_remote(git_url: &str, filter: Option<InstallFilter>, force: bool)
apply_plan(&plan, force)?;
}
if plan.skipped_mcp_json.is_some() {
println!(
"\nNote: functions/mcp.json detected but MCP merge is not yet wired up \
(Step 3 of the install-remote rollout)."
);
if let Some((remote_mcp, local_mcp)) = &plan.mcp_json {
let local = local_mcp.exists().then_some(local_mcp.as_path());
let report = merge_mcp_json(local, remote_mcp, local_mcp, force)?;
print_mcp_merge_report(&report);
}
Ok(())
@@ -335,7 +337,7 @@ struct PlannedFile {
struct InstallPlan {
files: Vec<PlannedFile>,
skipped_mcp_json: Option<(PathBuf, PathBuf)>,
mcp_json: Option<(PathBuf, PathBuf)>,
}
fn plan_changes(layout: &RemoteLayout) -> Result<InstallPlan> {
@@ -369,15 +371,12 @@ fn plan_changes(layout: &RemoteLayout) -> Result<InstallPlan> {
)?;
}
let skipped_mcp_json = layout
let mcp_json = layout
.mcp_json
.as_ref()
.map(|src| (src.clone(), paths::mcp_config_file()));
Ok(InstallPlan {
files,
skipped_mcp_json,
})
Ok(InstallPlan { files, mcp_json })
}
fn plan_dir_into(
@@ -611,6 +610,217 @@ fn set_executable_bit_if_script(_path: &Path) -> Result<()> {
Ok(())
}
#[derive(Debug)]
struct McpMergeReport {
added: Vec<String>,
kept_local: Vec<String>,
replaced: Vec<String>,
renamed: Vec<(String, String)>,
final_path: PathBuf,
missing_secrets: Vec<String>,
}
enum McpConflictAction {
KeepLocal,
TakeRemote,
RenameRemote,
}
fn merge_mcp_json(
local: Option<&Path>,
remote: &Path,
target: &Path,
force: bool,
) -> Result<McpMergeReport> {
let remote_content = fs::read_to_string(remote)
.with_context(|| format!("failed to read remote mcp.json at {}", remote.display()))?;
let remote_config: McpServersConfig = serde_json::from_str(&remote_content)
.with_context(|| format!("failed to parse remote mcp.json at {}", remote.display()))?;
let mut merged = if let Some(local_path) = local {
let content = fs::read_to_string(local_path).with_context(|| {
format!("failed to read local mcp.json at {}", local_path.display())
})?;
serde_json::from_str::<McpServersConfig>(&content).with_context(|| {
format!("failed to parse local mcp.json at {}", local_path.display())
})?
} else {
McpServersConfig {
mcp_servers: IndexMap::new(),
}
};
let final_path = target.to_path_buf();
let mut report = McpMergeReport {
added: Vec::new(),
kept_local: Vec::new(),
replaced: Vec::new(),
renamed: Vec::new(),
final_path: final_path.clone(),
missing_secrets: Vec::new(),
};
let mut to_validate: Vec<String> = Vec::new();
for (name, remote_server) in remote_config.mcp_servers {
if let Some(local_server) = merged.mcp_servers.get(&name) {
if local_server == &remote_server {
continue;
}
match resolve_mcp_conflict(&name, force)? {
McpConflictAction::KeepLocal => report.kept_local.push(name),
McpConflictAction::TakeRemote => {
merged.mcp_servers.insert(name.clone(), remote_server);
report.replaced.push(name.clone());
to_validate.push(name);
}
McpConflictAction::RenameRemote => {
let new_name = unique_renamed_key(&name, &merged.mcp_servers);
merged.mcp_servers.insert(new_name.clone(), remote_server);
report.renamed.push((name, new_name.clone()));
to_validate.push(new_name);
}
}
} else {
merged.mcp_servers.insert(name.clone(), remote_server);
report.added.push(name.clone());
to_validate.push(name);
}
}
for key in &to_validate {
let spec = merged
.mcp_servers
.get(key)
.expect("entry was just inserted");
spec.validate(key).with_context(|| {
format!("MCP server '{key}' failed validation; refusing to write merged mcp.json")
})?;
}
let serialized =
serde_json::to_string_pretty(&merged).context("failed to serialize merged mcp.json")?;
write_atomically(&final_path, &serialized)?;
let vault = Vault::init_bare();
let (_parsed, missing) = interpolate_secrets(&serialized, &vault);
let mut deduped: Vec<String> = Vec::new();
for s in missing {
if !deduped.contains(&s) {
deduped.push(s);
}
}
report.missing_secrets = deduped;
Ok(report)
}
fn resolve_mcp_conflict(name: &str, force: bool) -> Result<McpConflictAction> {
if force {
return Ok(McpConflictAction::TakeRemote);
}
if !*IS_STDOUT_TERMINAL {
bail!(
"MCP server '{name}' already exists locally. Refusing to merge non-interactively. \
Re-run with --install-force or in a terminal."
);
}
let rename_label = format!("rename remote as \"{name}-remote\"");
let prompt = format!("Conflict on MCP server '{name}'");
let choice = Select::new(
&prompt,
vec![
"keep local".to_string(),
"take remote".to_string(),
rename_label.clone(),
"abort merge".to_string(),
],
)
.prompt()
.with_context(|| "failed to read MCP conflict choice")?;
if choice == "keep local" {
Ok(McpConflictAction::KeepLocal)
} else if choice == "take remote" {
Ok(McpConflictAction::TakeRemote)
} else if choice == rename_label {
Ok(McpConflictAction::RenameRemote)
} else if choice == "abort merge" {
bail!("Aborted MCP merge by user.")
} else {
unreachable!("inquire::Select returned an unexpected option")
}
}
fn unique_renamed_key(name: &str, existing: &IndexMap<String, McpServer>) -> String {
let base = format!("{name}-remote");
if !existing.contains_key(&base) {
return base;
}
for i in 2..=u32::MAX {
let candidate = format!("{name}-remote-{i}");
if !existing.contains_key(&candidate) {
return candidate;
}
}
unreachable!("ran out of suffix variants")
}
fn write_atomically(path: &Path, content: &str) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.with_context(|| format!("failed to create directory {}", parent.display()))?;
}
let tmp_path = path.with_extension("json.tmp");
fs::write(&tmp_path, content)
.with_context(|| format!("failed to write {}", tmp_path.display()))?;
fs::rename(&tmp_path, path).with_context(|| {
format!(
"failed to rename {} to {}",
tmp_path.display(),
path.display()
)
})?;
Ok(())
}
fn print_mcp_merge_report(report: &McpMergeReport) {
println!("\nMCP merge ({}):", report.final_path.display());
println!(
" added: {}, replaced: {}, kept local: {}, renamed: {}",
report.added.len(),
report.replaced.len(),
report.kept_local.len(),
report.renamed.len()
);
if !report.added.is_empty() {
println!(" + new servers: {}", report.added.join(", "));
}
if !report.replaced.is_empty() {
println!(" ~ replaced: {}", report.replaced.join(", "));
}
if !report.kept_local.is_empty() {
println!(" = kept local: {}", report.kept_local.join(", "));
}
if !report.renamed.is_empty() {
let pairs: Vec<String> = report
.renamed
.iter()
.map(|(orig, new_)| format!("{orig} -> {new_}"))
.collect();
println!(" > renamed: {}", pairs.join(", "));
}
if !report.missing_secrets.is_empty() {
println!("\nMissing vault secrets referenced by the merged mcp.json:");
for name in &report.missing_secrets {
println!(" {{{{ {name} }}}}");
}
println!(
"\nAdd each missing secret to the vault before starting these MCP servers. \
For example: `loki --add-secret <NAME>` or `.vault add <NAME>` in the REPL."
);
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -861,4 +1071,141 @@ mod tests {
assert_eq!(classify_file(&src, &dst).unwrap(), PlannedKind::Conflict);
let _ = fs::remove_dir_all(&dir);
}
fn write_mcp(path: &Path, json: &str) {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(path, json).unwrap();
}
const FIXTURE_REMOTE: &str = r#"{
"mcpServers": {
"alpha": {"type": "stdio", "command": "echo", "args": ["a"]},
"beta": {"type": "stdio", "command": "echo", "args": ["b"]}
}
}"#;
#[test]
fn unique_renamed_key_appends_remote_suffix() {
let map: IndexMap<String, McpServer> = IndexMap::new();
assert_eq!(unique_renamed_key("foo", &map), "foo-remote");
}
#[test]
fn unique_renamed_key_appends_numeric_when_remote_taken() {
let mut map: IndexMap<String, McpServer> = IndexMap::new();
map.insert(
"foo-remote".to_string(),
serde_json::from_str(r#"{"type":"stdio","command":"x"}"#).unwrap(),
);
assert_eq!(unique_renamed_key("foo", &map), "foo-remote-2");
}
#[test]
fn merge_into_empty_local_adds_all_remote_servers() {
let dir = fresh_temp_dir("merge-empty-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(&remote, FIXTURE_REMOTE);
let report = merge_mcp_json(None, &remote, &target, false).unwrap();
assert_eq!(report.added, vec!["alpha", "beta"]);
assert!(report.kept_local.is_empty());
assert!(report.replaced.is_empty());
assert!(report.renamed.is_empty());
assert!(target.exists());
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn merge_force_replaces_local_on_conflict() {
let dir = fresh_temp_dir("merge-force-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(
&target,
r#"{"mcpServers": {"alpha": {"type": "stdio", "command": "OLD"}}}"#,
);
write_mcp(&remote, FIXTURE_REMOTE);
let report = merge_mcp_json(Some(&target), &remote, &target, true).unwrap();
assert_eq!(report.added, vec!["beta"]);
assert_eq!(report.replaced, vec!["alpha"]);
let written = fs::read_to_string(&target).unwrap();
assert!(written.contains("\"command\": \"echo\""), "got: {written}");
assert!(!written.contains("OLD"));
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn merge_non_tty_conflict_aborts_without_force() {
let dir = fresh_temp_dir("merge-non-tty-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(
&target,
r#"{"mcpServers": {"alpha": {"type": "stdio", "command": "LOCAL"}}}"#,
);
write_mcp(&remote, FIXTURE_REMOTE);
let err = merge_mcp_json(Some(&target), &remote, &target, false).unwrap_err();
assert!(
err.to_string()
.contains("Refusing to merge non-interactively"),
"got: {err}"
);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn merge_rejects_invalid_remote_server() {
let dir = fresh_temp_dir("merge-invalid-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(&remote, r#"{"mcpServers": {"broken": {"type": "stdio"}}}"#);
let err = merge_mcp_json(None, &remote, &target, false).unwrap_err();
assert!(
format!("{err:#}").contains("missing a \"command\" field"),
"got: {err:#}"
);
let _ = fs::remove_dir_all(&dir);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn merge_detects_missing_secrets_in_output() {
let dir = fresh_temp_dir("merge-secret-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(
&remote,
r#"{"mcpServers": {"x": {"type":"stdio","command":"echo","env":{"K":"{{LOKI_TEST_MERGE_SECRET}}"}}}}"#,
);
let report = merge_mcp_json(None, &remote, &target, false).unwrap();
assert_eq!(report.missing_secrets, vec!["LOKI_TEST_MERGE_SECRET"]);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn merge_is_idempotent_on_re_run() {
let dir = fresh_temp_dir("merge-idempotent-");
let remote = dir.join("remote.json");
let target = dir.join("target.json");
write_mcp(&remote, FIXTURE_REMOTE);
merge_mcp_json(None, &remote, &target, false).unwrap();
let after_first = fs::read(&target).unwrap();
let report = merge_mcp_json(Some(&target), &remote, &target, false).unwrap();
assert!(report.added.is_empty(), "got: {:?}", report.added);
let after_second = fs::read(&target).unwrap();
assert_eq!(after_first, after_second);
let _ = fs::remove_dir_all(&dir);
}
}
+7 -6
View File
@@ -109,12 +109,13 @@ impl McpFactory {
mod tests {
use super::*;
use crate::mcp::{JsonField, McpServer, McpTransportType};
use indexmap::IndexMap;
use std::collections::HashMap;
fn stdio_spec(
command: &str,
args: Option<Vec<String>>,
env: Option<HashMap<String, JsonField>>,
env: Option<IndexMap<String, JsonField>>,
) -> McpServer {
McpServer {
transport_type: McpTransportType::Stdio,
@@ -130,7 +131,7 @@ mod tests {
fn remote_spec(
transport: McpTransportType,
url: &str,
headers: Option<HashMap<String, String>>,
headers: Option<IndexMap<String, String>>,
) -> McpServer {
McpServer {
transport_type: transport,
@@ -145,7 +146,7 @@ mod tests {
#[test]
fn key_from_stdio_spec_captures_command_args_env() {
let mut env = HashMap::new();
let mut env = IndexMap::new();
env.insert("TOKEN".into(), JsonField::Str("abc".into()));
let spec = stdio_spec("npx", Some(vec!["-y".into(), "server".into()]), Some(env));
let key = McpServerKey::from_spec("my-server", &spec);
@@ -163,7 +164,7 @@ mod tests {
#[test]
fn key_from_stdio_spec_sorts_args_and_env() {
let mut env = HashMap::new();
let mut env = IndexMap::new();
env.insert("Z_VAR".into(), JsonField::Str("z".into()));
env.insert("A_VAR".into(), JsonField::Int(42));
let spec = stdio_spec(
@@ -222,7 +223,7 @@ mod tests {
#[test]
fn key_from_remote_sse_spec_with_sorted_headers() {
let mut hdrs = HashMap::new();
let mut hdrs = IndexMap::new();
hdrs.insert("Z-Key".into(), "z-val".into());
hdrs.insert("A-Key".into(), "a-val".into());
let spec = remote_spec(McpTransportType::Sse, "http://sse.example.com", Some(hdrs));
@@ -264,7 +265,7 @@ mod tests {
#[test]
fn key_env_bool_and_int_coerce_to_string() {
let mut env = HashMap::new();
let mut env = IndexMap::new();
env.insert("FLAG".into(), JsonField::Bool(true));
env.insert("PORT".into(), JsonField::Int(3000));
let spec = stdio_spec("cmd", None, Some(env));
+3 -1
View File
@@ -29,6 +29,8 @@ use crate::utils::{
use crate::graph;
use anyhow::{Context, Error, Result, bail};
#[cfg(test)]
use indexmap::IndexMap;
use indoc::formatdoc;
use inquire::{Confirm, MultiSelect, Text, list_option::ListOption, validator::Validation};
use parking_lot::RwLock;
@@ -2899,7 +2901,7 @@ mod tests {
let mcp_config = if server_names.is_empty() {
None
} else {
let mut servers = HashMap::new();
let mut servers = IndexMap::new();
for name in server_names {
servers.insert(
name.to_string(),
+18 -11
View File
@@ -8,6 +8,7 @@ use crate::vault::interpolate_secrets;
use anyhow::{Context, Result, anyhow};
use futures_util::{StreamExt, TryStreamExt, stream};
use http::{HeaderName, HeaderValue};
use indexmap::IndexMap;
use indoc::formatdoc;
use rmcp::service::RunningService;
use rmcp::transport::StreamableHttpClientTransport;
@@ -49,23 +50,29 @@ impl Clone for ServerCatalog {
}
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct McpServersConfig {
#[serde(rename = "mcpServers")]
pub mcp_servers: HashMap<String, McpServer>,
pub mcp_servers: IndexMap<String, McpServer>,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub(crate) struct McpServer {
#[serde(rename = "type")]
pub transport_type: McpTransportType,
#[serde(skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub args: Option<Vec<String>>,
pub env: Option<HashMap<String, JsonField>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<IndexMap<String, JsonField>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub url: Option<String>,
pub headers: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub headers: Option<IndexMap<String, String>>,
}
impl McpServer {
@@ -111,7 +118,7 @@ impl McpServer {
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
pub(crate) enum McpTransportType {
Stdio,
@@ -119,7 +126,7 @@ pub(crate) enum McpTransportType {
Sse,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(untagged)]
pub(crate) enum JsonField {
Str(String),
@@ -352,7 +359,7 @@ pub(crate) async fn spawn_mcp_server(
async fn spawn_http_mcp_server(
url: &str,
headers: Option<&HashMap<String, String>>,
headers: Option<&IndexMap<String, String>>,
) -> Result<Arc<ConnectedServer>> {
let transport = if let Some(hdrs) = headers
&& !hdrs.is_empty()
@@ -382,7 +389,7 @@ async fn spawn_http_mcp_server(
async fn spawn_sse_mcp_server(
url: &str,
headers: Option<&HashMap<String, String>>,
headers: Option<&IndexMap<String, String>>,
) -> Result<Arc<ConnectedServer>> {
let sse = LegacySseTransport::connect(url, headers)
.await
@@ -482,7 +489,7 @@ mod tests {
}
fn make_registry_with_config(server_names: &[&str]) -> McpRegistry {
let mut mcp_servers = HashMap::new();
let mut mcp_servers = IndexMap::new();
for name in server_names {
mcp_servers.insert(name.to_string(), stdio_server("echo"));
}
@@ -530,7 +537,7 @@ mod tests {
#[test]
fn validate_stdio_with_headers_fails() {
let mut headers = HashMap::new();
let mut headers = IndexMap::new();
headers.insert("Auth".into(), "Bearer tok".into());
let spec = McpServer {
transport_type: McpTransportType::Stdio,
+5 -2
View File
@@ -3,12 +3,12 @@ use eventsource_stream::{EventStream, Eventsource};
use fmt::{Display, Formatter};
use futures_util::StreamExt;
use futures_util::stream::BoxStream;
use indexmap::IndexMap;
use mpsc::error::SendError;
use mpsc::{OwnedPermit, Receiver, Sender, channel};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::{Client, header};
use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage};
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::future::Future;
@@ -28,7 +28,10 @@ pub struct LegacySseTransport {
}
impl LegacySseTransport {
pub async fn connect(sse_url: &str, headers: Option<&HashMap<String, String>>) -> Result<Self> {
pub async fn connect(
sse_url: &str,
headers: Option<&IndexMap<String, String>>,
) -> Result<Self> {
let base_url =
Url::parse(sse_url).with_context(|| format!("Invalid SSE URL: {sse_url}"))?;