feat: wired together graph execution and agent graph dispatch

This commit is contained in:
2026-05-14 11:10:45 -06:00
parent 84497d3d65
commit 9bc4f8b621
11 changed files with 270 additions and 12 deletions
+6 -1
View File
@@ -1,5 +1,10 @@
use super::role::Role;
use super::{AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME, ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME, GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME, ROLES_DIR_NAME, paths, AGENT_GRAPH_FILE_NAME};
use super::{
AGENT_GRAPH_FILE_NAME, AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME,
ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME,
GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME,
ROLES_DIR_NAME, paths,
};
use crate::client::ProviderModels;
use crate::utils::{get_env_name, list_file_names, normalize_env_name};
+91
View File
@@ -317,6 +317,25 @@ impl Functions {
self.declarations.iter().find(|v| v.name == name)
}
/// Narrow the declared tool list to a caller-supplied whitelist.
/// Entries are matched by exact name. The shorthand `mcp:<server>`
/// expands to the three MCP meta-functions Loki registers per
/// server (`mcp_invoke_<server>`, `mcp_search_<server>`,
/// `mcp_describe_<server>`).
pub fn retain_named(&mut self, allowed: &[String]) {
let mut expanded: std::collections::HashSet<String> = std::collections::HashSet::new();
for entry in allowed {
if let Some(server) = entry.strip_prefix("mcp:") {
expanded.insert(format!("{MCP_INVOKE_META_FUNCTION_NAME_PREFIX}_{server}"));
expanded.insert(format!("{MCP_SEARCH_META_FUNCTION_NAME_PREFIX}_{server}"));
expanded.insert(format!("{MCP_DESCRIBE_META_FUNCTION_NAME_PREFIX}_{server}"));
} else {
expanded.insert(entry.clone());
}
}
self.declarations.retain(|d| expanded.contains(&d.name));
}
pub fn contains(&self, name: &str) -> bool {
self.declarations.iter().any(|v| v.name == name)
}
@@ -1730,4 +1749,76 @@ mod tests {
assert_eq!(result.call.name, "my_tool");
assert_eq!(result.output, json!({"result": "ok"}));
}
fn function_with_names(names: &[&str]) -> Functions {
let declarations = names
.iter()
.map(|n| FunctionDeclaration {
name: (*n).to_string(),
description: String::new(),
parameters: JsonSchema::default(),
agent: false,
})
.collect();
Functions { declarations }
}
#[test]
fn retain_named_keeps_only_exact_matches() {
let mut f = function_with_names(&["read_query", "describe_table", "web_search_loki"]);
f.retain_named(&["read_query".to_string()]);
assert!(f.contains("read_query"));
assert!(!f.contains("describe_table"));
assert!(!f.contains("web_search_loki"));
}
#[test]
fn retain_named_with_empty_list_removes_all() {
let mut f = function_with_names(&["a", "b", "c"]);
f.retain_named(&[]);
assert!(!f.contains("a"));
assert!(!f.contains("b"));
assert!(!f.contains("c"));
assert!(f.is_empty());
}
#[test]
fn retain_named_with_unknown_name_drops_everything() {
let mut f = function_with_names(&["a", "b"]);
f.retain_named(&["nonexistent".to_string()]);
assert!(f.is_empty());
}
#[test]
fn retain_named_with_mcp_shorthand_keeps_all_three_meta_functions() {
let mut f = Functions::default();
f.append_mcp_meta_functions(vec!["github".to_string(), "slack".to_string()]);
f.retain_named(&["mcp:github".to_string()]);
assert!(f.contains("mcp_invoke_github"));
assert!(f.contains("mcp_search_github"));
assert!(f.contains("mcp_describe_github"));
assert!(!f.contains("mcp_invoke_slack"));
assert!(!f.contains("mcp_search_slack"));
assert!(!f.contains("mcp_describe_slack"));
}
#[test]
fn retain_named_mixes_exact_names_and_mcp_shorthand() {
let mut f = function_with_names(&["read_query", "describe_table"]);
f.append_mcp_meta_functions(vec!["pubmed".to_string()]);
f.retain_named(&["read_query".to_string(), "mcp:pubmed".to_string()]);
assert!(f.contains("read_query"));
assert!(!f.contains("describe_table"));
assert!(f.contains("mcp_invoke_pubmed"));
assert!(f.contains("mcp_search_pubmed"));
assert!(f.contains("mcp_describe_pubmed"));
}
#[test]
fn retain_named_with_mcp_shorthand_for_unknown_server_drops_other_servers() {
let mut f = Functions::default();
f.append_mcp_meta_functions(vec!["alpha".to_string()]);
f.retain_named(&["mcp:beta".to_string()]);
assert!(!f.contains("mcp_invoke_alpha"));
}
}
+9
View File
@@ -332,6 +332,15 @@ pub fn run_child_agent(
abort_signal: AbortSignal,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send>> {
Box::pin(async move {
if crate::graph::active_agent_graph_name(&child_ctx).is_some() {
return crate::graph::run_active_agent_graph(
&mut child_ctx,
&initial_input.text(),
abort_signal,
)
.await;
}
let mut accumulated_output = String::new();
let mut input = initial_input;
let app = Arc::clone(&child_ctx.app.config);
+24
View File
@@ -9,6 +9,7 @@ use super::state::StateManager;
use super::types::AgentNode;
use crate::config::RequestContext;
use crate::function::supervisor::run_agent_for_graph;
use crate::utils::dimmed_text;
use anyhow::{Context, Result};
use serde_json::Value;
use std::time::Duration;
@@ -31,6 +32,14 @@ impl AgentNodeExecutor {
.interpolate(&node.prompt)
.with_context(|| format!("Failed to interpolate prompt for agent '{}'", node.agent))?;
eprintln!(
"{}",
dimmed_text(&format!("▸ spawning agent '{}' with prompt:", node.agent))
);
for line in indent_prompt(&prompt, 6) {
eprintln!("{}", dimmed_text(&line));
}
let timeout_dur = Duration::from_secs(node.timeout.unwrap_or(DEFAULT_TIMEOUT_SECS));
let output = timeout(
@@ -53,6 +62,21 @@ impl AgentNodeExecutor {
}
}
fn indent_prompt(prompt: &str, prefix_spaces: usize) -> Vec<String> {
const MAX_LINES: usize = 12;
let pad = " ".repeat(prefix_spaces);
let mut out: Vec<String> = prompt
.lines()
.take(MAX_LINES)
.map(|line| format!("{pad}{line}"))
.collect();
let total = prompt.lines().count();
if total > MAX_LINES {
out.push(format!("{pad}... ({} more lines)", total - MAX_LINES));
}
out
}
/// Exposes the agent's output as `{{output}}` for template evaluation, then
/// applies every key/template in `state_updates`. The temporary `output`
/// state key is removed at the end so it doesn't leak into subsequent
+59
View File
@@ -0,0 +1,59 @@
//! Helpers for running the active agent through its `graph.yaml` instead
//! of the LLM loop. Used at every agent-execution entry point: top-level
//! CLI (`start_directive`), REPL (`ask`), and child-agent spawn
//! (`run_child_agent`).
use super::{GraphExecutor, GraphParser, agent_has_graph};
use crate::config::RequestContext;
use crate::config::paths;
use crate::utils::AbortSignal;
use anyhow::{Context, Result};
use serde_json::Value;
/// If the active agent owns a `graph.yaml`, returns its name. Lets
/// callers branch between graph and LLM-loop execution without
/// re-implementing the lookup.
pub fn active_agent_graph_name(ctx: &RequestContext) -> Option<String> {
let name = ctx.agent.as_ref()?.name().to_string();
agent_has_graph(&name).then_some(name)
}
/// Run the active agent's graph end-to-end and return the resolved
/// End-node output. The caller's prompt is seeded into the graph state
/// as `initial_prompt` so nodes can reference it via
/// `{{initial_prompt}}`. Any sub-agents the graph spawned via the
/// supervisor are cancelled on return.
pub async fn run_active_agent_graph(
ctx: &mut RequestContext,
prompt: &str,
abort_signal: AbortSignal,
) -> Result<String> {
let agent_name = active_agent_graph_name(ctx)
.ok_or_else(|| anyhow::anyhow!("Active agent has no graph.yaml"))?;
log::info!("Agent '{agent_name}' has graph.yaml; routing to graph executor");
let agent_dir = paths::agent_data_dir(&agent_name);
let graph_path = paths::agent_graph_path(&agent_name);
let parser = GraphParser::new(&agent_dir);
let mut graph = parser
.load_from_file(&graph_path)
.with_context(|| format!("Failed to load graph.yaml for agent '{agent_name}'"))?;
graph
.initial_state
.insert("initial_prompt".into(), Value::String(prompt.to_string()));
let executor = GraphExecutor::new(graph, agent_dir);
let output = executor
.execute(ctx, abort_signal)
.await
.with_context(|| format!("Graph execution failed for agent '{agent_name}'"))?;
if let Some(supervisor) = ctx.supervisor.clone() {
supervisor.read().cancel_all();
}
Ok(output)
}
+31 -8
View File
@@ -14,8 +14,8 @@ use super::types::{EndNode, Graph, Node, NodeType};
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
use super::validator::GraphValidator;
use crate::config::RequestContext;
use crate::utils::AbortSignal;
use anyhow::{Context, Result, bail, anyhow};
use crate::utils::{AbortSignal, dimmed_text};
use anyhow::{Context, Result, anyhow, bail};
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
@@ -74,6 +74,10 @@ impl GraphExecutor {
let mut current = graph.start.clone();
info!("[graph:{}] start at '{}'", graph.name, current);
eprintln!(
"{}",
dimmed_text(&format!("▸ graph: {} (start: {})", graph.name, current))
);
let output = loop {
if abort_signal.aborted() {
@@ -102,14 +106,18 @@ impl GraphExecutor {
);
}
let node = graph.get_node(&current).ok_or_else(|| {
anyhow!("Node '{}' not found in graph '{}'", current, graph.name)
})?;
let node = graph
.get_node(&current)
.ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?;
debug!(
"[graph:{}] entering '{}' (visit {})",
graph.name, current, visits
);
eprintln!(
"{}",
dimmed_text(&format!("{} ({})", current, node_type_label(node)))
);
let next = step(
node,
@@ -134,6 +142,13 @@ impl GraphExecutor {
current,
start.elapsed()
);
eprintln!(
"{}",
dimmed_text(&format!(
"▸ graph done in {:.2}s",
start.elapsed().as_secs_f64()
))
);
break out;
}
}
@@ -148,6 +163,16 @@ enum StepResult {
End(String),
}
fn node_type_label(node: &Node) -> &'static str {
match &node.node_type {
NodeType::Agent(_) => "agent",
NodeType::Script(_) => "script",
NodeType::Approval(_) => "approval",
NodeType::Input(_) => "input",
NodeType::End(_) => "end",
}
}
async fn step(
node: &Node,
state: &mut StateManager,
@@ -179,9 +204,7 @@ async fn step(
}
};
let next = dynamic.or_else(|| node.next.clone()).ok_or_else(|| {
anyhow!(
"script node '{current}' did not emit `_next` and has no static `next`"
)
anyhow!("script node '{current}' did not emit `_next` and has no static `next`")
})?;
Ok(StepResult::Continue(next))
}
+2
View File
@@ -2,6 +2,7 @@
//! JSON state, composed of agent/script/approval/input/end nodes.
pub mod agent;
pub mod dispatch;
pub mod executor;
pub mod parser;
pub mod script;
@@ -11,6 +12,7 @@ pub mod user_interaction;
pub mod validator;
pub use agent::AgentNodeExecutor;
pub use dispatch::{active_agent_graph_name, run_active_agent_graph};
pub use executor::GraphExecutor;
pub use parser::{GraphParser, agent_has_graph, load_agent_graph};
pub use script::ScriptExecutor;
+23
View File
@@ -9,6 +9,7 @@
use super::state::{StateManager, StateRepresentation};
use super::types::ScriptNode;
use crate::function::Language;
use crate::utils::dimmed_text;
use anyhow::{Context, Result, anyhow, bail};
use serde_json::Value;
use std::path::{Path, PathBuf};
@@ -45,6 +46,11 @@ impl ScriptExecutor {
bail!("Script file not found: '{}'", script_path.display());
}
eprintln!(
"{}",
dimmed_text(&format!("▸ running script '{}'", node.script))
);
let language = detect_language(&script_path)?;
let state_repr = state_manager.serialize_state()?;
@@ -106,6 +112,23 @@ impl ScriptExecutor {
)
})?;
if let Ok(parsed) = serde_json::from_str::<serde_json::Map<String, Value>>(json_output) {
let keys: Vec<&str> = parsed
.keys()
.filter(|k| k.as_str() != "_next")
.map(|s| s.as_str())
.collect();
if !keys.is_empty() {
eprintln!(
"{}",
dimmed_text(&format!("▸ merged: {}", keys.join(", ")))
);
}
if let Some(n) = &next {
eprintln!("{}", dimmed_text(&format!("▸ script set _next = '{n}'")));
}
}
apply_state_updates(node, state_manager);
Ok(next)
+3 -2
View File
@@ -118,8 +118,9 @@ pub enum NodeType {
/// `agent`-type node: spawn an agent with a templated prompt. The agent
/// uses the full tool stack from its own directory (`global_tools` and
/// `mcp_servers` in `config.yaml` plus any per-agent `tools.{sh,py,ts,js}`
/// script). There is no per-node tool override; to use different tool
/// sets, create agent variants. See `docs/graph-agents/agent-tools.md`.
/// script); there is no per-node tool override here. For tool-filtered
/// one-shot LLM steps, use an `llm`-type node (future). To use different
/// tool sets via agent variants, see `docs/graph-agents/agent-tools.md`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentNode {
pub agent: String,
+11
View File
@@ -312,6 +312,17 @@ async fn start_directive(
abort_signal: AbortSignal,
) -> Result<()> {
let app: Arc<AppConfig> = Arc::clone(&ctx.app.config);
if graph::active_agent_graph_name(ctx).is_some() {
ctx.before_chat_completion(&input)?;
let output =
graph::run_active_agent_graph(ctx, &input.text(), abort_signal.clone()).await?;
app.print_markdown(&output)?;
ctx.after_chat_completion(app.as_ref(), &input, &output, &[])?;
ctx.exit_session()?;
return Ok(());
}
let client = input.create_client()?;
let extract_code = !*IS_STDOUT_TERMINAL && code_mode;
ctx.before_chat_completion(&input)?;
+11 -1
View File
@@ -858,8 +858,18 @@ async fn ask(
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let client = input.create_client()?;
let app = Arc::clone(&ctx.app.config);
if crate::graph::active_agent_graph_name(ctx).is_some() {
ctx.before_chat_completion(&input)?;
let output =
crate::graph::run_active_agent_graph(ctx, &input.text(), abort_signal.clone()).await?;
app.print_markdown(&output)?;
ctx.after_chat_completion(app.as_ref(), &input, &output, &[])?;
return Ok(());
}
let client = input.create_client()?;
ctx.before_chat_completion(&input)?;
let (output, tool_results) = if input.stream() {
call_chat_completions_streaming(&input, client.as_ref(), ctx, abort_signal.clone()).await?