From cc8b48c355891cdaedb128f74880aab95c713bed Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Wed, 13 May 2026 14:29:45 -0600 Subject: [PATCH] feat: implemented support for the graph executor --- src/graph/executor.rs | 316 ++++++++++++++++++++++++++++++++++ src/graph/mod.rs | 2 + src/graph/types.rs | 8 +- src/graph/user_interaction.rs | 2 +- 4 files changed, 324 insertions(+), 4 deletions(-) create mode 100644 src/graph/executor.rs diff --git a/src/graph/executor.rs b/src/graph/executor.rs new file mode 100644 index 0000000..c9adbf7 --- /dev/null +++ b/src/graph/executor.rs @@ -0,0 +1,316 @@ +//! Main execution loop for graph workflows. +//! +//! Dispatches each node to its type-specific executor, handles routing +//! (static `Node.next`, script `_next` override, approval `routes`, input +//! `on_timeout`), enforces `max_loop_iterations` and an optional +//! whole-graph timeout, and resolves the final `End` node's `output` +//! template as the graph's return value. + +use super::agent::AgentNodeExecutor; +use super::parser::GraphParser; +use super::script::ScriptExecutor; +use super::state::StateManager; +use super::types::{EndNode, Graph, Node, NodeType}; +use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor}; +use super::validator::GraphValidator; +use crate::config::RequestContext; +use crate::utils::AbortSignal; +use anyhow::{Context, Result, bail, anyhow}; +use serde_json::Value; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; + +pub struct GraphExecutor { + graph: Graph, + base_dir: PathBuf, +} + +impl GraphExecutor { + pub fn new(graph: Graph, base_dir: impl Into) -> Self { + Self { + graph, + base_dir: base_dir.into(), + } + } + + /// Load a graph from disk and construct the executor in one step. + /// `base_dir` is also used to resolve relative script paths. + pub fn from_path(graph_path: impl AsRef, base_dir: impl Into) -> Result { + let base_dir = base_dir.into(); + let parser = GraphParser::new(&base_dir); + let graph = parser.load_from_file(graph_path)?; + Ok(Self::new(graph, base_dir)) + } + + /// Run the graph to completion. Returns the resolved `output` template + /// of the terminal `End` node. + pub async fn execute( + self, + ctx: &mut RequestContext, + abort_signal: AbortSignal, + ) -> Result { + let GraphExecutor { graph, base_dir } = self; + + if graph.settings.validate_before_run { + let validator = GraphValidator::new(&base_dir); + let result = validator.validate(&graph); + for w in &result.warnings { + let where_ = w + .node_id + .as_deref() + .map(|id| format!("[{id}] ")) + .unwrap_or_default(); + warn!("[graph:{}] {}{}", graph.name, where_, w.message); + } + result.into_result()?; + } + + let mut state = StateManager::new(graph.initial_state.clone()); + let script_executor = ScriptExecutor::new(&base_dir); + let max_iterations = graph.settings.max_loop_iterations; + let graph_timeout = graph.settings.timeout.map(Duration::from_secs); + let start = Instant::now(); + + let mut current = graph.start.clone(); + info!("[graph:{}] start at '{}'", graph.name, current); + + let output = loop { + if abort_signal.aborted() { + bail!("Graph '{}' aborted at '{}'", graph.name, current); + } + if let Some(t) = graph_timeout + && start.elapsed() > t + { + bail!( + "Graph '{}' timed out after {}s at '{}'", + graph.name, + t.as_secs(), + current + ); + } + + state.state_mut().visit_node(¤t); + let visits = state.state().loop_count(¤t); + if visits > max_iterations { + bail!( + "Node '{}' visited {} times (max_loop_iterations={}). \ + Possible infinite loop.", + current, + visits, + max_iterations + ); + } + + let node = graph.get_node(¤t).ok_or_else(|| { + anyhow!("Node '{}' not found in graph '{}'", current, graph.name) + })?; + + debug!( + "[graph:{}] entering '{}' (visit {})", + graph.name, current, visits + ); + + let next = step( + node, + &mut state, + ctx, + &script_executor, + &graph.name, + ¤t, + ) + .await + .with_context(|| format!("at node '{current}'"))?; + + match next { + StepResult::Continue(next_id) => { + debug!("[graph:{}] {} -> {}", graph.name, current, next_id); + current = next_id; + } + StepResult::End(out) => { + info!( + "[graph:{}] end '{}' (elapsed {:?})", + graph.name, + current, + start.elapsed() + ); + break out; + } + } + }; + + Ok(output) + } +} + +enum StepResult { + Continue(String), + End(String), +} + +async fn step( + node: &Node, + state: &mut StateManager, + ctx: &mut RequestContext, + script_executor: &ScriptExecutor, + graph_name: &str, + current: &str, +) -> Result { + match &node.node_type { + NodeType::Agent(agent_node) => { + AgentNodeExecutor::execute(agent_node, state, ctx).await?; + let next = node.next.clone().ok_or_else(|| { + anyhow!("agent node '{current}' has no `next` and is not an end node") + })?; + Ok(StepResult::Continue(next)) + } + NodeType::Script(script_node) => { + let dynamic = match script_executor.execute(script_node, state).await { + Ok(n) => n, + Err(e) => { + if let Some(fallback) = &script_node.fallback { + warn!( + "[graph:{}] script '{}' failed, routing to fallback '{}': {}", + graph_name, current, fallback, e + ); + return Ok(StepResult::Continue(fallback.clone())); + } + return Err(e); + } + }; + let next = dynamic.or_else(|| node.next.clone()).ok_or_else(|| { + anyhow!( + "script node '{current}' did not emit `_next` and has no static `next`" + ) + })?; + Ok(StepResult::Continue(next)) + } + NodeType::Approval(approval_node) => { + let next = ApprovalNodeExecutor::execute(approval_node, state, ctx).await?; + Ok(StepResult::Continue(next)) + } + NodeType::Input(input_node) => { + let next = + InputNodeExecutor::execute(input_node, node.next.as_deref(), state, ctx).await?; + Ok(StepResult::Continue(next)) + } + NodeType::End(end_node) => Ok(StepResult::End(resolve_end_output(end_node, state))), + } +} + +/// Apply the end node's `state_updates`, then interpolate its `output` +/// template against the resulting state. Both use lenient interpolation +/// so the graph still produces a result even when some keys are absent. +fn resolve_end_output(end_node: &EndNode, state: &mut StateManager) -> String { + apply_simple_state_updates(end_node.state_updates.as_ref(), state); + state.interpolate_lenient(&end_node.output) +} + +fn apply_simple_state_updates(updates: Option<&HashMap>, state: &mut StateManager) { + let Some(updates) = updates else { + return; + }; + for (key, template) in updates { + let value = state.interpolate_lenient(template); + state.state_mut().set(key.clone(), Value::String(value)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn state_with(pairs: &[(&str, Value)]) -> StateManager { + let mut map = HashMap::new(); + for (k, v) in pairs { + map.insert((*k).into(), v.clone()); + } + StateManager::new(map) + } + + fn end_node(output: &str, updates: Option>) -> EndNode { + EndNode { + output: output.into(), + state_updates: updates, + } + } + + #[test] + fn resolve_end_output_interpolates_template_against_state() { + let mut state = state_with(&[("name", json!("alice"))]); + let node = end_node("done: {{name}}", None); + assert_eq!(resolve_end_output(&node, &mut state), "done: alice"); + } + + #[test] + fn resolve_end_output_applies_state_updates_before_interpolation() { + let mut updates = HashMap::new(); + updates.insert("summary".into(), "completed for {{user}}".into()); + let node = end_node("RESULT: {{summary}}", Some(updates)); + let mut state = state_with(&[("user", json!("bob"))]); + assert_eq!( + resolve_end_output(&node, &mut state), + "RESULT: completed for bob" + ); + assert_eq!( + state.state().get("summary"), + Some(&json!("completed for bob")) + ); + } + + #[test] + fn resolve_end_output_with_empty_template_returns_empty_string() { + let mut state = state_with(&[]); + let node = end_node("", None); + assert_eq!(resolve_end_output(&node, &mut state), ""); + } + + #[test] + fn resolve_end_output_lenient_on_missing_keys() { + let mut state = state_with(&[]); + let node = end_node("hello {{unknown}}!", None); + assert_eq!(resolve_end_output(&node, &mut state), "hello !"); + } + + #[test] + fn apply_simple_state_updates_does_nothing_when_none() { + let mut state = state_with(&[("k", json!("v"))]); + apply_simple_state_updates(None, &mut state); + assert_eq!(state.state().get("k"), Some(&json!("v"))); + } + + #[test] + fn apply_simple_state_updates_overwrites_existing_values() { + let mut updates = HashMap::new(); + updates.insert("k".into(), "new-{{k}}".into()); + let mut state = state_with(&[("k", json!("old"))]); + apply_simple_state_updates(Some(&updates), &mut state); + assert_eq!(state.state().get("k"), Some(&json!("new-old"))); + } + + #[test] + fn from_path_loads_and_constructs_executor() { + use std::io::Write; + let path = std::env::temp_dir().join(format!( + "loki-graph-executor-test-{}.yaml", + std::process::id() + )); + let yaml = r#" +name: test_graph +start: only +nodes: + only: + type: end + output: hello +"#; + std::fs::write(&path, yaml).unwrap(); + + let parent = path.parent().unwrap().to_path_buf(); + let executor = GraphExecutor::from_path(&path, &parent).unwrap(); + assert_eq!(executor.graph.name, "test_graph"); + assert_eq!(executor.graph.start, "only"); + + let _ = std::fs::remove_file(&path); + } +} diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 3fc5c96..1f4d200 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -2,6 +2,7 @@ //! JSON state, composed of agent/script/approval/input/end nodes. pub mod agent; +pub mod executor; pub mod parser; pub mod script; pub mod state; @@ -10,6 +11,7 @@ pub mod user_interaction; pub mod validator; pub use agent::AgentNodeExecutor; +pub use executor::GraphExecutor; pub use parser::{GraphParser, agent_has_graph, load_agent_graph}; pub use script::ScriptExecutor; pub use state::{StateManager, StateRepresentation}; diff --git a/src/graph/types.rs b/src/graph/types.rs index e992aa0..1dd959d 100644 --- a/src/graph/types.rs +++ b/src/graph/types.rs @@ -115,9 +115,11 @@ pub enum NodeType { End(EndNode), } -/// `agent`-type node: spawn an agent with a templated prompt. Agent tools -/// come from the agent's own `config.yaml`; create agent variants for -/// different tool sets rather than overriding here. +/// `agent`-type node: spawn an agent with a templated prompt. The agent +/// uses the full tool stack from its own directory (`global_tools` and +/// `mcp_servers` in `config.yaml` plus any per-agent `tools.{sh,py,ts,js}` +/// script). There is no per-node tool override; to use different tool +/// sets, create agent variants. See `docs/graph-agents/agent-tools.md`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentNode { pub agent: String, diff --git a/src/graph/user_interaction.rs b/src/graph/user_interaction.rs index 9756ac9..ea20713 100644 --- a/src/graph/user_interaction.rs +++ b/src/graph/user_interaction.rs @@ -11,7 +11,7 @@ use super::state::StateManager; use super::types::{ApprovalNode, InputNode}; use crate::config::RequestContext; use crate::function::user_interaction::{USER_FUNCTION_PREFIX, handle_user_tool}; -use anyhow::{Context, Result, bail, anyhow}; +use anyhow::{Context, Result, anyhow, bail}; use serde_json::{Value, json}; use std::collections::HashMap;