From 9bc4f8b621fd00d5384865c11bf8ba8f283624ee Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Thu, 14 May 2026 11:10:45 -0600 Subject: [PATCH] feat: wired together graph execution and agent graph dispatch --- src/config/paths.rs | 7 ++- src/function/mod.rs | 91 ++++++++++++++++++++++++++++++++++++++ src/function/supervisor.rs | 9 ++++ src/graph/agent.rs | 24 ++++++++++ src/graph/dispatch.rs | 59 ++++++++++++++++++++++++ src/graph/executor.rs | 39 ++++++++++++---- src/graph/mod.rs | 2 + src/graph/script.rs | 23 ++++++++++ src/graph/types.rs | 5 ++- src/main.rs | 11 +++++ src/repl/mod.rs | 12 ++++- 11 files changed, 270 insertions(+), 12 deletions(-) create mode 100644 src/graph/dispatch.rs diff --git a/src/config/paths.rs b/src/config/paths.rs index aa2ce14..eeec88c 100644 --- a/src/config/paths.rs +++ b/src/config/paths.rs @@ -1,5 +1,10 @@ use super::role::Role; -use super::{AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME, ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME, GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME, ROLES_DIR_NAME, paths, AGENT_GRAPH_FILE_NAME}; +use super::{ + AGENT_GRAPH_FILE_NAME, AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME, + ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME, + GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME, + ROLES_DIR_NAME, paths, +}; use crate::client::ProviderModels; use crate::utils::{get_env_name, list_file_names, normalize_env_name}; diff --git a/src/function/mod.rs b/src/function/mod.rs index 1512d83..30751d0 100644 --- a/src/function/mod.rs +++ b/src/function/mod.rs @@ -317,6 +317,25 @@ impl Functions { self.declarations.iter().find(|v| v.name == name) } + /// Narrow the declared tool list to a caller-supplied whitelist. + /// Entries are matched by exact name. The shorthand `mcp:` + /// expands to the three MCP meta-functions Loki registers per + /// server (`mcp_invoke_`, `mcp_search_`, + /// `mcp_describe_`). + pub fn retain_named(&mut self, allowed: &[String]) { + let mut expanded: std::collections::HashSet = std::collections::HashSet::new(); + for entry in allowed { + if let Some(server) = entry.strip_prefix("mcp:") { + expanded.insert(format!("{MCP_INVOKE_META_FUNCTION_NAME_PREFIX}_{server}")); + expanded.insert(format!("{MCP_SEARCH_META_FUNCTION_NAME_PREFIX}_{server}")); + expanded.insert(format!("{MCP_DESCRIBE_META_FUNCTION_NAME_PREFIX}_{server}")); + } else { + expanded.insert(entry.clone()); + } + } + self.declarations.retain(|d| expanded.contains(&d.name)); + } + pub fn contains(&self, name: &str) -> bool { self.declarations.iter().any(|v| v.name == name) } @@ -1730,4 +1749,76 @@ mod tests { assert_eq!(result.call.name, "my_tool"); assert_eq!(result.output, json!({"result": "ok"})); } + + fn function_with_names(names: &[&str]) -> Functions { + let declarations = names + .iter() + .map(|n| FunctionDeclaration { + name: (*n).to_string(), + description: String::new(), + parameters: JsonSchema::default(), + agent: false, + }) + .collect(); + Functions { declarations } + } + + #[test] + fn retain_named_keeps_only_exact_matches() { + let mut f = function_with_names(&["read_query", "describe_table", "web_search_loki"]); + f.retain_named(&["read_query".to_string()]); + assert!(f.contains("read_query")); + assert!(!f.contains("describe_table")); + assert!(!f.contains("web_search_loki")); + } + + #[test] + fn retain_named_with_empty_list_removes_all() { + let mut f = function_with_names(&["a", "b", "c"]); + f.retain_named(&[]); + assert!(!f.contains("a")); + assert!(!f.contains("b")); + assert!(!f.contains("c")); + assert!(f.is_empty()); + } + + #[test] + fn retain_named_with_unknown_name_drops_everything() { + let mut f = function_with_names(&["a", "b"]); + f.retain_named(&["nonexistent".to_string()]); + assert!(f.is_empty()); + } + + #[test] + fn retain_named_with_mcp_shorthand_keeps_all_three_meta_functions() { + let mut f = Functions::default(); + f.append_mcp_meta_functions(vec!["github".to_string(), "slack".to_string()]); + f.retain_named(&["mcp:github".to_string()]); + assert!(f.contains("mcp_invoke_github")); + assert!(f.contains("mcp_search_github")); + assert!(f.contains("mcp_describe_github")); + assert!(!f.contains("mcp_invoke_slack")); + assert!(!f.contains("mcp_search_slack")); + assert!(!f.contains("mcp_describe_slack")); + } + + #[test] + fn retain_named_mixes_exact_names_and_mcp_shorthand() { + let mut f = function_with_names(&["read_query", "describe_table"]); + f.append_mcp_meta_functions(vec!["pubmed".to_string()]); + f.retain_named(&["read_query".to_string(), "mcp:pubmed".to_string()]); + assert!(f.contains("read_query")); + assert!(!f.contains("describe_table")); + assert!(f.contains("mcp_invoke_pubmed")); + assert!(f.contains("mcp_search_pubmed")); + assert!(f.contains("mcp_describe_pubmed")); + } + + #[test] + fn retain_named_with_mcp_shorthand_for_unknown_server_drops_other_servers() { + let mut f = Functions::default(); + f.append_mcp_meta_functions(vec!["alpha".to_string()]); + f.retain_named(&["mcp:beta".to_string()]); + assert!(!f.contains("mcp_invoke_alpha")); + } } diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index a8d2659..46e27ab 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -332,6 +332,15 @@ pub fn run_child_agent( abort_signal: AbortSignal, ) -> Pin> + Send>> { Box::pin(async move { + if crate::graph::active_agent_graph_name(&child_ctx).is_some() { + return crate::graph::run_active_agent_graph( + &mut child_ctx, + &initial_input.text(), + abort_signal, + ) + .await; + } + let mut accumulated_output = String::new(); let mut input = initial_input; let app = Arc::clone(&child_ctx.app.config); diff --git a/src/graph/agent.rs b/src/graph/agent.rs index c8dbff6..997bb5f 100644 --- a/src/graph/agent.rs +++ b/src/graph/agent.rs @@ -9,6 +9,7 @@ use super::state::StateManager; use super::types::AgentNode; use crate::config::RequestContext; use crate::function::supervisor::run_agent_for_graph; +use crate::utils::dimmed_text; use anyhow::{Context, Result}; use serde_json::Value; use std::time::Duration; @@ -31,6 +32,14 @@ impl AgentNodeExecutor { .interpolate(&node.prompt) .with_context(|| format!("Failed to interpolate prompt for agent '{}'", node.agent))?; + eprintln!( + "{}", + dimmed_text(&format!("▸ spawning agent '{}' with prompt:", node.agent)) + ); + for line in indent_prompt(&prompt, 6) { + eprintln!("{}", dimmed_text(&line)); + } + let timeout_dur = Duration::from_secs(node.timeout.unwrap_or(DEFAULT_TIMEOUT_SECS)); let output = timeout( @@ -53,6 +62,21 @@ impl AgentNodeExecutor { } } +fn indent_prompt(prompt: &str, prefix_spaces: usize) -> Vec { + const MAX_LINES: usize = 12; + let pad = " ".repeat(prefix_spaces); + let mut out: Vec = prompt + .lines() + .take(MAX_LINES) + .map(|line| format!("{pad}{line}")) + .collect(); + let total = prompt.lines().count(); + if total > MAX_LINES { + out.push(format!("{pad}... ({} more lines)", total - MAX_LINES)); + } + out +} + /// Exposes the agent's output as `{{output}}` for template evaluation, then /// applies every key/template in `state_updates`. The temporary `output` /// state key is removed at the end so it doesn't leak into subsequent diff --git a/src/graph/dispatch.rs b/src/graph/dispatch.rs new file mode 100644 index 0000000..d9a6a94 --- /dev/null +++ b/src/graph/dispatch.rs @@ -0,0 +1,59 @@ +//! Helpers for running the active agent through its `graph.yaml` instead +//! of the LLM loop. Used at every agent-execution entry point: top-level +//! CLI (`start_directive`), REPL (`ask`), and child-agent spawn +//! (`run_child_agent`). + +use super::{GraphExecutor, GraphParser, agent_has_graph}; +use crate::config::RequestContext; +use crate::config::paths; +use crate::utils::AbortSignal; +use anyhow::{Context, Result}; +use serde_json::Value; + +/// If the active agent owns a `graph.yaml`, returns its name. Lets +/// callers branch between graph and LLM-loop execution without +/// re-implementing the lookup. +pub fn active_agent_graph_name(ctx: &RequestContext) -> Option { + let name = ctx.agent.as_ref()?.name().to_string(); + agent_has_graph(&name).then_some(name) +} + +/// Run the active agent's graph end-to-end and return the resolved +/// End-node output. The caller's prompt is seeded into the graph state +/// as `initial_prompt` so nodes can reference it via +/// `{{initial_prompt}}`. Any sub-agents the graph spawned via the +/// supervisor are cancelled on return. +pub async fn run_active_agent_graph( + ctx: &mut RequestContext, + prompt: &str, + abort_signal: AbortSignal, +) -> Result { + let agent_name = active_agent_graph_name(ctx) + .ok_or_else(|| anyhow::anyhow!("Active agent has no graph.yaml"))?; + + log::info!("Agent '{agent_name}' has graph.yaml; routing to graph executor"); + + let agent_dir = paths::agent_data_dir(&agent_name); + let graph_path = paths::agent_graph_path(&agent_name); + + let parser = GraphParser::new(&agent_dir); + let mut graph = parser + .load_from_file(&graph_path) + .with_context(|| format!("Failed to load graph.yaml for agent '{agent_name}'"))?; + + graph + .initial_state + .insert("initial_prompt".into(), Value::String(prompt.to_string())); + + let executor = GraphExecutor::new(graph, agent_dir); + let output = executor + .execute(ctx, abort_signal) + .await + .with_context(|| format!("Graph execution failed for agent '{agent_name}'"))?; + + if let Some(supervisor) = ctx.supervisor.clone() { + supervisor.read().cancel_all(); + } + + Ok(output) +} diff --git a/src/graph/executor.rs b/src/graph/executor.rs index c9adbf7..76f359c 100644 --- a/src/graph/executor.rs +++ b/src/graph/executor.rs @@ -14,8 +14,8 @@ use super::types::{EndNode, Graph, Node, NodeType}; use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor}; use super::validator::GraphValidator; use crate::config::RequestContext; -use crate::utils::AbortSignal; -use anyhow::{Context, Result, bail, anyhow}; +use crate::utils::{AbortSignal, dimmed_text}; +use anyhow::{Context, Result, anyhow, bail}; use serde_json::Value; use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -74,6 +74,10 @@ impl GraphExecutor { let mut current = graph.start.clone(); info!("[graph:{}] start at '{}'", graph.name, current); + eprintln!( + "{}", + dimmed_text(&format!("▸ graph: {} (start: {})", graph.name, current)) + ); let output = loop { if abort_signal.aborted() { @@ -102,14 +106,18 @@ impl GraphExecutor { ); } - let node = graph.get_node(¤t).ok_or_else(|| { - anyhow!("Node '{}' not found in graph '{}'", current, graph.name) - })?; + let node = graph + .get_node(¤t) + .ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?; debug!( "[graph:{}] entering '{}' (visit {})", graph.name, current, visits ); + eprintln!( + "{}", + dimmed_text(&format!("▸ {} ({})", current, node_type_label(node))) + ); let next = step( node, @@ -134,6 +142,13 @@ impl GraphExecutor { current, start.elapsed() ); + eprintln!( + "{}", + dimmed_text(&format!( + "▸ graph done in {:.2}s", + start.elapsed().as_secs_f64() + )) + ); break out; } } @@ -148,6 +163,16 @@ enum StepResult { End(String), } +fn node_type_label(node: &Node) -> &'static str { + match &node.node_type { + NodeType::Agent(_) => "agent", + NodeType::Script(_) => "script", + NodeType::Approval(_) => "approval", + NodeType::Input(_) => "input", + NodeType::End(_) => "end", + } +} + async fn step( node: &Node, state: &mut StateManager, @@ -179,9 +204,7 @@ async fn step( } }; let next = dynamic.or_else(|| node.next.clone()).ok_or_else(|| { - anyhow!( - "script node '{current}' did not emit `_next` and has no static `next`" - ) + anyhow!("script node '{current}' did not emit `_next` and has no static `next`") })?; Ok(StepResult::Continue(next)) } diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 1f4d200..06947cc 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -2,6 +2,7 @@ //! JSON state, composed of agent/script/approval/input/end nodes. pub mod agent; +pub mod dispatch; pub mod executor; pub mod parser; pub mod script; @@ -11,6 +12,7 @@ pub mod user_interaction; pub mod validator; pub use agent::AgentNodeExecutor; +pub use dispatch::{active_agent_graph_name, run_active_agent_graph}; pub use executor::GraphExecutor; pub use parser::{GraphParser, agent_has_graph, load_agent_graph}; pub use script::ScriptExecutor; diff --git a/src/graph/script.rs b/src/graph/script.rs index bf702e1..8d842f9 100644 --- a/src/graph/script.rs +++ b/src/graph/script.rs @@ -9,6 +9,7 @@ use super::state::{StateManager, StateRepresentation}; use super::types::ScriptNode; use crate::function::Language; +use crate::utils::dimmed_text; use anyhow::{Context, Result, anyhow, bail}; use serde_json::Value; use std::path::{Path, PathBuf}; @@ -45,6 +46,11 @@ impl ScriptExecutor { bail!("Script file not found: '{}'", script_path.display()); } + eprintln!( + "{}", + dimmed_text(&format!("▸ running script '{}'", node.script)) + ); + let language = detect_language(&script_path)?; let state_repr = state_manager.serialize_state()?; @@ -106,6 +112,23 @@ impl ScriptExecutor { ) })?; + if let Ok(parsed) = serde_json::from_str::>(json_output) { + let keys: Vec<&str> = parsed + .keys() + .filter(|k| k.as_str() != "_next") + .map(|s| s.as_str()) + .collect(); + if !keys.is_empty() { + eprintln!( + "{}", + dimmed_text(&format!("▸ merged: {}", keys.join(", "))) + ); + } + if let Some(n) = &next { + eprintln!("{}", dimmed_text(&format!("▸ script set _next = '{n}'"))); + } + } + apply_state_updates(node, state_manager); Ok(next) diff --git a/src/graph/types.rs b/src/graph/types.rs index 1dd959d..e1dbe3f 100644 --- a/src/graph/types.rs +++ b/src/graph/types.rs @@ -118,8 +118,9 @@ pub enum NodeType { /// `agent`-type node: spawn an agent with a templated prompt. The agent /// uses the full tool stack from its own directory (`global_tools` and /// `mcp_servers` in `config.yaml` plus any per-agent `tools.{sh,py,ts,js}` -/// script). There is no per-node tool override; to use different tool -/// sets, create agent variants. See `docs/graph-agents/agent-tools.md`. +/// script); there is no per-node tool override here. For tool-filtered +/// one-shot LLM steps, use an `llm`-type node (future). To use different +/// tool sets via agent variants, see `docs/graph-agents/agent-tools.md`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentNode { pub agent: String, diff --git a/src/main.rs b/src/main.rs index 0e09e23..6596898 100644 --- a/src/main.rs +++ b/src/main.rs @@ -312,6 +312,17 @@ async fn start_directive( abort_signal: AbortSignal, ) -> Result<()> { let app: Arc = Arc::clone(&ctx.app.config); + + if graph::active_agent_graph_name(ctx).is_some() { + ctx.before_chat_completion(&input)?; + let output = + graph::run_active_agent_graph(ctx, &input.text(), abort_signal.clone()).await?; + app.print_markdown(&output)?; + ctx.after_chat_completion(app.as_ref(), &input, &output, &[])?; + ctx.exit_session()?; + return Ok(()); + } + let client = input.create_client()?; let extract_code = !*IS_STDOUT_TERMINAL && code_mode; ctx.before_chat_completion(&input)?; diff --git a/src/repl/mod.rs b/src/repl/mod.rs index c46426e..32ee455 100644 --- a/src/repl/mod.rs +++ b/src/repl/mod.rs @@ -858,8 +858,18 @@ async fn ask( tokio::time::sleep(std::time::Duration::from_millis(100)).await; } - let client = input.create_client()?; let app = Arc::clone(&ctx.app.config); + + if crate::graph::active_agent_graph_name(ctx).is_some() { + ctx.before_chat_completion(&input)?; + let output = + crate::graph::run_active_agent_graph(ctx, &input.text(), abort_signal.clone()).await?; + app.print_markdown(&output)?; + ctx.after_chat_completion(app.as_ref(), &input, &output, &[])?; + return Ok(()); + } + + let client = input.create_client()?; ctx.before_chat_completion(&input)?; let (output, tool_results) = if input.stream() { call_chat_completions_streaming(&input, client.as_ref(), ctx, abort_signal.clone()).await?