feat: Improved MCP implementation to minimize the tokens needed to utilize it so it doesn't quickly overwhelm the token space for a given model

This commit is contained in:
2025-12-03 12:12:51 -07:00
parent bddec85fa5
commit 3b21ce2aa5
3 changed files with 288 additions and 84 deletions
+117 -27
View File
@@ -2,6 +2,7 @@ use crate::config::Config;
use crate::utils::{AbortSignal, abortable_run_with_spinner};
use crate::vault::interpolate_secrets;
use anyhow::{Context, Result, anyhow};
use bm25::{Document, Language, SearchEngine, SearchEngineBuilder};
use futures_util::future::BoxFuture;
use futures_util::{StreamExt, TryStreamExt, stream};
use indoc::formatdoc;
@@ -9,7 +10,7 @@ use rmcp::model::{CallToolRequestParam, CallToolResult};
use rmcp::service::RunningService;
use rmcp::transport::TokioChildProcess;
use rmcp::{RoleClient, ServiceExt};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
@@ -20,10 +21,46 @@ use std::sync::Arc;
use tokio::process::Command;
pub const MCP_INVOKE_META_FUNCTION_NAME_PREFIX: &str = "mcp_invoke";
pub const MCP_LIST_META_FUNCTION_NAME_PREFIX: &str = "mcp_list";
pub const MCP_SEARCH_META_FUNCTION_NAME_PREFIX: &str = "mcp_search";
pub const MCP_DESCRIBE_META_FUNCTION_NAME_PREFIX: &str = "mcp_describe";
type ConnectedServer = RunningService<RoleClient, ()>;
#[derive(Clone, Debug, Default, Serialize)]
pub struct CatalogItem {
pub name: String,
pub server: String,
pub description: String,
}
#[derive(Debug)]
struct ServerCatalog {
engine: SearchEngine<String>,
items: HashMap<String, CatalogItem>,
}
impl ServerCatalog {
pub fn build_bm25(items: &HashMap<String, CatalogItem>) -> SearchEngine<String> {
let docs = items.values().map(|it| {
let contents = format!("{}\n{}\nserver:{}", it.name, it.description, it.server);
Document {
id: it.name.clone(),
contents,
}
});
SearchEngineBuilder::<String>::with_documents(Language::English, docs).build()
}
}
impl Clone for ServerCatalog {
fn clone(&self) -> Self {
Self {
engine: Self::build_bm25(&self.items),
items: self.items.clone(),
}
}
}
#[derive(Debug, Clone, Deserialize)]
struct McpServersConfig {
#[serde(rename = "mcpServers")]
@@ -50,7 +87,8 @@ enum JsonField {
pub struct McpRegistry {
log_path: Option<PathBuf>,
config: Option<McpServersConfig>,
servers: HashMap<String, Arc<RunningService<RoleClient, ()>>>,
servers: HashMap<String, Arc<ConnectedServer>>,
catalogs: HashMap<String, ServerCatalog>,
}
impl McpRegistry {
@@ -173,7 +211,7 @@ impl McpRegistry {
.collect()
};
let results: Vec<(String, Arc<_>)> = stream::iter(
let results: Vec<(String, Arc<_>, ServerCatalog)> = stream::iter(
server_ids
.into_iter()
.map(|id| async { self.start_server(id).await }),
@@ -182,13 +220,24 @@ impl McpRegistry {
.try_collect()
.await?;
self.servers = results.into_iter().collect();
self.servers = results
.clone()
.into_iter()
.map(|(id, server, _)| (id, server))
.collect();
self.catalogs = results
.into_iter()
.map(|(id, _, catalog)| (id, catalog))
.collect();
}
Ok(())
}
async fn start_server(&self, id: String) -> Result<(String, Arc<ConnectedServer>)> {
async fn start_server(
&self,
id: String,
) -> Result<(String, Arc<ConnectedServer>, ServerCatalog)> {
let server = self
.config
.as_ref()
@@ -231,14 +280,33 @@ impl McpRegistry {
.await
.with_context(|| format!("Failed to start MCP server: {}", &server.command))?,
);
debug!(
"Available tools for MCP server {id}: {:?}",
service.list_tools(None).await?
);
let tools = service.list_tools(None).await?;
debug!("Available tools for MCP server {id}: {tools:?}");
let mut items_vec = Vec::new();
for t in tools.tools {
let name = t.name.to_string();
let description = t.description.unwrap_or_default().to_string();
items_vec.push(CatalogItem {
name,
server: id.clone(),
description,
});
}
let mut items_map = HashMap::new();
items_vec.into_iter().for_each(|it| {
items_map.insert(it.name.clone(), it);
});
let catalog = ServerCatalog {
engine: ServerCatalog::build_bm25(&items_map),
items: items_map,
};
info!("Started MCP server: {id}");
Ok((id.to_string(), service))
Ok((id.to_string(), service, catalog))
}
pub async fn stop_all_servers(mut self) -> Result<Self> {
@@ -268,26 +336,48 @@ impl McpRegistry {
}
}
pub fn catalog(&self) -> BoxFuture<'static, Result<Value>> {
let servers: Vec<(String, Arc<ConnectedServer>)> = self
pub fn search_tools_server(&self, server: &str, query: &str, top_k: usize) -> Vec<CatalogItem> {
let Some(catalog) = self.catalogs.get(server) else {
return vec![];
};
let engine = &catalog.engine;
let raw = engine.search(query, top_k.min(20));
raw.into_iter()
.filter_map(|r| catalog.items.get(&r.document.id))
.take(top_k)
.cloned()
.collect()
}
pub async fn describe(&self, server_id: &str, tool: &str) -> Result<Value> {
let server = self
.servers
.iter()
.map(|(id, s)| (id.clone(), s.clone()))
.collect();
.filter(|(id, _)| &server_id == id)
.map(|(_, s)| s.clone())
.next()
.ok_or(anyhow!("{server_id} MCP server not found in config"))?;
Box::pin(async move {
let mut out = Vec::with_capacity(servers.len());
for (id, server) in servers {
let tools = server.list_tools(None).await?;
let resources = server.list_resources(None).await.unwrap_or_default();
out.push(json!({
"server": id,
"tools": tools,
"resources": resources,
}));
let tool_schema = server
.list_tools(None)
.await?
.tools
.into_iter()
.find(|it| it.name == tool)
.ok_or(anyhow!(
"{tool} not found in {server_id} MCP server catalog"
))?
.input_schema;
Ok(json!({
"type": "object",
"properties": {
"tool": {
"type": "string",
},
"arguments": tool_schema
}
Ok(Value::Array(out))
})
}))
}
pub fn invoke(