From 534b9923ae495b08168e38037d9393cbdb5bd89c Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Wed, 13 May 2026 13:21:45 -0600 Subject: [PATCH] feat: Added initial support for native Loki agent nodes in the graph-based agent system --- src/function/supervisor.rs | 94 ++++++++++++++++++- src/graph/agent.rs | 188 +++++++++++++++++++++++++++++++++++++ src/graph/mod.rs | 2 + 3 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 src/graph/agent.rs diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index 00f3940..a8d2659 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -326,7 +326,7 @@ pub async fn handle_supervisor_tool( } } -fn run_child_agent( +pub fn run_child_agent( mut child_ctx: RequestContext, initial_input: Input, abort_signal: AbortSignal, @@ -374,6 +374,98 @@ fn run_child_agent( }) } +/// Spawn an agent synchronously from a graph node and return its accumulated +/// output. This is similar to `handle_spawn` but runs the child agent in the +/// current task (no tokio::spawn, no supervisor handle registration) so the +/// graph executor can sequence agent nodes directly. +pub async fn run_agent_for_graph( + parent_ctx: &mut RequestContext, + agent_name: &str, + prompt: &str, +) -> Result { + let short_uuid = &Uuid::new_v4().to_string()[..8]; + let agent_id = format!("graph_agent_{agent_name}_{short_uuid}"); + let current_depth = parent_ctx.current_depth + 1; + + if let Some(supervisor) = parent_ctx.supervisor.as_ref().cloned() { + let max_depth = supervisor.read().max_depth(); + if current_depth > max_depth { + bail!("Max agent depth exceeded ({current_depth}/{max_depth})"); + } + } + + if !parent_ctx.app.config.function_calling_support { + bail!("Function calling support must be enabled to spawn agents."); + } + + let child_inbox = Arc::new(Inbox::new()); + parent_ctx.ensure_root_escalation_queue(); + let child_abort = create_abort_signal(); + + let app_config = Arc::clone(&parent_ctx.app.config); + let current_model = parent_ctx.current_model().clone(); + let info_flag = parent_ctx.info_flag; + let child_app_state = Arc::new(AppState { + config: Arc::new(app_config.as_ref().clone()), + vault: parent_ctx.app.vault.clone(), + mcp_factory: parent_ctx.app.mcp_factory.clone(), + rag_cache: parent_ctx.app.rag_cache.clone(), + mcp_config: parent_ctx.app.mcp_config.clone(), + mcp_log_path: parent_ctx.app.mcp_log_path.clone(), + mcp_registry: parent_ctx.app.mcp_registry.clone(), + functions: parent_ctx.app.functions.clone(), + }); + + let agent = Agent::init( + app_config.as_ref(), + child_app_state.as_ref(), + ¤t_model, + info_flag, + agent_name, + child_abort.clone(), + ) + .await?; + + let agent_mcp_servers = agent.mcp_server_names().to_vec(); + let session = agent.agent_session().map(|v| v.to_string()); + let should_init_supervisor = agent.can_spawn_agents(); + let agent_max_concurrent = agent.max_concurrent_agents(); + let agent_max_depth = agent.max_agent_depth(); + + let mut child_ctx = RequestContext::new_for_child( + Arc::clone(&child_app_state), + parent_ctx, + current_depth, + Arc::clone(&child_inbox), + agent_id.clone(), + ); + child_ctx.rag = agent.rag(); + child_ctx.agent = Some(agent); + if should_init_supervisor { + child_ctx.supervisor = Some(Arc::new(RwLock::new(Supervisor::new( + agent_max_concurrent, + agent_max_depth, + )))); + } + + if let Some(session) = session { + child_ctx + .use_session(app_config.as_ref(), Some(&session), child_abort.clone()) + .await?; + sync_agent_functions_to_ctx(&mut child_ctx)?; + } else { + populate_agent_mcp_runtime(&mut child_ctx, &agent_mcp_servers).await?; + sync_agent_functions_to_ctx(&mut child_ctx)?; + child_ctx.init_agent_shared_variables()?; + } + + let input = Input::from_str(&child_ctx, prompt, None); + + debug!("Spawning agent '{agent_name}' for graph node as '{agent_id}'"); + + run_child_agent(child_ctx, input, child_abort).await +} + async fn populate_agent_mcp_runtime(ctx: &mut RequestContext, server_ids: &[String]) -> Result<()> { if !ctx.app.config.mcp_server_support { return Ok(()); diff --git a/src/graph/agent.rs b/src/graph/agent.rs new file mode 100644 index 0000000..c8dbff6 --- /dev/null +++ b/src/graph/agent.rs @@ -0,0 +1,188 @@ +//! Execution of `agent`-type graph nodes. +//! +//! Spawns a child agent via `function::supervisor::run_agent_for_graph`, +//! interpolating the prompt against current graph state. After the agent +//! finishes, applies the node's `state_updates` (templates can reference +//! `{{output}}` for the agent's stdout). + +use super::state::StateManager; +use super::types::AgentNode; +use crate::config::RequestContext; +use crate::function::supervisor::run_agent_for_graph; +use anyhow::{Context, Result}; +use serde_json::Value; +use std::time::Duration; +use tokio::time::timeout; + +const OUTPUT_KEY: &str = "output"; +const DEFAULT_TIMEOUT_SECS: u64 = 300; + +pub struct AgentNodeExecutor; + +impl AgentNodeExecutor { + /// Interpolate the node's prompt, spawn the agent, wait for it to + /// finish, then apply `state_updates`. Returns the agent's full output. + pub async fn execute( + node: &AgentNode, + state_manager: &mut StateManager, + parent_ctx: &mut RequestContext, + ) -> Result { + let prompt = state_manager + .interpolate(&node.prompt) + .with_context(|| format!("Failed to interpolate prompt for agent '{}'", node.agent))?; + + let timeout_dur = Duration::from_secs(node.timeout.unwrap_or(DEFAULT_TIMEOUT_SECS)); + + let output = timeout( + timeout_dur, + run_agent_for_graph(parent_ctx, &node.agent, &prompt), + ) + .await + .with_context(|| { + format!( + "Agent '{}' timed out after {}s", + node.agent, + timeout_dur.as_secs() + ) + })? + .with_context(|| format!("Agent '{}' failed", node.agent))?; + + apply_state_updates(node, state_manager, &output); + + Ok(output) + } +} + +/// Exposes the agent's output as `{{output}}` for template evaluation, then +/// applies every key/template in `state_updates`. The temporary `output` +/// state key is removed at the end so it doesn't leak into subsequent +/// nodes' templates. +fn apply_state_updates(node: &AgentNode, state_manager: &mut StateManager, output: &str) { + let Some(updates) = &node.state_updates else { + return; + }; + let prev_output = state_manager.state().get(OUTPUT_KEY).cloned(); + state_manager + .state_mut() + .set(OUTPUT_KEY.into(), Value::String(output.to_string())); + + for (key, template) in updates { + let value = state_manager.interpolate_lenient(template); + state_manager + .state_mut() + .set(key.clone(), Value::String(value)); + } + + match prev_output { + Some(v) => state_manager.state_mut().set(OUTPUT_KEY.into(), v), + None => { + state_manager + .state_mut() + .set(OUTPUT_KEY.into(), Value::Null); + } + } +} + +#[cfg(test)] +mod tests { + use super::super::types::AgentNode; + use super::*; + use serde_json::json; + use std::collections::HashMap; + + fn manager_with(pairs: &[(&str, Value)]) -> StateManager { + let mut map = HashMap::new(); + for (k, v) in pairs { + map.insert((*k).into(), v.clone()); + } + StateManager::new(map) + } + + fn node_with(prompt: &str, updates: Option>) -> AgentNode { + AgentNode { + agent: "test_agent".into(), + prompt: prompt.into(), + state_updates: updates, + timeout: None, + } + } + + #[test] + fn state_updates_use_output_placeholder() { + let node = { + let mut u = HashMap::new(); + u.insert("findings".into(), "{{output}}".into()); + node_with("hi", Some(u)) + }; + let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, "agent finished its work"); + assert_eq!( + state.state().get("findings"), + Some(&json!("agent finished its work")) + ); + } + + #[test] + fn state_updates_can_reference_existing_keys_and_output() { + let node = { + let mut u = HashMap::new(); + u.insert("summary".into(), "{{topic}}: {{output}}".into()); + node_with("hi", Some(u)) + }; + let mut state = manager_with(&[("topic", json!("auth"))]); + apply_state_updates(&node, &mut state, "JWT vs sessions"); + assert_eq!( + state.state().get("summary"), + Some(&json!("auth: JWT vs sessions")) + ); + } + + #[test] + fn output_key_is_cleaned_up_after_state_updates() { + let node = { + let mut u = HashMap::new(); + u.insert("findings".into(), "{{output}}".into()); + node_with("hi", Some(u)) + }; + let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, "anything"); + assert_eq!(state.state().get("output"), Some(&Value::Null)); + } + + #[test] + fn pre_existing_output_value_is_preserved() { + let node = { + let mut u = HashMap::new(); + u.insert("greeting".into(), "{{output}}".into()); + node_with("hi", Some(u)) + }; + let mut state = manager_with(&[("output", json!("preserved"))]); + apply_state_updates(&node, &mut state, "new agent output"); + assert_eq!( + state.state().get("greeting"), + Some(&json!("new agent output")) + ); + assert_eq!(state.state().get("output"), Some(&json!("preserved"))); + } + + #[test] + fn no_state_updates_is_a_noop() { + let node = node_with("hi", None); + let mut state = manager_with(&[("k", json!("v"))]); + apply_state_updates(&node, &mut state, "ignored"); + assert_eq!(state.state().get("k"), Some(&json!("v"))); + assert!(state.state().get("output").is_none()); + } + + #[test] + fn interpolate_lenient_on_state_updates_handles_missing_keys() { + let node = { + let mut u = HashMap::new(); + u.insert("decorated".into(), "[{{missing}}] {{output}}".into()); + node_with("hi", Some(u)) + }; + let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, "DATA"); + assert_eq!(state.state().get("decorated"), Some(&json!("[] DATA"))); + } +} diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 3b342fc..2b013c7 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -1,12 +1,14 @@ //! Graph-based agent orchestration. Declarative YAML workflows over a shared //! JSON state, composed of agent/script/approval/input/end nodes. +pub mod agent; pub mod parser; pub mod script; pub mod state; pub mod types; pub mod validator; +pub use agent::AgentNodeExecutor; pub use parser::{GraphParser, agent_has_graph, load_agent_graph}; pub use script::ScriptExecutor; pub use state::{StateManager, StateRepresentation};