From 5bd0766a601a56a3b6265472da442e2f7612e323 Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Mon, 18 May 2026 13:46:52 -0600 Subject: [PATCH] style: Cleaned up all graph agent code --- README.md | 2 +- graph.example.yaml | 85 ++++---- src/config/agent.rs | 89 ++++---- src/config/paths.rs | 2 +- src/config/rag_cache.rs | 7 +- src/config/request_context.rs | 2 +- src/function/mod.rs | 3 - src/function/supervisor.rs | 5 +- src/graph/agent.rs | 37 ++-- src/graph/dispatch.rs | 22 +- src/graph/executor.rs | 59 +----- src/graph/llm.rs | 56 ++--- src/graph/logging.rs | 21 +- src/graph/mod.rs | 18 +- src/graph/parser.rs | 372 ++++++++++++++++++---------------- src/graph/rag.rs | 22 +- src/graph/script.rs | 57 +++--- src/graph/state.rs | 89 +++++--- src/graph/structured.rs | 24 ++- src/graph/types.rs | 111 +--------- src/graph/user_interaction.rs | 40 ++-- src/graph/validator.rs | 87 +++++--- src/repl/mod.rs | 2 +- 23 files changed, 560 insertions(+), 652 deletions(-) diff --git a/README.md b/README.md index a50dd8b..d7d1db0 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ Coming from [AIChat](https://github.com/sigoden/aichat)? Follow the [migration g * [Sessions](https://github.com/Dark-Alex-17/loki/wiki/Sessions): Manage and persist conversational contexts and settings across multiple interactions. * [Roles](https://github.com/Dark-Alex-17/loki/wiki/Roles): Customize model behavior for specific tasks or domains. * [Agents](https://github.com/Dark-Alex-17/loki/wiki/Agents): Leverage AI agents to perform complex tasks and workflows, including sub-agent spawning, teammate messaging, and user interaction tools. - * [Graph Agents](https://github.com/Dark-Alex-17/loki/wiki/Graph-Agents): Define an agent as a declarative, YAML-driven workflow — a directed graph of typed nodes (LLM calls, scripts, approvals, user input, RAG retrieval, sub-agent spawns). See [`graph.example.yaml`](graph.example.yaml) for a fully-commented reference. + * [Graph Agents](https://github.com/Dark-Alex-17/loki/wiki/Graph-Agents): Define an agent as a declarative, YAML-driven workflow. A directed graph of typed nodes (LLM calls, scripts, approvals, user input, RAG retrieval, sub-agent spawns). * [Todo System](https://github.com/Dark-Alex-17/loki/wiki/TODO-System): Built-in task tracking for improved LLM reliability with smaller models. * [Environment Variables](https://github.com/Dark-Alex-17/loki/wiki/Environment-Variables): Override and customize your Loki configuration at runtime with environment variables. * [Client Configurations](https://github.com/Dark-Alex-17/loki/wiki/Clients): Configuration instructions for various LLM providers. diff --git a/graph.example.yaml b/graph.example.yaml index acfb17b..c0eb1a2 100644 --- a/graph.example.yaml +++ b/graph.example.yaml @@ -2,13 +2,14 @@ # Location: /agents//graph.yaml # # A graph agent is defined by THIS FILE ALONE. An agent directory contains -# EITHER a config.yaml (a normal LLM-loop agent) OR a graph.yaml (a graph -# agent) -- never both. The presence of graph.yaml is what makes the agent +# EITHER a config.yaml (a normal LLM-loop agent) or a graph.yaml (a graph +# agent), never both. The presence of graph.yaml is what makes the agent # a graph agent. # -# This file is a REFERENCE: it documents every available field. It is not a -# runnable agent as-is -- the `agent:`, `script:`, and `documents:` values -# point at things that would need to exist for a real agent. +# This file is meant to serve as a reference only: it documents every +# available field. It is not a runnable agent as-is. The `agent:`, +# `script:`, and `documents:` values point at things that would need to +# exist for a real agent. # # Full documentation: # https://github.com/Dark-Alex-17/loki/wiki/Graph-Agents @@ -28,7 +29,7 @@ version: "1.0" # Graph SCHEMA version. Only "1.0" is accepte # The same knobs a normal agent's config.yaml carries. In a graph agent they # live here instead of in a config.yaml. # --------------------------------------------------------------------------- -model: anthropic:claude-sonnet-4-6 # Default model for `llm` nodes that don't override it +model: claude:claude-sonnet-4-6 # Default model for `llm` nodes that don't override it temperature: 0.0 # Default sampling temperature for `llm` nodes top_p: null # Default sampling top-p for `llm` nodes @@ -42,15 +43,11 @@ mcp_servers: # MCP servers an `llm` node may reference via conversation_starters: # Suggested prompts surfaced in the UI - "Research LOINC code 2160-0" -# NOTE: `can_spawn_agents` is NOT a field here. It is DERIVED: a graph can -# spawn child agents iff it contains at least one `agent` node (this graph -# does -- see `deep_dive`). - # --------------------------------------------------------------------------- # Execution settings (all optional) # --------------------------------------------------------------------------- settings: - max_loop_iterations: 100 # PER-NODE visit cap. If one node id is entered more + max_loop_iterations: 100 # Per-node visit cap. If one node id is entered more # than this many times, execution aborts. Default 100. timeout: 600 # Optional wall-clock cap (seconds) on the whole run, # checked between node transitions. @@ -60,14 +57,16 @@ settings: # --------------------------------------------------------------------------- # Seed state (optional) # Values placed into graph state before any node runs; reference anywhere via -# {{key}}. NOTE: `initial_prompt` is seeded automatically by Loki with the -# caller's prompt -- do not set it here. +# {{key}}. +# +# Note: `initial_prompt` is seeded automatically by Loki with the +# caller's prompt. So there's no need to set it here. # --------------------------------------------------------------------------- initial_state: audience: "clinician" - # Seed an empty default for any key that a STRICT field (a node prompt / + # Seed an empty default for any key that a strict field (a node prompt / # instructions / question / End output) references but that is only set on - # SOME paths. `refinement` is set only if the `refine` input node runs; + # some paths. `refinement` is set only if the `refine` input node runs; # seeding it "" keeps `finalize`'s strict prompt from failing on the # approve-directly path. refinement: "" @@ -80,7 +79,7 @@ start: triage # ID of the first node to run (must exist in `nodes # --------------------------------------------------------------------------- # Nodes # Each node is keyed by its id. The `id:` inside a node must match its key -# (it may also be omitted -- Loki fills it in from the key). +# (it may also be omitted and thus Loki fills it in from the key). # # Node types: agent | script | approval | input | llm | rag | end # --------------------------------------------------------------------------- @@ -88,18 +87,18 @@ nodes: # --- llm node ----------------------------------------------------------- # A one-shot LLM call (with an optional bounded tool-call loop). Runs in a - # fresh isolated context. Tools are STRICTLY opt-in (see `tools`). + # fresh isolated context. Tools are strictly opt-in (see `tools`). triage: id: triage type: llm description: Classify the request and extract its topic. instructions: | # Optional system prompt (templated against state) You triage research requests for a {{audience}} audience. - prompt: | # REQUIRED user prompt (templated against state) + prompt: | # Required user prompt (templated against state) Classify this request and extract the key topic: {{initial_prompt}} - tools: [] # Tool whitelist. Omitted or [] = NO tools at all. - # A list narrows to EXACTLY those entries. + tools: [] # Tool whitelist. Omitted or [] = no tools at all. + # A list narrows to exactly those entries. output_schema: # Optional JSON Schema. The output is parsed to JSON type: object # and its top-level object keys auto-merge into state properties: # (so `topic` / `needs_research` become {{topic}} etc). @@ -108,7 +107,7 @@ nodes: required: [topic, needs_research] state_updates: # {{output}} = this node's result (here, the parsed object) triage_result: "{{output}}" - next: retrieve # REQUIRED for llm nodes: the success route + next: retrieve # Required for llm nodes: the success route # --- rag node ----------------------------------------------------------- # Hybrid (vector + keyword) retrieval against a per-node knowledge base. @@ -117,14 +116,14 @@ nodes: retrieve: id: retrieve type: rag - documents: # REQUIRED. Files, directories, URLs, loader paths. + documents: # Required. Files, directories, URLs, loader paths. - ./knowledge/ # relative paths resolve against the agent directory - https://example.com/reference query: "{{topic}}" # Retrieval query (templated). Default: {{initial_prompt}}. top_k: 5 # Chunks to retrieve. Default = the KB's own top_k. timeout: 120 # Retrieval timeout in seconds. Default 120. # Knowledge-base BUILD config (optional; used only when the KB is first - # built). When embedding_model + chunk_size + chunk_overlap are ALL set, + # built). When embedding_model + chunk_size + chunk_overlap are all set, # the KB builds with no interactive prompts (works in non-interactive runs). embedding_model: openai:text-embedding-3-small chunk_size: 1000 @@ -138,9 +137,9 @@ nodes: # --- script node -------------------------------------------------------- # Runs a .sh / .py / .ts script. The script receives state via the - # GRAPH_STATE env var (inline JSON) OR GRAPH_STATE_FILE (path to a JSON - # file, used when state exceeds 32 KiB) -- exactly one is set. It must print - # a single JSON OBJECT on stdout: keys merge into state, and the reserved + # GRAPH_STATE env var (inline JSON) or GRAPH_STATE_FILE (path to a JSON + # file, used when state exceeds 32 KiB). Exactly one is set. It must print + # a single JSON object on stdout: keys merge into state, and the reserved # `_next` key (if present) overrides routing. decide: id: decide @@ -150,15 +149,15 @@ nodes: state_updates: # Applied after the stdout JSON is merged decided_for: "{{topic}}" next: summarize # Default route if the script emits no `_next` - fallback: summarize # Route taken if the script FAILS (crash / bad JSON) + fallback: summarize # Route taken if the script fails (crash / bad JSON) # This script is expected to emit `_next: deep_dive` (or no `_next`, in # which case `next` is used). Because `deep_dive` is reached only via the # script's dynamic `_next`, the startup validator will report it as an - # "unreachable" WARNING -- that is expected for `_next`-routed targets. + # "unreachable" warning. That is expected for `_next`-routed targets. # --- agent node --------------------------------------------------------- - # Spawns a full Loki sub-agent and waits for it. The child uses ITS OWN - # tool stack -- agent nodes have NO `tools:` field. No schema hint is + # Spawns a full Loki sub-agent and waits for it. The child uses its own + # tool stack. Agent nodes have NO `tools:` field. No schema hint is # injected even when `output_schema` is set (unlike llm nodes). deep_dive: id: deep_dive @@ -168,7 +167,7 @@ nodes: Research {{topic}} in depth. Existing context: {{context}} timeout: 600 # Optional wall-clock cap, seconds. Default 300. - output_schema: # Optional -- same extraction as llm nodes + output_schema: # Optional. Same extraction as llm nodes type: object properties: summary: { type: string } @@ -178,7 +177,7 @@ nodes: required: [summary, findings] state_updates: research: "{{output}}" - next: review # REQUIRED for agent nodes + next: review # Required for agent nodes # --- llm node with a narrowed tool whitelist ---------------------------- summarize: @@ -186,22 +185,22 @@ nodes: type: llm instructions: "You write concise summaries for a {{audience}} audience." prompt: "Summarize the topic {{topic}}, using your tools as needed." - tools: # Narrow whitelist: EXACTLY these entries, nothing else + tools: # Narrow whitelist: Exactly these entries, nothing else - web_search_loki.sh # an exact global-tool / custom-tool name - mcp:pubmed-search # `mcp:` includes that server's functions - model: anthropic:claude-haiku-4-5 # Optional per-node model override + model: claude:claude-haiku-4-5 # Optional per-node model override temperature: 0.3 # Optional per-node sampling override - max_attempts: 2 # Retry count on TRANSIENT errors only. Default 1. + max_attempts: 2 # Retry count on transient errors only. Default 1. max_iterations: 10 # Tool-call-loop turn cap. Default 10. fallback: review # Route here if all attempts fail timeout: 300 # Optional node wall-clock cap, seconds (unset = no timeout) state_updates: research: "{{output}}" - next: review # REQUIRED for llm nodes: the success route + next: review # Required for llm nodes: the success route # --- approval node ------------------------------------------------------ - # Human-in-the-loop checkpoint. `user__ask` ALWAYS offers a free-form - # "type your own answer" option, so `on_other` is REQUIRED. + # Human-in-the-loop checkpoint. `user__ask` always offers a free-form + # "type your own answer" option, so `on_other` is required. review: id: review type: approval @@ -216,9 +215,9 @@ nodes: routes: # Map each listed option to its next node "yes": finalize "no": rejected_end - on_other: refine # REQUIRED: route for ANY answer not in `routes` + on_other: refine # Required: route for ANY answer not in `routes` state_updates: - decision: "{{choice}}" # {{choice}} = the chosen option OR the free-form text + decision: "{{choice}}" # {{choice}} = the chosen option or the free-form text # --- input node --------------------------------------------------------- # Collects a free-form string from the user. @@ -227,13 +226,13 @@ nodes: type: input question: "What should be changed about the result?" default: "minor wording only" # Optional: used if the user submits empty input. - # NOTE: a substituted default is NOT re-validated, + # Note: a substituted default is not re-validated, # so make sure it would satisfy `validation`. validation: "len(input) > 0" # Optional length predicate: len(input) N, # in > >= < <= == . Length only -- no regex. state_updates: refinement: "{{input}}" # {{input}} = the user's text - next: finalize # REQUIRED for input nodes: the success route + next: finalize # Required for input nodes: the success route # --- llm node (final synthesis) ----------------------------------------- finalize: @@ -253,7 +252,7 @@ nodes: done: id: done type: end - state_updates: # Optional: applied BEFORE `output` is rendered + state_updates: # Optional: applied before `output` is rendered status: "completed" output: | [{{status}}] {{final_answer}} diff --git a/src/config/agent.rs b/src/config/agent.rs index 18f362a..742ad1b 100644 --- a/src/config/agent.rs +++ b/src/config/agent.rs @@ -782,9 +782,6 @@ pub struct AgentVariable { pub value: String, } -/// Resolve document path specs (URLs, loader-protocol paths, relative or -/// absolute file paths) into the concrete paths `Rag::init` expects. -/// Relative paths are joined against the agent's data directory. fn resolve_document_paths( documents: &[String], loaders: &HashMap, @@ -817,10 +814,6 @@ fn resolve_document_paths( Ok(document_paths) } -/// Build or load a knowledge base for every `rag` node in the graph. Each -/// node's RAG lives in `/.yaml`. A missing knowledge base is -/// a hard error (interactive: after a declined confirm; non-interactive: -/// immediately) — a graph with an uninitialized `rag` node cannot run. #[allow(clippy::too_many_arguments)] async fn init_graph_rags( app: &AppConfig, @@ -876,11 +869,13 @@ async fn init_graph_rags( on the node, or run the agent once interactively." ); } + let ans = Confirm::new(&format!( "Initialize RAG knowledge base for rag node '{node_id}'?" )) .with_default(true) .prompt()?; + if !ans { bail!( "Agent '{agent_name}' has rag node '{node_id}' but its RAG was not \ @@ -888,6 +883,7 @@ async fn init_graph_rags( ); } } + let document_paths = resolve_document_paths(&rag_node.documents, loaders, agent_data_dir)?; let app_clone = app.clone(); @@ -1043,34 +1039,31 @@ variables: #[test] fn from_graph_maps_agent_level_fields() { - let yaml = r#" -name: graph_name_ignored -description: A graph agent -model: anthropic:claude-sonnet-4-6 -temperature: 0.3 -top_p: 0.8 -global_tools: - - fetch_pdf.sh -mcp_servers: - - pubmed-search -conversation_starters: - - "Start here" -start: e -nodes: - e: - id: e - type: end - output: done -"#; - let graph: Graph = serde_yaml::from_str(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: graph_name_ignored + description: A graph agent + model: claude:claude-sonnet-4-6 + temperature: 0.3 + top_p: 0.8 + global_tools: + - fetch_pdf.sh + mcp_servers: + - pubmed-search + conversation_starters: + - "Start here" + start: e + nodes: + e: + id: e + type: end + output: done + "#}; + let graph: Graph = serde_yaml::from_str(&yaml).unwrap(); let config = AgentConfig::from_graph("my-agent-dir", &graph); assert_eq!(config.name, "my-agent-dir"); assert_eq!(config.description, "A graph agent"); - assert_eq!( - config.model_id.as_deref(), - Some("anthropic:claude-sonnet-4-6") - ); + assert_eq!(config.model_id.as_deref(), Some("claude:claude-sonnet-4-6")); assert_eq!(config.temperature, Some(0.3)); assert_eq!(config.top_p, Some(0.8)); assert_eq!(config.global_tools, vec!["fetch_pdf.sh"]); @@ -1080,22 +1073,22 @@ nodes: #[test] fn from_graph_derives_can_spawn_agents_from_agent_nodes() { - let with_agent = r#" -name: g -start: a -nodes: - a: - id: a - type: agent - agent: helper - prompt: hi - next: e - e: - id: e - type: end - output: done -"#; - let graph: Graph = serde_yaml::from_str(with_agent).unwrap(); + let with_agent = formatdoc! {r#" + name: g + start: a + nodes: + a: + id: a + type: agent + agent: helper + prompt: hi + next: e + e: + id: e + type: end + output: done + "#}; + let graph: Graph = serde_yaml::from_str(&with_agent).unwrap(); assert!(AgentConfig::from_graph("d", &graph).can_spawn_agents); let no_agent = @@ -1110,7 +1103,6 @@ nodes: let graph: Graph = serde_yaml::from_str(yaml).unwrap(); let config = AgentConfig::from_graph("d", &graph); - // LLM-loop concepts a graph agent does not have: left at Default. assert!(!config.auto_continue); assert!(config.instructions.is_empty()); assert!(config.documents.is_empty()); @@ -1119,7 +1111,6 @@ nodes: assert_eq!(config.max_auto_continues, 0); assert_eq!(config.summarization_threshold, 0); - // Consumed by graph execution: kept at their real defaults. assert_eq!( config.max_concurrent_agents, default_max_concurrent_agents() diff --git a/src/config/paths.rs b/src/config/paths.rs index dc1f996..b1df31e 100644 --- a/src/config/paths.rs +++ b/src/config/paths.rs @@ -3,7 +3,7 @@ use super::{ AGENT_GRAPH_FILE_NAME, AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME, ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME, GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME, - ROLES_DIR_NAME, paths, + ROLES_DIR_NAME, }; use crate::client::ProviderModels; use crate::utils::{get_env_name, list_file_names, normalize_env_name}; diff --git a/src/config/rag_cache.rs b/src/config/rag_cache.rs index fac61c5..5a43eb6 100644 --- a/src/config/rag_cache.rs +++ b/src/config/rag_cache.rs @@ -9,12 +9,7 @@ use std::sync::{Arc, Weak}; pub enum RagKey { Named(String), Agent(String), - /// A `rag` node's per-node knowledge base, keyed by owning agent name - /// and node id. - GraphNode { - agent: String, - node: String, - }, + GraphNode { agent: String, node: String }, } #[derive(Default)] diff --git a/src/config/request_context.rs b/src/config/request_context.rs index 4313e87..f80db3a 100644 --- a/src/config/request_context.rs +++ b/src/config/request_context.rs @@ -27,6 +27,7 @@ use crate::utils::{ list_file_names, now, render_prompt, temp_file, }; +use crate::graph; use anyhow::{Context, Error, Result, bail}; use indoc::formatdoc; use inquire::{Confirm, MultiSelect, Text, list_option::ListOption, validator::Validation}; @@ -37,7 +38,6 @@ use std::fs::{File, OpenOptions, read_dir, read_to_string, remove_dir_all, remov use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::graph; pub struct AutoContinueConfig { pub enabled: bool, diff --git a/src/function/mod.rs b/src/function/mod.rs index 867f6df..2fad1fa 100644 --- a/src/function/mod.rs +++ b/src/function/mod.rs @@ -50,9 +50,6 @@ enum BinaryType<'a> { Agent, } -/// Canonical set of script languages that Loki can execute. This is the -/// single source of truth for both function-tool scripts and graph script -/// nodes. #[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)] pub enum Language { Bash, diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index 46e27ab..fa76afd 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -5,6 +5,7 @@ use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult, Supervisor}; use crate::utils::{AbortSignal, create_abort_signal}; +use crate::graph; use anyhow::{Context, Result, anyhow, bail}; use chrono::Utc; use indexmap::IndexMap; @@ -332,8 +333,8 @@ pub fn run_child_agent( abort_signal: AbortSignal, ) -> Pin> + Send>> { Box::pin(async move { - if crate::graph::active_agent_graph_name(&child_ctx).is_some() { - return crate::graph::run_active_agent_graph( + if graph::active_agent_graph_name(&child_ctx).is_some() { + return graph::run_active_agent_graph( &mut child_ctx, &initial_input.text(), abort_signal, diff --git a/src/graph/agent.rs b/src/graph/agent.rs index 8ec37ff..45fbb75 100644 --- a/src/graph/agent.rs +++ b/src/graph/agent.rs @@ -1,10 +1,3 @@ -//! Execution of `agent`-type graph nodes. -//! -//! Spawns a child agent via `function::supervisor::run_agent_for_graph`, -//! interpolating the prompt against current graph state. After the agent -//! finishes, applies the node's `state_updates` (templates can reference -//! `{{output}}` for the agent's stdout). - use super::state::StateManager; use super::structured; use super::types::AgentNode; @@ -22,8 +15,6 @@ const DEFAULT_TIMEOUT_SECS: u64 = 300; pub struct AgentNodeExecutor; impl AgentNodeExecutor { - /// Interpolate the node's prompt, spawn the agent, wait for it to - /// finish, then apply `state_updates`. Returns the agent's full output. pub async fn execute( node: &AgentNode, state_manager: &mut StateManager, @@ -90,14 +81,6 @@ fn indent_prompt(prompt: &str, prefix_spaces: usize) -> Vec { out } -/// Exposes the agent's output as `{{output}}` for template evaluation, then -/// applies every key/template in `state_updates`. The temporary `output` -/// state key is removed at the end so it doesn't leak into subsequent -/// nodes' templates. -/// -/// When `node.output_schema` is set AND the output is a JSON object, its -/// top-level keys are also auto-merged into state permanently (before -/// state_updates evaluation, so explicit state_updates can override). fn apply_state_updates(node: &AgentNode, state_manager: &mut StateManager, output: &Value) { if node.output_schema.is_some() && let Some(obj) = output.as_object() @@ -165,7 +148,9 @@ mod tests { node_with("hi", Some(u)) }; let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, &json!("agent finished its work")); + assert_eq!( state.state().get("findings"), Some(&json!("agent finished its work")) @@ -180,7 +165,9 @@ mod tests { node_with("hi", Some(u)) }; let mut state = manager_with(&[("topic", json!("auth"))]); + apply_state_updates(&node, &mut state, &json!("JWT vs sessions")); + assert_eq!( state.state().get("summary"), Some(&json!("auth: JWT vs sessions")) @@ -195,7 +182,9 @@ mod tests { node_with("hi", Some(u)) }; let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, &json!("anything")); + assert_eq!(state.state().get("output"), Some(&Value::Null)); } @@ -207,7 +196,9 @@ mod tests { node_with("hi", Some(u)) }; let mut state = manager_with(&[("output", json!("preserved"))]); + apply_state_updates(&node, &mut state, &json!("new agent output")); + assert_eq!( state.state().get("greeting"), Some(&json!("new agent output")) @@ -219,7 +210,9 @@ mod tests { fn no_state_updates_is_a_noop() { let node = node_with("hi", None); let mut state = manager_with(&[("k", json!("v"))]); + apply_state_updates(&node, &mut state, &json!("ignored")); + assert_eq!(state.state().get("k"), Some(&json!("v"))); assert!(state.state().get("output").is_none()); } @@ -232,7 +225,9 @@ mod tests { node_with("hi", Some(u)) }; let mut state = manager_with(&[]); + apply_state_updates(&node, &mut state, &json!("DATA")); + assert_eq!(state.state().get("decorated"), Some(&json!("[] DATA"))); } @@ -251,7 +246,9 @@ mod tests { let node = node_with_schema("hi", None, json!({"type": "object"})); let mut state = manager_with(&[]); let output = json!({"goal": "do X", "summary": "details"}); + apply_state_updates(&node, &mut state, &output); + assert_eq!(state.state().get("goal"), Some(&json!("do X"))); assert_eq!(state.state().get("summary"), Some(&json!("details"))); } @@ -265,7 +262,9 @@ mod tests { "config": { "key": "value" }, "count": 42 }); + apply_state_updates(&node, &mut state, &output); + assert_eq!(state.state().get("tags"), Some(&json!(["a", "b"]))); assert_eq!(state.state().get("config"), Some(&json!({"key": "value"}))); assert_eq!(state.state().get("count"), Some(&json!(42))); @@ -278,7 +277,9 @@ mod tests { let node = node_with_schema("hi", Some(u), json!({"type": "object"})); let mut state = manager_with(&[]); let output = json!({"goal": "do X"}); + apply_state_updates(&node, &mut state, &output); + assert_eq!(state.state().get("goal"), Some(&json!("renamed-do X"))); } @@ -287,7 +288,9 @@ mod tests { let node = node_with("hi", None); let mut state = manager_with(&[]); let output = json!({"goal": "do X"}); + apply_state_updates(&node, &mut state, &output); + assert!(state.state().get("goal").is_none()); } } diff --git a/src/graph/dispatch.rs b/src/graph/dispatch.rs index 50bceeb..a65b36f 100644 --- a/src/graph/dispatch.rs +++ b/src/graph/dispatch.rs @@ -1,37 +1,25 @@ -//! Helpers for running the active agent through its `graph.yaml` instead -//! of the LLM loop. Used at every agent-execution entry point: top-level -//! CLI (`start_directive`), REPL (`ask`), and child-agent spawn -//! (`run_child_agent`). - use super::{GraphExecutor, GraphParser, agent_has_graph}; use crate::config::RequestContext; use crate::config::paths; use crate::utils::AbortSignal; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; +use log::info; use serde_json::Value; -/// If the active agent owns a `graph.yaml`, returns its name. Lets -/// callers branch between graph and LLM-loop execution without -/// re-implementing the lookup. pub fn active_agent_graph_name(ctx: &RequestContext) -> Option { let name = ctx.agent.as_ref()?.name().to_string(); agent_has_graph(&name).then_some(name) } -/// Run the active agent's graph end-to-end and return the resolved -/// End-node output. The caller's prompt is seeded into the graph state -/// as `initial_prompt` so nodes can reference it via -/// `{{initial_prompt}}`. Any sub-agents the graph spawned via the -/// supervisor are cancelled on return. pub async fn run_active_agent_graph( ctx: &mut RequestContext, prompt: &str, abort_signal: AbortSignal, ) -> Result { - let agent_name = active_agent_graph_name(ctx) - .ok_or_else(|| anyhow::anyhow!("Active agent has no graph.yaml"))?; + let agent_name = + active_agent_graph_name(ctx).ok_or_else(|| anyhow!("Active agent has no graph.yaml"))?; - log::info!("Agent '{agent_name}' has graph.yaml; routing to graph executor"); + info!("Agent '{agent_name}' has graph.yaml; routing to graph executor"); let agent_dir = paths::agent_data_dir(&agent_name); let graph_path = paths::agent_graph_file(&agent_name); diff --git a/src/graph/executor.rs b/src/graph/executor.rs index 71a389f..a1773e0 100644 --- a/src/graph/executor.rs +++ b/src/graph/executor.rs @@ -1,15 +1,6 @@ -//! Main execution loop for graph workflows. -//! -//! Dispatches each node to its type-specific executor, handles routing -//! (static `Node.next`, script `_next` override, approval `routes` and -//! `on_other`), enforces `max_loop_iterations` and an optional -//! whole-graph timeout, and resolves the final `End` node's `output` -//! template as the graph's return value. - use super::agent::AgentNodeExecutor; use super::llm::LlmNodeExecutor; use super::logging::GraphLogger; -use super::parser::GraphParser; use super::rag::RagNodeExecutor; use super::script::ScriptExecutor; use super::state::StateManager; @@ -21,7 +12,7 @@ use crate::utils::AbortSignal; use anyhow::{Context, Result, anyhow, bail}; use serde_json::Value; use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -38,18 +29,6 @@ impl GraphExecutor { } } - /// Load a graph from disk and construct the executor in one step. - /// `base_dir` is also used to resolve relative script paths. - pub fn from_path(graph_path: impl AsRef, base_dir: impl Into) -> Result { - let base_dir = base_dir.into(); - let parser = GraphParser::new(&base_dir); - let graph = parser.load_from_file(graph_path)?; - Ok(Self::new(graph, base_dir)) - } - - /// Run the graph to completion. Returns the resolved `output` template - /// of the terminal `End` node. Any failure is logged via the - /// `GraphLogger` before being propagated. pub async fn execute( self, ctx: &mut RequestContext, @@ -222,9 +201,6 @@ async fn step( } } -/// Apply the end node's `state_updates`, then interpolate its `output` -/// template against the resulting state. Both use lenient interpolation -/// so the graph still produces a result even when some keys are absent. fn resolve_end_output(end_node: &EndNode, state: &mut StateManager) -> String { apply_simple_state_updates(end_node.state_updates.as_ref(), state); state.interpolate_lenient(&end_node.output) @@ -264,6 +240,7 @@ mod tests { fn resolve_end_output_interpolates_template_against_state() { let mut state = state_with(&[("name", json!("alice"))]); let node = end_node("done: {{name}}", None); + assert_eq!(resolve_end_output(&node, &mut state), "done: alice"); } @@ -273,6 +250,7 @@ mod tests { updates.insert("summary".into(), "completed for {{user}}".into()); let node = end_node("RESULT: {{summary}}", Some(updates)); let mut state = state_with(&[("user", json!("bob"))]); + assert_eq!( resolve_end_output(&node, &mut state), "RESULT: completed for bob" @@ -287,6 +265,7 @@ mod tests { fn resolve_end_output_with_empty_template_returns_empty_string() { let mut state = state_with(&[]); let node = end_node("", None); + assert_eq!(resolve_end_output(&node, &mut state), ""); } @@ -294,13 +273,16 @@ mod tests { fn resolve_end_output_lenient_on_missing_keys() { let mut state = state_with(&[]); let node = end_node("hello {{unknown}}!", None); + assert_eq!(resolve_end_output(&node, &mut state), "hello !"); } #[test] fn apply_simple_state_updates_does_nothing_when_none() { let mut state = state_with(&[("k", json!("v"))]); + apply_simple_state_updates(None, &mut state); + assert_eq!(state.state().get("k"), Some(&json!("v"))); } @@ -309,32 +291,9 @@ mod tests { let mut updates = HashMap::new(); updates.insert("k".into(), "new-{{k}}".into()); let mut state = state_with(&[("k", json!("old"))]); + apply_simple_state_updates(Some(&updates), &mut state); + assert_eq!(state.state().get("k"), Some(&json!("new-old"))); } - - #[test] - fn from_path_loads_and_constructs_executor() { - use std::io::Write; - let path = std::env::temp_dir().join(format!( - "loki-graph-executor-test-{}.yaml", - std::process::id() - )); - let yaml = r#" -name: test_graph -start: only -nodes: - only: - type: end - output: hello -"#; - std::fs::write(&path, yaml).unwrap(); - - let parent = path.parent().unwrap().to_path_buf(); - let executor = GraphExecutor::from_path(&path, &parent).unwrap(); - assert_eq!(executor.graph.name, "test_graph"); - assert_eq!(executor.graph.start, "only"); - - let _ = std::fs::remove_file(&path); - } } diff --git a/src/graph/llm.rs b/src/graph/llm.rs index 5f75cef..1a02498 100644 --- a/src/graph/llm.rs +++ b/src/graph/llm.rs @@ -1,14 +1,8 @@ -//! Execution of `llm`-type graph nodes — one-shot LLM calls with a -//! bounded tool-call loop, an opt-in tool whitelist (delegated to -//! `Role.enabled_tools` / `Role.enabled_mcp_servers`), and per-node -//! overrides for model/temperature/top_p. See -//! `docs/implementation/graph-agents/10.5-llm-nodes.md` for the design. - use super::state::StateManager; use super::structured; use super::types::LlmNode; use crate::client::{Model, ModelType, call_chat_completions}; -use crate::config::{RequestContext, Role, RoleLike}; +use crate::config::{Input, RequestContext, Role, RoleLike}; use crate::utils::{create_abort_signal, dimmed_text}; use anyhow::{Context, Error, Result, anyhow, bail}; use serde_json::Value; @@ -22,12 +16,6 @@ const OUTPUT_KEY: &str = "output"; pub struct LlmNodeExecutor; impl LlmNodeExecutor { - /// Run the LLM call and resolve routing. Returns the next node ID - /// to visit. Handles the tolerant-fail contract internally: - /// success → `node_next`; failure with `fallback` → `fallback`; - /// failure without `fallback` → `node_next`. State updates are - /// applied in both success and failure paths, with `{{output}}` - /// resolving to the LLM's response or an error description. pub async fn execute( node: &LlmNode, node_next: Option<&str>, @@ -54,6 +42,7 @@ impl LlmNodeExecutor { (Value::String(format!("LLM node failed: {e}")), true) } }; + apply_state_updates_with_output(node, state_manager, &output); next_for_llm_node(node_next, failed, node.fallback.as_deref()) } @@ -151,7 +140,7 @@ async fn run_chat_loop(node: &LlmNode, prompt: &str, ctx: &mut RequestContext) - let abort = create_abort_signal(); let app_cfg = Arc::clone(&ctx.app.config); let role_for_input = ctx.role.clone(); - let mut input = crate::config::Input::from_str(ctx, prompt, role_for_input); + let mut input = Input::from_str(ctx, prompt, role_for_input); let mut accumulated = String::new(); for turn in 0..node.max_iterations { @@ -299,9 +288,6 @@ fn validate_tools_subset( Ok(()) } -/// Loose substring match against the transient error messages we expect -/// from network/API failures or empty-output cases. Brittle by nature; -/// a typed error enum would be better long-term. fn is_transient(err: &Error) -> bool { let s = format!("{err:#}"); s.contains("timed out") @@ -322,17 +308,9 @@ fn next_for_llm_node( } node_next .map(String::from) - .ok_or_else(|| anyhow::anyhow!("llm node has no `next` set; llm nodes need static routing")) + .ok_or_else(|| anyhow!("llm node has no `next` set; llm nodes need static routing")) } -/// Expose the LLM call's final output as `{{output}}` for the duration -/// of `state_updates` evaluation, then restore the prior value (or set -/// it to `Null` if there wasn't one). Same pattern as -/// `AgentNodeExecutor`'s `{{output}}` scoping. -/// -/// When `node.output_schema` is set AND the output is a JSON object, its -/// top-level keys are also auto-merged into state permanently (before -/// state_updates evaluation, so explicit state_updates can override). fn apply_state_updates_with_output( node: &LlmNode, state_manager: &mut StateManager, @@ -424,7 +402,9 @@ mod tests { u.insert("response".into(), "{{output}}".into()); let node = node_with(Some(u)); let mut state = manager_with(&[]); + apply_state_updates_with_output(&node, &mut state, &json!("the answer")); + assert_eq!(state.state().get("response"), Some(&json!("the answer"))); } @@ -434,7 +414,9 @@ mod tests { u.insert("summary".into(), "{{topic}}: {{output}}".into()); let node = node_with(Some(u)); let mut state = manager_with(&[("topic", json!("LOINC"))]); + apply_state_updates_with_output(&node, &mut state, &json!("abc")); + assert_eq!(state.state().get("summary"), Some(&json!("LOINC: abc"))); } @@ -444,7 +426,9 @@ mod tests { u.insert("k".into(), "{{output}}".into()); let node = node_with(Some(u)); let mut state = manager_with(&[]); + apply_state_updates_with_output(&node, &mut state, &json!("anything")); + assert_eq!(state.state().get(OUTPUT_KEY), Some(&json!(null))); } @@ -454,7 +438,9 @@ mod tests { u.insert("greeting".into(), "{{output}}".into()); let node = node_with(Some(u)); let mut state = manager_with(&[("output", json!("preserved"))]); + apply_state_updates_with_output(&node, &mut state, &json!("new")); + assert_eq!(state.state().get("greeting"), Some(&json!("new"))); assert_eq!(state.state().get(OUTPUT_KEY), Some(&json!("preserved"))); } @@ -463,7 +449,9 @@ mod tests { fn no_state_updates_is_a_noop() { let node = node_with(None); let mut state = manager_with(&[("k", json!("v"))]); + apply_state_updates_with_output(&node, &mut state, &json!("x")); + assert_eq!(state.state().get("k"), Some(&json!("v"))); assert!(state.state().get(OUTPUT_KEY).is_none()); } @@ -506,7 +494,9 @@ mod tests { let node = node_with_schema(None, json!({"type": "object"})); let mut state = manager_with(&[]); let output = json!({"goal": "do X", "summary": "details"}); + apply_state_updates_with_output(&node, &mut state, &output); + assert_eq!(state.state().get("goal"), Some(&json!("do X"))); assert_eq!(state.state().get("summary"), Some(&json!("details"))); } @@ -520,7 +510,9 @@ mod tests { "config": { "key": "value" }, "count": 42 }); + apply_state_updates_with_output(&node, &mut state, &output); + assert_eq!(state.state().get("tags"), Some(&json!(["a", "b"]))); assert_eq!(state.state().get("config"), Some(&json!({"key": "value"}))); assert_eq!(state.state().get("count"), Some(&json!(42))); @@ -533,7 +525,9 @@ mod tests { let node = node_with_schema(Some(u), json!({"type": "object"})); let mut state = manager_with(&[]); let output = json!({"goal": "do X"}); + apply_state_updates_with_output(&node, &mut state, &output); + assert_eq!(state.state().get("goal"), Some(&json!("renamed-do X"))); } @@ -542,7 +536,9 @@ mod tests { let node = node_with_schema(None, json!({"type": "array"})); let mut state = manager_with(&[]); let output = json!([1, 2, 3]); + apply_state_updates_with_output(&node, &mut state, &output); + assert!(state.state().get("0").is_none()); assert!(state.state().get(OUTPUT_KEY).is_none()); } @@ -552,14 +548,18 @@ mod tests { let node = node_with(None); let mut state = manager_with(&[]); let output = json!({"goal": "do X"}); + apply_state_updates_with_output(&node, &mut state, &output); + assert!(state.state().get("goal").is_none()); } #[test] fn format_schema_hint_includes_schema_and_instruction() { let schema = json!({"type": "object", "properties": {"goal": {"type": "string"}}}); + let hint = format_schema_hint(&schema); + assert!(hint.contains("Schema:")); assert!(hint.contains("\"goal\"")); assert!(hint.contains("JSON")); @@ -582,7 +582,9 @@ mod tests { "web_search_loki".to_string(), "mcp:github".to_string(), ]; + let (regular, mcp) = categorize_tools(Some(&entries)); + assert_eq!(regular, vec!["read_query", "web_search_loki"]); assert_eq!(mcp, vec!["pubmed-search", "github"]); } @@ -590,6 +592,7 @@ mod tests { #[test] fn categorize_tools_with_none_returns_empty() { let (regular, mcp) = categorize_tools(None); + assert!(regular.is_empty()); assert!(mcp.is_empty()); } @@ -597,6 +600,7 @@ mod tests { #[test] fn categorize_tools_with_empty_returns_empty() { let (regular, mcp) = categorize_tools(Some(&[])); + assert!(regular.is_empty()); assert!(mcp.is_empty()); } diff --git a/src/graph/logging.rs b/src/graph/logging.rs index 321286c..af29ef5 100644 --- a/src/graph/logging.rs +++ b/src/graph/logging.rs @@ -1,14 +1,3 @@ -//! Structured logging and per-node timing for graph execution. -//! -//! Two output channels, both owned by [`GraphLogger`]: -//! - **`tracing`** (`info!`/`debug!`/`warn!`/`error!`) — respects -//! `RUST_LOG`; this is the developer-facing channel. -//! - **stderr narration** — the dimmed `▸` lines the user follows along -//! with during execution. -//! -//! The logger also accumulates per-node wall-clock timings and emits a -//! performance summary (slowest-first) when the graph completes. - use super::state::StateManager; use super::types::{Node, NodeType}; use crate::utils::dimmed_text; @@ -107,10 +96,6 @@ impl GraphLogger { } } - /// Log a state snapshot before a node runs. No-op unless the graph's - /// `log_state_snapshots` setting is enabled. Keys + byte size go to - /// `debug`; the full state goes to `trace` (it may contain secrets, - /// so it is never logged at a more visible level). pub fn state_snapshot(&self, node_id: &str, state: &StateManager) { if !self.log_state_snapshots { return; @@ -118,6 +103,7 @@ impl GraphLogger { let snapshot = state.snapshot(); let mut keys: Vec<&str> = snapshot.keys().map(String::as_str).collect(); keys.sort_unstable(); + debug!( "[graph:{}] [{node_id}] state: {} bytes, keys={:?}", self.graph_name, @@ -136,10 +122,12 @@ impl GraphLogger { } let mut rows: Vec<(&String, &NodeTiming)> = self.timings.iter().collect(); rows.sort_by_key(|b| Reverse(b.1.total)); + info!( "[graph:{}] performance summary (slowest first):", self.graph_name ); + for (node_id, t) in rows { let avg = t.total / t.count.max(1) as u32; info!( @@ -190,9 +178,11 @@ mod tests { #[test] fn node_timing_max_tracks_largest() { let mut t = NodeTiming::default(); + t.record(Duration::from_millis(10)); t.record(Duration::from_millis(80)); t.record(Duration::from_millis(40)); + assert_eq!(t.max, Duration::from_millis(80)); assert_eq!(t.count, 3); assert_eq!(t.total, Duration::from_millis(130)); @@ -201,6 +191,7 @@ mod tests { #[test] fn new_logger_has_no_timings() { let logger = GraphLogger::new("g", true); + assert!(logger.timings.is_empty()); assert!(logger.log_state_snapshots); } diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 1a96f0c..c236d2c 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -1,6 +1,3 @@ -//! Graph-based agent orchestration. Declarative YAML workflows over a shared -//! JSON state, composed of agent/script/approval/input/end nodes. - pub mod agent; pub mod dispatch; pub mod executor; @@ -15,26 +12,13 @@ pub mod types; pub mod user_interaction; pub mod validator; -pub use agent::AgentNodeExecutor; pub use dispatch::{active_agent_graph_name, run_active_agent_graph}; pub use executor::GraphExecutor; -pub use llm::LlmNodeExecutor; -pub use logging::GraphLogger; pub use parser::{GraphParser, agent_has_graph}; -pub use rag::RagNodeExecutor; -pub use script::ScriptExecutor; -pub use state::{StateManager, StateRepresentation}; -pub use types::{ - AgentNode, ApprovalNode, EndNode, Graph, GraphSettings, GraphState, InputNode, LlmNode, Node, - NodeType, RagNode, ScriptNode, -}; -pub use user_interaction::{ApprovalNodeExecutor, InputNodeExecutor}; -pub use validator::{GraphValidator, ValidationError, ValidationResult}; +pub use types::{Graph, NodeType}; pub const GRAPH_SCHEMA_VERSION: &str = "1.0"; pub const DEFAULT_MAX_LOOP_ITERATIONS: usize = 100; -/// Serialized-state size above which scripts receive state via a temp file -/// instead of an env var. pub const MAX_STATE_SIZE_BYTES: usize = 32 * 1024; diff --git a/src/graph/parser.rs b/src/graph/parser.rs index 315f1a5..2085187 100644 --- a/src/graph/parser.rs +++ b/src/graph/parser.rs @@ -1,5 +1,3 @@ -//! YAML parsing for graph definitions. - use super::types::Graph; use crate::config::paths; use anyhow::{Context, Error, Result, anyhow, bail}; @@ -8,9 +6,6 @@ use std::path::{Path, PathBuf}; const SUPPORTED_VERSIONS: &[&str] = &["1.0"]; -/// Parser for graph YAML files. The `base_dir` is used to resolve relative -/// paths passed to [`GraphParser::load_from_file`], and is typically an -/// agent directory. pub struct GraphParser { base_dir: PathBuf, } @@ -22,8 +17,6 @@ impl GraphParser { } } - /// Load and validate a graph from a YAML file. Relative paths are - /// resolved against `base_dir`. pub fn load_from_file(&self, path: impl AsRef) -> Result { let path = path.as_ref(); let full_path = if path.is_absolute() { @@ -39,7 +32,6 @@ impl GraphParser { .with_context(|| format!("Failed to parse graph file at '{}'", full_path.display())) } - /// Load and validate a graph from a YAML string. pub fn load_from_string(&self, yaml: &str) -> Result { let mut graph: Graph = serde_yaml::from_str(yaml).map_err(enhance_yaml_error)?; @@ -71,6 +63,7 @@ fn validate_schema_version(version: &str) -> Result<()> { SUPPORTED_VERSIONS.join(", ") ); } + Ok(()) } @@ -125,16 +118,19 @@ fn enhance_yaml_error(error: serde_yaml::Error) -> Error { anyhow!("YAML parsing error: {}{}", msg, hint) } -/// Returns true if the named agent has a `graph.yaml` in its data directory. pub fn agent_has_graph(agent_name: &str) -> bool { paths::agent_graph_file(agent_name).exists() } #[cfg(test)] mod tests { + use super::super::GRAPH_SCHEMA_VERSION; use super::super::types::NodeType; use super::*; - use std::env; + use indoc::formatdoc; + use std::fs::File; + use std::io::Write; + use std::{env, fs, process}; fn parser() -> GraphParser { GraphParser::new(env::current_dir().unwrap()) @@ -142,23 +138,25 @@ mod tests { #[test] fn parses_a_simple_graph() { - let yaml = r#" -name: simple_graph -version: "1.0" -start: node1 -nodes: - node1: - id: node1 - type: agent - agent: test_agent - prompt: "Hello world" - next: node2 - node2: - id: node2 - type: end - output: done -"#; - let graph = parser().load_from_string(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: simple_graph + version: "1.0" + start: node1 + nodes: + node1: + id: node1 + type: agent + agent: test_agent + prompt: "Hello world" + next: node2 + node2: + id: node2 + type: end + output: done + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + assert_eq!(graph.name, "simple_graph"); assert_eq!(graph.start, "node1"); assert_eq!(graph.nodes.len(), 2); @@ -170,36 +168,40 @@ nodes: #[test] fn auto_fills_node_ids_from_keys() { - let yaml = r#" -name: auto_id_graph -version: "1.0" -start: node1 -nodes: - node1: - type: agent - agent: test_agent - prompt: Test - next: node2 - node2: - type: end - output: done -"#; - let graph = parser().load_from_string(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: auto_id_graph + version: "1.0" + start: node1 + nodes: + node1: + type: agent + agent: test_agent + prompt: Test + next: node2 + node2: + type: end + output: done + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + assert_eq!(graph.nodes.get("node1").unwrap().id, "node1"); assert_eq!(graph.nodes.get("node2").unwrap().id, "node2"); } #[test] fn rejects_missing_start_node() { - let yaml = r#" -name: bad_graph -version: "1.0" -start: nonexistent -nodes: - node1: - type: end -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: bad_graph + version: "1.0" + start: nonexistent + nodes: + node1: + type: end + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!( err.contains("Start node 'nonexistent' not found"), "got: {err}" @@ -208,41 +210,47 @@ nodes: #[test] fn rejects_empty_graph_name() { - let yaml = r#" -name: "" -version: "1.0" -start: node1 -nodes: - node1: - type: end -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: "" + version: "1.0" + start: node1 + nodes: + node1: + type: end + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!(err.contains("non-empty 'name'"), "got: {err}"); } #[test] fn rejects_no_nodes() { - let yaml = r#" -name: empty_graph -version: "1.0" -start: node1 -nodes: {} -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: empty_graph + version: "1.0" + start: node1 + nodes: {} + "#, "{}"}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!(err.contains("no nodes defined"), "got: {err}"); } #[test] fn rejects_unsupported_version() { - let yaml = r#" -name: future_graph -version: "2.0" -start: node1 -nodes: - node1: - type: end -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: future_graph + version: "2.0" + start: node1 + nodes: + node1: + type: end + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!( err.contains("Unsupported graph schema version"), "got: {err}" @@ -251,42 +259,46 @@ nodes: #[test] fn rejects_node_id_mismatch() { - let yaml = r#" -name: mismatch_graph -version: "1.0" -start: node1 -nodes: - node1: - id: different_id - type: end -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: mismatch_graph + version: "1.0" + start: node1 + nodes: + node1: + id: different_id + type: end + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!(err.contains("Node ID mismatch"), "got: {err}"); } #[test] fn parses_approval_node_with_routes() { - let yaml = r#" -name: approval_graph -version: "1.0" -start: approval1 -nodes: - approval1: - type: approval - question: "Proceed with deployment?" - options: - - "Yes" - - "No" - routes: - "Yes": deploy - "No": cancel - on_other: cancel - deploy: - type: end - cancel: - type: end -"#; - let graph = parser().load_from_string(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: approval_graph + version: "1.0" + start: approval1 + nodes: + approval1: + type: approval + question: "Proceed with deployment?" + options: + - "Yes" + - "No" + routes: + "Yes": deploy + "No": cancel + on_other: cancel + deploy: + type: end + cancel: + type: end + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + let approval = graph.nodes.get("approval1").unwrap(); match &approval.node_type { NodeType::Approval(a) => { @@ -300,19 +312,21 @@ nodes: #[test] fn parses_settings_overrides() { - let yaml = r#" -name: settings_graph -version: "1.0" -start: node1 -settings: - max_loop_iterations: 50 - timeout: 300 - log_state_snapshots: false -nodes: - node1: - type: end -"#; - let graph = parser().load_from_string(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: settings_graph + version: "1.0" + start: node1 + settings: + max_loop_iterations: 50 + timeout: 300 + log_state_snapshots: false + nodes: + node1: + type: end + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + assert_eq!(graph.settings.max_loop_iterations, 50); assert_eq!(graph.settings.timeout, Some(300)); assert!(!graph.settings.log_state_snapshots); @@ -321,19 +335,21 @@ nodes: #[test] fn parses_initial_state() { - let yaml = r#" -name: state_graph -version: "1.0" -start: node1 -initial_state: - user_name: "Alice" - count: 42 - enabled: true -nodes: - node1: - type: end -"#; - let graph = parser().load_from_string(yaml).unwrap(); + let yaml = formatdoc! {r#" + name: state_graph + version: "1.0" + start: node1 + initial_state: + user_name: "Alice" + count: 42 + enabled: true + nodes: + node1: + type: end + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + assert_eq!(graph.initial_state.len(), 3); assert_eq!(graph.initial_state.get("user_name").unwrap(), "Alice"); assert_eq!( @@ -348,28 +364,32 @@ nodes: #[test] fn uses_default_version_when_absent() { - let yaml = r#" -name: no_version -start: node1 -nodes: - node1: - type: end -"#; - let graph = parser().load_from_string(yaml).unwrap(); - assert_eq!(graph.version, super::super::GRAPH_SCHEMA_VERSION); + let yaml = formatdoc! {r#" + name: no_version + start: node1 + nodes: + node1: + type: end + "#}; + + let graph = parser().load_from_string(&yaml).unwrap(); + + assert_eq!(graph.version, GRAPH_SCHEMA_VERSION); } #[test] fn rejects_unknown_node_type_with_hint() { - let yaml = r#" -name: bad_type -version: "1.0" -start: node1 -nodes: - node1: - type: nonsense -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: bad_type + version: "1.0" + start: node1 + nodes: + node1: + type: nonsense + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!( err.contains("Valid node types") || err.contains("unknown variant"), "got: {err}" @@ -379,49 +399,50 @@ nodes: #[test] fn rejects_malformed_yaml() { let yaml = "name: bad\n bad: indent\nstart: a"; + let result = parser().load_from_string(yaml); + assert!(result.is_err()); } #[test] fn missing_required_fields_have_a_hint() { - let yaml = r#" -name: missing_start -version: "1.0" -nodes: - node1: - type: end -"#; - let err = parser().load_from_string(yaml).unwrap_err().to_string(); + let yaml = formatdoc! {r#" + name: missing_start + version: "1.0" + nodes: + node1: + type: end + "#}; + + let err = parser().load_from_string(&yaml).unwrap_err().to_string(); + assert!(err.contains("Hint"), "got: {err}"); } #[test] fn load_from_file_reads_disk() { - use std::io::Write; let dir = env::temp_dir(); - let path = dir.join(format!( - "loki_graph_parser_test_{}.yaml", - std::process::id() - )); - let yaml = r#" -name: disk_graph -version: "1.0" -start: only -nodes: - only: - type: end - output: ok -"#; + let path = dir.join(format!("loki_graph_parser_test_{}.yaml", process::id())); + let yaml = formatdoc! {r#" + name: disk_graph + version: "1.0" + start: only + nodes: + only: + type: end + output: ok + "#}; { - let mut f = std::fs::File::create(&path).unwrap(); + let mut f = File::create(&path).unwrap(); f.write_all(yaml.as_bytes()).unwrap(); } let graph = GraphParser::new(dir).load_from_file(&path).unwrap(); + assert_eq!(graph.name, "disk_graph"); - let _ = std::fs::remove_file(&path); + let _ = fs::remove_file(&path); } #[test] @@ -430,6 +451,7 @@ nodes: .load_from_file("/definitely/not/a/real/path/to_any_graph.yaml") .unwrap_err() .to_string(); + assert!(err.contains("Failed to read graph file"), "got: {err}"); } diff --git a/src/graph/rag.rs b/src/graph/rag.rs index a4ef9b7..2017e50 100644 --- a/src/graph/rag.rs +++ b/src/graph/rag.rs @@ -1,11 +1,3 @@ -//! Execution of `rag`-type graph nodes. -//! -//! A `rag` node runs a hybrid (vector + keyword) retrieval against the -//! per-node knowledge base built at agent-load time, and writes the result -//! into graph state. The result is exposed to `state_updates` as -//! `{{output}}` — a JSON object `{ context, sources }` where `sources` is -//! an array of source paths. - use super::state::StateManager; use super::types::RagNode; use crate::config::RequestContext; @@ -22,9 +14,6 @@ const DEFAULT_RAG_TIMEOUT_SECS: u64 = 120; pub struct RagNodeExecutor; impl RagNodeExecutor { - /// Interpolate the node's query, run the retrieval against this node's - /// knowledge base, expose the result as `{{output}}` for `state_updates`, - /// and return `node_next`. pub async fn execute( node: &RagNode, node_id: &str, @@ -74,8 +63,6 @@ impl RagNodeExecutor { } /// Assemble the `{{output}}` value as `{ "context": , "sources": [...] }`. -/// `Rag::search` returns sources as a `- {path}` bullet list; it is split -/// into a JSON array so downstream templates can index `{{output.sources[0]}}`. fn build_rag_output(context: String, sources_str: &str) -> Value { let sources: Vec = sources_str .lines() @@ -84,14 +71,13 @@ fn build_rag_output(context: String, sources_str: &str) -> Value { .map(|s| Value::String(s.to_string())) .collect(); let mut obj = Map::new(); + obj.insert("context".into(), Value::String(context)); obj.insert("sources".into(), Value::Array(sources)); + Value::Object(obj) } -/// Expose the retrieval result as `{{output}}` for the duration of -/// `state_updates` evaluation, then restore the prior value. Same scoping -/// pattern as `llm`/`agent` nodes. fn apply_state_updates(node: &RagNode, state_manager: &mut StateManager, output: &Value) { let Some(updates) = &node.state_updates else { return; @@ -124,6 +110,7 @@ mod tests { #[test] fn build_rag_output_splits_bullet_sources_into_array() { let out = build_rag_output("ctx".into(), "- a.md\n- https://x.com/spec"); + assert_eq!(out["context"], json!("ctx")); assert_eq!(out["sources"], json!(["a.md", "https://x.com/spec"])); } @@ -131,18 +118,21 @@ mod tests { #[test] fn build_rag_output_handles_empty_sources() { let out = build_rag_output("ctx".into(), ""); + assert_eq!(out["sources"], json!([])); } #[test] fn build_rag_output_ignores_blank_lines() { let out = build_rag_output("c".into(), "- a\n\n- b\n"); + assert_eq!(out["sources"], json!(["a", "b"])); } #[test] fn build_rag_output_tolerates_unprefixed_lines() { let out = build_rag_output("c".into(), "plain/path"); + assert_eq!(out["sources"], json!(["plain/path"])); } } diff --git a/src/graph/script.rs b/src/graph/script.rs index 8d842f9..8e82aab 100644 --- a/src/graph/script.rs +++ b/src/graph/script.rs @@ -1,11 +1,3 @@ -//! Script execution for `script`-type graph nodes. -//! -//! Scripts receive graph state via either `GRAPH_STATE` (inline JSON env var) -//! or `GRAPH_STATE_FILE` (path to a file containing the JSON) when state -//! exceeds [`super::MAX_STATE_SIZE_BYTES`]. Scripts MUST print a single JSON -//! object on stdout. The `_next` key (if present) is consumed for routing -//! and removed before the remaining keys are merged into state. - use super::state::{StateManager, StateRepresentation}; use super::types::ScriptNode; use crate::function::Language; @@ -18,9 +10,6 @@ use std::time::Duration; use tokio::process::Command; use tokio::time::timeout; -/// Executor for script nodes. `base_dir` is the directory script paths are -/// resolved against (typically the owning agent's data directory) and is -/// also used as the child process's working directory. pub struct ScriptExecutor { base_dir: PathBuf, } @@ -32,10 +21,6 @@ impl ScriptExecutor { } } - /// Run the script, merge its JSON output into state (extracting `_next` - /// for routing), and then apply any `state_updates` templates. Returns - /// the routing decision from `_next`, or `None` if the script did not - /// emit one (in which case the executor falls back to `Node.next`). pub async fn execute( &self, node: &ScriptNode, @@ -153,6 +138,7 @@ fn detect_language(script_path: &Path) -> Result { .and_then(|e| e.to_str()) .ok_or_else(|| anyhow!("Script has no file extension: '{}'", script_path.display()))? .to_string(); + match Language::from(&ext) { Language::Unsupported => bail!( "Unsupported script extension '.{}' for '{}'", @@ -171,9 +157,11 @@ fn build_command(language: Language, script_path: &Path) -> Result { ) })?; let mut cmd = Command::new(program); + for arg in prefix_args { cmd.arg(arg); } + cmd.arg(script_path); Ok(cmd) } @@ -183,8 +171,10 @@ mod tests { use super::super::MAX_STATE_SIZE_BYTES; use super::*; use crate::utils::temp_file; + use indoc::formatdoc; use serde_json::json; use std::collections::HashMap; + use std::env::temp_dir; use std::fs; fn cmd_available(name: &str) -> bool { @@ -226,6 +216,7 @@ echo '{"quality": 0.85, "issues": 3, "_next": "approve"}' ); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let next = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -233,6 +224,7 @@ echo '{"quality": 0.85, "issues": 3, "_next": "approve"}' ) .await .unwrap(); + assert_eq!(next.as_deref(), Some("approve")); assert_eq!(state.state().get("quality"), Some(&json!(0.85))); assert_eq!(state.state().get("issues"), Some(&json!(3))); @@ -257,6 +249,7 @@ printf '{"greeting": "hello %s"}' "$NAME" initial.insert("name".into(), json!("alice")); let mut state = StateManager::new(initial); let executor = ScriptExecutor::new(&dir); + let _ = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -264,6 +257,7 @@ printf '{"greeting": "hello %s"}' "$NAME" ) .await .unwrap(); + assert_eq!(state.state().get("greeting"), Some(&json!("hello alice"))); cleanup(&dir); } @@ -281,6 +275,7 @@ echo '{"ok": true}' ); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let next = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -288,6 +283,7 @@ echo '{"ok": true}' ) .await .unwrap(); + assert!(next.is_none()); assert_eq!(state.state().get("ok"), Some(&json!(true))); cleanup(&dir); @@ -321,12 +317,14 @@ echo '{"raw": "hello"}' #[tokio::test] async fn missing_script_file_errors_before_spawning() { let mut state = StateManager::new(HashMap::new()); - let executor = ScriptExecutor::new(std::env::temp_dir()); + let executor = ScriptExecutor::new(temp_dir()); + let err = executor .execute(&node_for("__does_not_exist__.sh", 5), &mut state) .await .unwrap_err() .to_string(); + assert!(err.contains("Script file not found"), "got: {err}"); } @@ -338,6 +336,7 @@ echo '{"raw": "hello"}' let (dir, path) = write_script("#!/bin/bash\n", "sh"); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let err = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -346,6 +345,7 @@ echo '{"raw": "hello"}' .await .unwrap_err() .to_string(); + assert!(err.contains("produced no output"), "got: {err}"); cleanup(&dir); } @@ -356,13 +356,15 @@ echo '{"raw": "hello"}' return; } let (dir, path) = write_script( - r#"#!/bin/bash -echo "not json at all" -"#, + &formatdoc! {r#" + #!/bin/bash + echo "not json at all" + "#}, "sh", ); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let err = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -371,6 +373,7 @@ echo "not json at all" .await .unwrap_err() .to_string(); + assert!(err.contains("merge output"), "got: {err}"); cleanup(&dir); } @@ -381,14 +384,16 @@ echo "not json at all" return; } let (dir, path) = write_script( - r#"#!/bin/bash -echo "bad happened" >&2 -exit 7 -"#, + &formatdoc! {r#" + #!/bin/bash + echo "bad happened" >&2 + exit 7 + "#}, "sh", ); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let err = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -397,6 +402,7 @@ exit 7 .await .unwrap_err() .to_string(); + assert!(err.contains("exit code"), "got: {err}"); assert!(err.contains("bad happened"), "got: {err}"); cleanup(&dir); @@ -416,6 +422,7 @@ echo '{"ok":true}' ); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let err = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 1), @@ -424,6 +431,7 @@ echo '{"ok":true}' .await .unwrap_err() .to_string(); + assert!(err.contains("timed out"), "got: {err}"); cleanup(&dir); } @@ -493,6 +501,7 @@ print(json.dumps({ ) .await .unwrap(); + assert_eq!(next.as_deref(), Some("next_node")); assert_eq!(state.state().get("doubled"), Some(&json!(42))); cleanup(&dir); @@ -503,6 +512,7 @@ print(json.dumps({ let (dir, path) = write_script("echo hi", "xyz"); let mut state = StateManager::new(HashMap::new()); let executor = ScriptExecutor::new(&dir); + let err = executor .execute( &node_for(path.file_name().unwrap().to_str().unwrap(), 5), @@ -511,6 +521,7 @@ print(json.dumps({ .await .unwrap_err() .to_string(); + assert!( err.contains("Unsupported script extension '.xyz'"), "got: {err}" diff --git a/src/graph/state.rs b/src/graph/state.rs index 31f435e..7b78caa 100644 --- a/src/graph/state.rs +++ b/src/graph/state.rs @@ -1,5 +1,3 @@ -//! State management and template interpolation for graph execution. - use super::MAX_STATE_SIZE_BYTES; use super::types::GraphState; use crate::utils::temp_file; @@ -8,21 +6,13 @@ use fancy_regex::Regex; use serde_json::Value; use std::collections::HashMap; use std::fs; -use std::fs::{read_to_string, write}; +use std::fs::write; use std::path::PathBuf; use std::sync::LazyLock; static TEMPLATE_VAR_RE: LazyLock = LazyLock::new(|| Regex::new(r"\{\{([a-zA-Z0-9_\.\[\]]+)\}\}").expect("invalid template regex")); -/// Wraps [`GraphState`] with template interpolation, script-output merging, -/// and a large-state temp-file fallback for use with scripts. -/// -/// Template syntax: `{{key}}` for top-level keys, `{{a.b.c}}` for nested -/// JSON paths, and `{{arr[0]}}` / `{{a.b[2].c}}` / `{{matrix[0][1]}}` for -/// array indices. Use [`StateManager::interpolate`] for strict interpolation -/// (errors on missing keys) or [`StateManager::interpolate_lenient`] for -/// best-effort (missing keys become empty strings). pub struct StateManager { state: GraphState, temp_file: Option, @@ -44,8 +34,6 @@ impl StateManager { &mut self.state } - /// Replace every `{{key}}` in `template` with its state value. Returns - /// an error if any referenced key is missing. pub fn interpolate(&self, template: &str) -> Result { let mut missing = Vec::new(); let result = self.interpolate_inner(template, |key| { @@ -65,8 +53,6 @@ impl StateManager { Ok(result) } - /// Same as [`Self::interpolate`] but missing keys are silently replaced - /// with an empty string. pub fn interpolate_lenient(&self, template: &str) -> String { self.interpolate_inner(template, |_| String::new()) } @@ -96,6 +82,7 @@ impl StateManager { for idx in root_indices { current = current.get(idx)?; } + for part in parts { let (segment_key, indices) = split_indices(part)?; if !segment_key.is_empty() { @@ -105,12 +92,10 @@ impl StateManager { current = current.get(idx)?; } } + Some(current) } - /// Serialize the state for transport to a script. State larger than - /// [`MAX_STATE_SIZE_BYTES`] is written to a unique temp file; the file - /// is cleaned up when the `StateManager` is dropped. pub fn serialize_state(&mut self) -> Result { let json = self.state.to_json()?; if json.len() > MAX_STATE_SIZE_BYTES { @@ -119,16 +104,14 @@ impl StateManager { format!("Failed to write state to temp file at '{}'", path.display()) })?; self.temp_file = Some(path.clone()); + Ok(StateRepresentation::File(path)) } else { Ok(StateRepresentation::Inline(json)) } } - pub fn to_json_string(&self) -> Result { - self.state.to_json() - } - + #[cfg(test)] pub fn from_json_string(json: &str) -> Result { let data: HashMap = serde_json::from_str(json).context("Failed to parse state JSON")?; @@ -143,13 +126,11 @@ impl StateManager { self.state.size_bytes() } + #[cfg(test)] pub fn is_large(&self) -> bool { self.size_bytes() > MAX_STATE_SIZE_BYTES } - /// Merge a script's JSON-object stdout into state. The reserved `_next` - /// key is extracted (used by the executor for routing) and is not stored - /// in state. Errors if the output is not a JSON object. pub fn merge_script_output(&mut self, json_output: &str) -> Result> { let value: Value = serde_json::from_str(json_output).context("Script output must be valid JSON")?; @@ -170,8 +151,6 @@ impl StateManager { Ok(next_node) } - /// Remove the temp file backing this state, if any. Called automatically - /// on drop. pub fn cleanup(&mut self) { if let Some(path) = self.temp_file.take() { let _ = fs::remove_file(path); @@ -185,19 +164,18 @@ impl Drop for StateManager { } } -/// How serialized state is delivered to a script: inline JSON for small -/// state, or a file path for state above [`MAX_STATE_SIZE_BYTES`]. #[derive(Debug, Clone)] pub enum StateRepresentation { Inline(String), File(PathBuf), } +#[cfg(test)] impl StateRepresentation { pub fn as_string(&self) -> Result { match self { StateRepresentation::Inline(s) => Ok(s.clone()), - StateRepresentation::File(path) => read_to_string(path) + StateRepresentation::File(path) => fs::read_to_string(path) .with_context(|| format!("Failed to read state file at '{}'", path.display())), } } @@ -222,6 +200,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec)> { }; let mut indices = Vec::new(); let mut rest = &segment[bracket_start.unwrap()..]; + while !rest.is_empty() { if !rest.starts_with('[') { return None; @@ -231,6 +210,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec)> { indices.push(idx); rest = &rest[close + 1..]; } + Some((key, indices)) } @@ -261,9 +241,11 @@ mod tests { #[test] fn simple_interpolation_replaces_top_level_keys() { let manager = manager_with(&[("name", json!("Alice")), ("age", json!(30))]); + let result = manager .interpolate("Hello {{name}}, you are {{age}} years old") .unwrap(); + assert_eq!(result, "Hello Alice, you are 30 years old"); } @@ -271,9 +253,11 @@ mod tests { fn nested_interpolation_walks_objects() { let manager = manager_with(&[("user", json!({ "name": "Bob", "email": "bob@example.com" }))]); + let result = manager .interpolate("User: {{user.name}} ({{user.email}})") .unwrap(); + assert_eq!(result, "User: Bob (bob@example.com)"); } @@ -285,19 +269,23 @@ mod tests { "api": { "key": "secret123", "endpoint": "https://api.example.com" } }), )]); + let result = manager .interpolate("API: {{config.api.endpoint}} with key {{config.api.key}}") .unwrap(); + assert_eq!(result, "API: https://api.example.com with key secret123"); } #[test] fn strict_interpolation_errors_on_missing_keys() { let manager = manager_with(&[]); + let err = manager .interpolate("Hello {{name}}") .unwrap_err() .to_string(); + assert!(err.contains("not found"), "got: {err}"); assert!(err.contains("name"), "got: {err}"); } @@ -305,24 +293,30 @@ mod tests { #[test] fn strict_interpolation_collects_all_missing_keys() { let manager = manager_with(&[]); + let err = manager .interpolate("{{a}} and {{b}}") .unwrap_err() .to_string(); + assert!(err.contains("'a'") && err.contains("'b'"), "got: {err}"); } #[test] fn lenient_interpolation_substitutes_empty_for_missing() { let manager = manager_with(&[("name", json!("Alice"))]); + let result = manager.interpolate_lenient("Hello {{name}}, age: {{age}}"); + assert_eq!(result, "Hello Alice, age: "); } #[test] fn lenient_interpolation_handles_missing_intermediate() { let manager = manager_with(&[("user", json!({ "name": "Bob" }))]); + let result = manager.interpolate_lenient("email: {{user.email}}"); + assert_eq!(result, "email: "); } @@ -333,6 +327,7 @@ mod tests { ("count", json!(42)), ("nothing", json!(null)), ]); + assert_eq!(manager.interpolate("{{on}}").unwrap(), "true"); assert_eq!(manager.interpolate("{{count}}").unwrap(), "42"); assert_eq!(manager.interpolate("{{nothing}}").unwrap(), "null"); @@ -341,20 +336,25 @@ mod tests { #[test] fn interpolates_arrays_as_json() { let manager = manager_with(&[("items", json!(["a", "b", "c"]))]); + let result = manager.interpolate("{{items}}").unwrap(); + assert_eq!(result, r#"["a","b","c"]"#); } #[test] fn interpolates_objects_as_json() { let manager = manager_with(&[("data", json!({ "key": "value" }))]); + let result = manager.interpolate("{{data}}").unwrap(); + assert_eq!(result, r#"{"key":"value"}"#); } #[test] fn interpolates_array_indices() { let manager = manager_with(&[("items", json!(["a", "b", "c"]))]); + assert_eq!(manager.interpolate("{{items[0]}}").unwrap(), "a"); assert_eq!(manager.interpolate("{{items[2]}}").unwrap(), "c"); } @@ -362,24 +362,29 @@ mod tests { #[test] fn interpolates_array_indices_inside_nested_paths() { let manager = manager_with(&[("outer", json!({ "inner": { "arr": ["x", "y", "z"] } }))]); + let result = manager .interpolate("first={{outer.inner.arr[0]}} last={{outer.inner.arr[2]}}") .unwrap(); + assert_eq!(result, "first=x last=z"); } #[test] fn interpolates_object_fields_after_array_index() { let manager = manager_with(&[("users", json!([{ "name": "Alice" }, { "name": "Bob" }]))]); + let result = manager .interpolate("{{users[0].name}} and {{users[1].name}}") .unwrap(); + assert_eq!(result, "Alice and Bob"); } #[test] fn interpolates_nested_array_indices() { let manager = manager_with(&[("matrix", json!([[1, 2], [3, 4]]))]); + assert_eq!(manager.interpolate("{{matrix[0][1]}}").unwrap(), "2"); assert_eq!(manager.interpolate("{{matrix[1][0]}}").unwrap(), "3"); } @@ -387,21 +392,27 @@ mod tests { #[test] fn out_of_bounds_array_index_is_missing() { let manager = manager_with(&[("items", json!(["a", "b"]))]); + let err = manager.interpolate("{{items[5]}}").unwrap_err().to_string(); + assert!(err.contains("not found"), "got: {err}"); } #[test] fn replaces_all_occurrences_of_same_key() { let manager = manager_with(&[("n", json!("Alice"))]); + let result = manager.interpolate("{{n}} and {{n}} again").unwrap(); + assert_eq!(result, "Alice and Alice again"); } #[test] fn passes_through_templates_with_no_variables() { let manager = manager_with(&[]); + let result = manager.interpolate("No variables here").unwrap(); + assert_eq!(result, "No variables here"); } @@ -409,14 +420,18 @@ mod tests { fn from_json_string_round_trips() { let json = r#"{"name": "Alice", "age": 30}"#; let manager = StateManager::from_json_string(json).unwrap(); + let result = manager.interpolate("{{name}} is {{age}}").unwrap(); + assert_eq!(result, "Alice is 30"); } #[test] fn snapshot_clones_state_data() { let manager = manager_with(&[("k1", json!("v1")), ("k2", json!(42))]); + let snap = manager.snapshot(); + assert_eq!(snap.len(), 2); assert_eq!(snap.get("k1"), Some(&json!("v1"))); assert_eq!(snap.get("k2"), Some(&json!(42))); @@ -425,7 +440,9 @@ mod tests { #[test] fn small_state_serializes_inline() { let mut manager = manager_with(&[("key", json!("value"))]); + let repr = manager.serialize_state().unwrap(); + assert!(matches!(repr, StateRepresentation::Inline(_))); assert!(!manager.is_large()); assert!(!repr.is_file()); @@ -443,7 +460,7 @@ mod tests { assert!(path.exists()); let contents = repr.as_string().unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&contents).unwrap(); + let parsed: Value = serde_json::from_str(&contents).unwrap(); assert_eq!( parsed.get("blob").unwrap().as_str().unwrap().len(), big.len() @@ -457,7 +474,9 @@ mod tests { fn merge_script_output_merges_keys_into_state() { let mut manager = manager_with(&[]); let output = r#"{"quality_score": 0.85, "issues_found": 3, "status": "complete"}"#; + let next = manager.merge_script_output(output).unwrap(); + assert_eq!(next, None); assert_eq!(manager.state().get("quality_score"), Some(&json!(0.85))); assert_eq!(manager.state().get("issues_found"), Some(&json!(3))); @@ -468,7 +487,9 @@ mod tests { fn merge_script_output_extracts_next_key_for_routing() { let mut manager = manager_with(&[]); let output = r#"{"_next": "approval_gate", "quality_score": 0.85}"#; + let next = manager.merge_script_output(output).unwrap(); + assert_eq!(next.as_deref(), Some("approval_gate")); assert_eq!(manager.state().get("quality_score"), Some(&json!(0.85))); assert!( @@ -480,29 +501,35 @@ mod tests { #[test] fn merge_script_output_rejects_invalid_json() { let mut manager = manager_with(&[]); + let err = manager .merge_script_output("not json") .unwrap_err() .to_string(); + assert!(err.contains("valid JSON"), "got: {err}"); } #[test] fn merge_script_output_rejects_non_object() { let mut manager = manager_with(&[]); + let err = manager .merge_script_output("[1, 2, 3]") .unwrap_err() .to_string(); + assert!(err.contains("must be a JSON object"), "got: {err}"); } #[test] fn merge_script_output_overwrites_existing_state_keys() { let mut manager = manager_with(&[("status", json!("pending"))]); + let _ = manager .merge_script_output(r#"{"status": "complete"}"#) .unwrap(); + assert_eq!(manager.state().get("status"), Some(&json!("complete"))); } } diff --git a/src/graph/structured.rs b/src/graph/structured.rs index c311341..26acc5e 100644 --- a/src/graph/structured.rs +++ b/src/graph/structured.rs @@ -1,12 +1,3 @@ -//! Structured-output extraction for `llm` and `agent` nodes. Takes the -//! raw final text of a node and a user-supplied JSON Schema, and returns -//! a parsed [`serde_json::Value`] conforming to that schema (best-effort). -//! -//! Strategy: try to parse `raw` directly first (with light cleanup of -//! markdown fences), and only invoke a follow-up LLM call against the -//! built-in `structured-output` role if direct parsing fails. On -//! extractor-output parse failure, perform one repair retry. - use crate::client::call_chat_completions; use crate::config::{Input, RequestContext, Role, RoleLike}; use crate::utils::{create_abort_signal, dimmed_text}; @@ -67,6 +58,7 @@ async fn extract_via_extractor( "{}", dimmed_text("▸ structured-output: extractor returned invalid JSON, retrying") ); + Box::pin(extract_via_extractor(&output, schema, parent_ctx, true)).await } } @@ -101,11 +93,13 @@ async fn run_one_shot(prompt: &str, ctx: &mut RequestContext) -> Result let (output, tool_results) = call_chat_completions(&input, false, false, client.as_ref(), ctx, abort).await?; ctx.after_chat_completion(app_cfg.as_ref(), &input, &output, &tool_results)?; + Ok(output) } fn try_parse_json(raw: &str) -> Option { let cleaned = strip_code_fences(raw.trim()); + serde_json::from_str(cleaned).ok() } @@ -129,26 +123,32 @@ mod tests { #[test] fn try_parse_json_accepts_plain_object() { let v = try_parse_json(r#"{"a": 1}"#).unwrap(); + assert_eq!(v, json!({"a": 1})); } #[test] fn try_parse_json_strips_json_fences() { let raw = "```json\n{\"a\": 1}\n```"; + let v = try_parse_json(raw).unwrap(); + assert_eq!(v, json!({"a": 1})); } #[test] fn try_parse_json_strips_bare_fences() { let raw = "```\n{\"a\": 1}\n```"; + let v = try_parse_json(raw).unwrap(); + assert_eq!(v, json!({"a": 1})); } #[test] fn try_parse_json_tolerates_whitespace() { let v = try_parse_json(" \n {\"x\": true}\n\n").unwrap(); + assert_eq!(v, json!({"x": true})); } @@ -165,13 +165,16 @@ mod tests { #[test] fn try_parse_json_accepts_arrays() { let v = try_parse_json("[1, 2, 3]").unwrap(); + assert_eq!(v, json!([1, 2, 3])); } #[test] fn build_extractor_prompt_includes_schema_and_input() { let schema = json!({"type": "object"}); + let prompt = build_extractor_prompt("hello", &schema, false); + assert!(prompt.contains("Schema:")); assert!(prompt.contains("Input:")); assert!(prompt.contains("hello")); @@ -180,7 +183,9 @@ mod tests { #[test] fn build_extractor_prompt_repair_includes_repair_instruction() { let schema = json!({"type": "object"}); + let prompt = build_extractor_prompt("oops", &schema, true); + assert!(prompt.contains("previous response")); assert!(prompt.contains("oops")); } @@ -188,6 +193,7 @@ mod tests { #[test] fn build_extractor_role_disables_tools_and_mcp() { let role = build_extractor_role().expect("builtin role must exist"); + assert_eq!(role.enabled_tools().as_deref(), Some("")); assert_eq!(role.enabled_mcp_servers().as_deref(), Some("")); } diff --git a/src/graph/types.rs b/src/graph/types.rs index 6ad9097..5a7bc0c 100644 --- a/src/graph/types.rs +++ b/src/graph/types.rs @@ -1,12 +1,9 @@ -//! Core data structures for graph-based agent orchestration. - use anyhow::Result; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -/// A graph definition loaded from YAML. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Graph { pub name: String, @@ -17,29 +14,21 @@ pub struct Graph { #[serde(default = "default_schema_version")] pub version: String, - /// Default chat model for the agent. Used when an `llm` node does not - /// set its own `model`. Consulted in single-file mode (an agent with - /// a `graph.yaml` and no `config.yaml`). #[serde(default, skip_serializing_if = "Option::is_none")] pub model: Option, - /// Default sampling temperature. Single-file mode only. #[serde(default, skip_serializing_if = "Option::is_none")] pub temperature: Option, - /// Default sampling top-p. Single-file mode only. #[serde(default, skip_serializing_if = "Option::is_none")] pub top_p: Option, - /// Global tools available to the agent's nodes. Single-file mode only. #[serde(default)] pub global_tools: Vec, - /// MCP servers available to the agent's nodes. Single-file mode only. #[serde(default)] pub mcp_servers: Vec, - /// Suggested prompts surfaced in the UI. Single-file mode only. #[serde(default)] pub conversation_starters: Vec, @@ -67,9 +56,6 @@ impl Graph { self.nodes.keys().map(|s| s.as_str()).collect() } - /// Returns true if any node is an `agent`-type node. Used to derive - /// `can_spawn_agents` when synthesizing an agent config from a - /// single-file graph. pub fn has_agent_node(&self) -> bool { self.nodes .values() @@ -81,7 +67,6 @@ fn default_schema_version() -> String { super::GRAPH_SCHEMA_VERSION.to_string() } -/// Graph-level settings. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct GraphSettings { #[serde(default = "default_max_loop_iterations")] @@ -116,12 +101,8 @@ fn default_true() -> bool { true } -/// A node in the graph. `node_type` is flattened into the YAML, so a node's -/// variant-specific fields live alongside `id`, `description`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Node { - /// Unique node identifier. May be omitted in YAML; the parser fills it - /// in from the surrounding `nodes:` map key. #[serde(default)] pub id: String, @@ -131,15 +112,10 @@ pub struct Node { #[serde(flatten)] pub node_type: NodeType, - /// Static next-node routing. Used by agent/input nodes. - /// Approval nodes use their `routes` map instead. - /// Script nodes: this is populated by `_next` in JSON output. #[serde(default, skip_serializing_if = "Option::is_none")] pub next: Option, } -/// The supported node variants. YAML uses an internal `type` tag in lowercase -/// (e.g. `type: agent`). #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(tag = "type", rename_all = "lowercase")] pub enum NodeType { @@ -152,12 +128,6 @@ pub enum NodeType { End(EndNode), } -/// `agent`-type node: spawn an agent with a templated prompt. The agent -/// uses the full tool stack from its own directory (`global_tools` and -/// `mcp_servers` in `config.yaml` plus any per-agent `tools.{sh,py,ts,js}` -/// script); there is no per-node tool override here. For tool-filtered -/// one-shot LLM steps, use an `llm`-type node (future). To use different -/// tool sets via agent variants, see `docs/graph-agents/agent-tools.md`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentNode { pub agent: String, @@ -167,10 +137,6 @@ pub struct AgentNode { #[serde(default, skip_serializing_if = "Option::is_none")] pub state_updates: Option>, - /// JSON Schema describing the expected shape of the agent's final - /// output. When set, the agent's raw text is post-processed through - /// a built-in structured-output extractor and parsed as JSON. Top- - /// level keys of the parsed object are auto-merged into state. #[serde(default, skip_serializing_if = "Option::is_none")] pub output_schema: Option, @@ -178,9 +144,6 @@ pub struct AgentNode { pub timeout: Option, } -/// `script`-type node: run a Python/TypeScript/Bash script that prints a -/// JSON object on stdout. Keys merge into state; the special `_next` key -/// overrides routing and is not merged. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ScriptNode { pub script: String, @@ -188,7 +151,6 @@ pub struct ScriptNode { #[serde(default, skip_serializing_if = "Option::is_none")] pub state_updates: Option>, - /// Fallback node to route to if the script fails to run or returns empty #[serde(default, skip_serializing_if = "Option::is_none")] pub fallback: Option, @@ -200,8 +162,6 @@ fn default_script_timeout() -> u64 { 30 } -/// `approval`-type node: prompt the user with `options` and route based on -/// their choice via the `routes` map. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ApprovalNode { pub question: String, @@ -210,20 +170,12 @@ pub struct ApprovalNode { pub routes: HashMap, - /// REQUIRED. The user_ask tool always permits a free-form "type your - /// own answer" response in addition to the listed `options`. When the - /// user supplies any answer that does NOT match a key in `routes`, - /// execution routes to this node. The free-form text is available to - /// downstream nodes via `state_updates` (e.g. `clarification: "{{choice}}"`). pub on_other: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub state_updates: Option>, } -/// `input`-type node: collect free-form text from the user. Routes via the -/// top-level `next` field; the user's text is exposed to templates as -/// `{{input}}` in `state_updates`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct InputNode { pub question: String, @@ -238,36 +190,13 @@ pub struct InputNode { pub state_updates: Option>, } -/// `llm`-type node: a one-shot LLM call (with bounded tool-call loop) -/// against a caller-supplied system prompt + user prompt. Unlike -/// `agent`-type nodes, this does NOT spawn a sub-agent; it runs in a -/// fresh isolated context. Tool access is opt-in via the `tools` -/// whitelist (no tools when unset). -/// -/// Routing (tolerant-fail): -/// - success → `Node.next` -/// - failure WITH fallback → `fallback` -/// - failure WITHOUT fallback → `Node.next` -/// -/// `state_updates` are always applied. `{{output}}` resolves to the -/// LLM's response on success, or to an error description on failure. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct LlmNode { - /// User-turn prompt. Templated against state. REQUIRED. pub prompt: String, - /// Optional system prompt. When set, the LLM call uses an inline - /// Role with `instructions` as `Role.prompt`. Templated against - /// state. #[serde(default, skip_serializing_if = "Option::is_none")] pub instructions: Option, - /// Whitelist narrowing the active agent's tool universe. - /// Each entry is either an exact function name (`global_tools` - /// entry or `tools.{sh,py,ts}` subcommand) or the shorthand - /// `mcp:` (where `` must be in the agent's - /// `mcp_servers`). Unset or `[]` = no tools — tools are strictly - /// opt-in. #[serde(default, skip_serializing_if = "Option::is_none")] pub tools: Option>, @@ -283,21 +212,15 @@ pub struct LlmNode { #[serde(default, skip_serializing_if = "Option::is_none")] pub fallback: Option, - /// Number of attempts on transient errors. Default 1 = no retries. #[serde(default = "default_llm_max_attempts")] pub max_attempts: u32, - /// Hard cap on tool-call-loop turns within a single attempt. #[serde(default = "default_llm_max_iterations")] pub max_iterations: u32, #[serde(default, skip_serializing_if = "Option::is_none")] pub state_updates: Option>, - /// JSON Schema (as parsed JSON) describing the expected shape of the - /// node's output. When set, the raw LLM response is post-processed - /// through a built-in structured-output extractor and parsed as JSON. - /// Top-level keys of the parsed object are auto-merged into state. #[serde(default, skip_serializing_if = "Option::is_none")] pub output_schema: Option, @@ -313,47 +236,28 @@ fn default_llm_max_iterations() -> u32 { 10 } -/// `rag`-type node: run a hybrid (vector + keyword) retrieval against a -/// per-node knowledge base and write the result into state. The retrieved -/// context and the list of source paths are exposed to `state_updates` via -/// `{{output.context}}` and `{{output.sources}}` (the whole result is -/// `{{output}}`, a JSON object). The knowledge base is built once at agent -/// load time into `/.yaml`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct RagNode { - /// Knowledge sources (files, directories, URLs, loader-protocol paths). - /// REQUIRED — this is what makes the node a RAG node. pub documents: Vec, - /// Retrieval query, templated against state. Defaults to - /// `{{initial_prompt}}` when omitted. #[serde(default, skip_serializing_if = "Option::is_none")] pub query: Option, - /// Number of chunks to retrieve. Defaults to the knowledge base's own - /// configured `top_k` when omitted. #[serde(default, skip_serializing_if = "Option::is_none")] pub top_k: Option, - /// Embedding model for building the knowledge base. When this plus - /// `chunk_size` and `chunk_overlap` are all set, knowledge-base - /// construction runs non-interactively (no prompts). #[serde(default, skip_serializing_if = "Option::is_none")] pub embedding_model: Option, - /// Chunk size for splitting documents at build time. #[serde(default, skip_serializing_if = "Option::is_none")] pub chunk_size: Option, - /// Chunk overlap for splitting documents at build time. #[serde(default, skip_serializing_if = "Option::is_none")] pub chunk_overlap: Option, - /// Reranker model applied to hybrid-search results. #[serde(default, skip_serializing_if = "Option::is_none")] pub reranker_model: Option, - /// Embedding-request batch size at build time. #[serde(default, skip_serializing_if = "Option::is_none")] pub batch_size: Option, @@ -364,8 +268,6 @@ pub struct RagNode { pub timeout: Option, } -/// `end`-type node: terminate execution; `output` (templated) is returned -/// as the graph's final result. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct EndNode { #[serde(default)] @@ -375,7 +277,6 @@ pub struct EndNode { pub state_updates: Option>, } -/// Runtime state for a graph execution: KV store plus visit history. #[derive(Debug, Clone, Default)] pub struct GraphState { data: HashMap, @@ -400,7 +301,6 @@ impl GraphState { self.data.insert(key, value); } - /// Merge a JSON object into state. Existing keys are overwritten. pub fn merge(&mut self, json_obj: &serde_json::Map) { for (key, value) in json_obj { self.data.insert(key.clone(), value.clone()); @@ -411,8 +311,6 @@ impl GraphState { &self.data } - /// Record that a node has been entered. Updates both history and loop - /// counts. pub fn visit_node(&mut self, node_id: &str) { self.history.push(node_id.to_string()); *self.loop_counts.entry(node_id.to_string()).or_insert(0) += 1; @@ -422,10 +320,7 @@ impl GraphState { self.loop_counts.get(node_id).copied().unwrap_or(0) } - pub fn history(&self) -> &[String] { - &self.history - } - + #[cfg(test)] pub fn current_node(&self) -> Option<&str> { self.history.last().map(|s| s.as_str()) } @@ -667,7 +562,7 @@ on_other: edit_loop assert_eq!(state.loop_count("node1"), 2); assert_eq!(state.loop_count("node2"), 1); assert_eq!(state.loop_count("never"), 0); - assert_eq!(state.history().len(), 3); + assert_eq!(state.history.len(), 3); assert_eq!(state.current_node(), Some("node1")); } @@ -707,7 +602,7 @@ on_other: edit_loop initial.insert("user".to_string(), json!("alice")); let state = GraphState::new(initial); assert_eq!(state.get("user"), Some(&json!("alice"))); - assert!(state.history().is_empty()); + assert!(state.history.is_empty()); } #[test] diff --git a/src/graph/user_interaction.rs b/src/graph/user_interaction.rs index 5d15f44..7bef499 100644 --- a/src/graph/user_interaction.rs +++ b/src/graph/user_interaction.rs @@ -1,12 +1,3 @@ -//! Execution of `approval` and `input` graph nodes via Loki's existing -//! user-interaction system (`user__ask`, `user__input`). -//! -//! Both delegate to [`crate::function::user_interaction::handle_user_tool`], -//! which prompts the user directly at depth 0 (via `inquire`) and escalates -//! to the parent through the escalation queue at depth > 0. We interpret the -//! returned JSON's `answer` field for the user's response and an `error` -//! field for escalation timeout/cancellation. - use super::state::StateManager; use super::types::{ApprovalNode, InputNode}; use crate::config::RequestContext; @@ -21,9 +12,6 @@ const INPUT_KEY: &str = "input"; pub struct ApprovalNodeExecutor; impl ApprovalNodeExecutor { - /// Prompt the user with the (templated) question and routes the - /// selected option through the node's `routes` map. Returns the next - /// node ID. An escalation timeout/error propagates as a failure. pub async fn execute( node: &ApprovalNode, state_manager: &mut StateManager, @@ -60,11 +48,6 @@ impl ApprovalNodeExecutor { pub struct InputNodeExecutor; impl InputNodeExecutor { - /// Prompt the user for free-form text. If a `default` is configured - /// and the user submits an empty response, the default is substituted. - /// Optional `validation` is evaluated against the final value. Returns - /// `node_next` (the parent `Node.next`) on success; an escalation - /// timeout/error propagates as a failure. pub async fn execute( node: &InputNode, node_next: Option<&str>, @@ -122,12 +105,14 @@ fn build_input_question(node: &InputNode, state_manager: &StateManager) -> Resul let mut question = state_manager .interpolate(&node.question) .context("Failed to interpolate input question")?; + if let Some(default_template) = &node.default { let default = state_manager.interpolate_lenient(default_template); if !default.is_empty() { question = format!("{question} [default: {default}]"); } } + Ok(question) } @@ -135,6 +120,7 @@ fn resolve_approval_route(node: &ApprovalNode, choice: &str) -> Result { if let Some(target) = node.routes.get(choice) { return Ok(target.clone()); } + Ok(node.on_other.clone()) } @@ -151,12 +137,14 @@ fn apply_state_updates_with_var( state_manager .state_mut() .set(var_name.into(), Value::String(var_value.to_string())); + for (key, template) in updates { let value = state_manager.interpolate_lenient(template); state_manager .state_mut() .set(key.clone(), Value::String(value)); } + match prev { Some(v) => state_manager.state_mut().set(var_name.into(), v), None => { @@ -220,6 +208,7 @@ mod tests { for (k, v) in pairs { map.insert((*k).into(), v.clone()); } + StateManager::new(map) } @@ -228,6 +217,7 @@ mod tests { for (k, v) in routes { r.insert((*k).into(), (*v).into()); } + ApprovalNode { question: "?".into(), options: options.iter().map(|s| (*s).into()).collect(), @@ -280,6 +270,7 @@ mod tests { &[("yes", "deploy"), ("no", "cancel")], "clarify", ); + assert_eq!(resolve_approval_route(&node, "yes").unwrap(), "deploy"); assert_eq!(resolve_approval_route(&node, "no").unwrap(), "cancel"); } @@ -291,6 +282,7 @@ mod tests { &[("yes", "deploy"), ("no", "cancel")], "clarify", ); + assert_eq!(resolve_approval_route(&node, "maybe").unwrap(), "clarify"); assert_eq!( resolve_approval_route(&node, "free-form text").unwrap(), @@ -303,7 +295,9 @@ mod tests { let mut updates = HashMap::new(); updates.insert("decision".into(), "{{choice}}".into()); let mut state = manager_with(&[]); + apply_state_updates_with_var(&Some(updates), &mut state, CHOICE_KEY, "approve"); + assert_eq!(state.state().get("decision"), Some(&json!("approve"))); assert_eq!(state.state().get(CHOICE_KEY), Some(&Value::Null)); } @@ -313,7 +307,9 @@ mod tests { let mut updates = HashMap::new(); updates.insert("decision".into(), "{{choice}}".into()); let mut state = manager_with(&[("choice", json!("preserved"))]); + apply_state_updates_with_var(&Some(updates), &mut state, CHOICE_KEY, "approve"); + assert_eq!(state.state().get("decision"), Some(&json!("approve"))); assert_eq!(state.state().get(CHOICE_KEY), Some(&json!("preserved"))); } @@ -323,7 +319,9 @@ mod tests { let mut updates = HashMap::new(); updates.insert("api_key".into(), "{{input}}".into()); let mut state = manager_with(&[]); + apply_state_updates_with_var(&Some(updates), &mut state, INPUT_KEY, "sk-12345"); + assert_eq!(state.state().get("api_key"), Some(&json!("sk-12345"))); assert_eq!(state.state().get(INPUT_KEY), Some(&Value::Null)); } @@ -333,7 +331,9 @@ mod tests { let state = manager_with(&[("name", json!("alice"))]); let mut node = input("Hi, what's your name?"); node.default = Some("{{name}}".into()); + let q = build_input_question(&node, &state).unwrap(); + assert_eq!(q, "Hi, what's your name? [default: alice]"); } @@ -342,7 +342,9 @@ mod tests { let state = manager_with(&[]); let mut node = input("Enter value:"); node.default = Some("{{missing}}".into()); + let q = build_input_question(&node, &state).unwrap(); + assert_eq!(q, "Enter value:"); } @@ -350,14 +352,18 @@ mod tests { fn input_question_uses_no_default_when_field_absent() { let state = manager_with(&[]); let node = input("Enter value:"); + let q = build_input_question(&node, &state).unwrap(); + assert_eq!(q, "Enter value:"); } #[test] fn no_state_updates_means_var_never_appears_in_state() { let mut state = manager_with(&[]); + apply_state_updates_with_var(&None, &mut state, CHOICE_KEY, "approve"); + assert!(state.state().get(CHOICE_KEY).is_none()); } } diff --git a/src/graph/validator.rs b/src/graph/validator.rs index cee0ee0..05cb91f 100644 --- a/src/graph/validator.rs +++ b/src/graph/validator.rs @@ -1,14 +1,3 @@ -//! Static validation for graph definitions: reference integrity, cycles, -//! reachability, terminal nodes, script/agent existence, and approval -//! routes-vs-options consistency. -//! -//! The validator only follows **declared static edges** (`next`, approval -//! `routes` and `on_other`, script/llm `fallback`). Script nodes can also route -//! dynamically via `_next` in their JSON output at runtime; those edges are -//! invisible here. As a result, unreachable-node detection and "no reachable -//! End node" are reported as warnings (not errors) to avoid false positives -//! against dynamically-routed graphs. - use super::types::{Graph, Node, NodeType}; use crate::client::{Model, ModelType}; use crate::config::{Agent, AppConfig, paths}; @@ -17,7 +6,6 @@ use std::collections::{HashSet, VecDeque}; use std::path::PathBuf; use std::sync::Arc; -/// A single validation finding, optionally scoped to a node. #[derive(Debug, Clone)] pub struct ValidationError { pub node_id: Option, @@ -40,8 +28,6 @@ impl ValidationError { } } -/// Aggregated validation findings: blocking `errors` and informational -/// `warnings`. #[derive(Debug, Default)] pub struct ValidationResult { pub errors: Vec, @@ -61,8 +47,6 @@ impl ValidationResult { self.warnings.push(w); } - /// Consume into a `Result`, aggregating all errors into a single message. - /// Warnings are dropped. pub fn into_result(self) -> Result<()> { if self.is_valid() { return Ok(()); @@ -75,6 +59,7 @@ impl ValidationResult { None => format!(" {}", e.message), }) .collect(); + bail!( "Graph validation failed with {} error(s):\n{}", self.errors.len(), @@ -83,9 +68,6 @@ impl ValidationResult { } } -/// Agent-level context that `llm`-node `tools` and `model` references are -/// validated against. Supplying it lets a typo in a node's `tools` or -/// `model` be caught at graph load time instead of when the node runs. pub struct AgentValidationContext { pub tool_names: HashSet, pub mcp_servers: HashSet, @@ -107,8 +89,6 @@ impl AgentValidationContext { } } -/// Validator for graph structures. `base_dir` is used to resolve relative -/// script paths (typically the owning agent's data directory). pub struct GraphValidator { base_dir: PathBuf, agent_ctx: Option, @@ -138,6 +118,7 @@ impl GraphValidator { self.validate_approval_routes(graph, &mut result); self.validate_rag_nodes(graph, &mut result); self.validate_llm_nodes(graph, &mut result); + result } @@ -166,10 +147,12 @@ impl GraphValidator { let Some(ctx) = &self.agent_ctx else { return; }; + for (node_id, node) in &graph.nodes { let NodeType::Llm(llm) = &node.node_type else { continue; }; + if let Some(tools) = &llm.tools { for entry in tools { if let Some(server) = entry.strip_prefix("mcp:") { @@ -187,6 +170,7 @@ impl GraphValidator { } } } + if let Some(model_id) = &llm.model && Model::retrieve_model(ctx.app_config.as_ref(), model_id, ModelType::Chat) .is_err() @@ -332,14 +316,12 @@ impl GraphValidator { } } -/// All declared outgoing targets from a node, paired with a human-readable -/// label for use in error messages. Used both for cycle detection and -/// reference validation. fn declared_targets(node: &Node) -> Vec<(String, &'static str)> { let mut out = Vec::new(); if let Some(n) = &node.next { out.push((n.clone(), "'next'")); } + match &node.node_type { NodeType::Approval(a) => { for v in a.routes.values() { @@ -407,6 +389,7 @@ fn detect_cycle_dfs( if !graph.has_node(&next) { continue; } + if !visited.contains(&next) { if let Some(cycle) = detect_cycle_dfs(graph, &next, visited, rec_stack, path) { return Some(cycle); @@ -438,6 +421,7 @@ mod tests { for (id, node) in nodes { map.insert(id.to_string(), node); } + Graph { name: "t".into(), description: String::new(), @@ -472,6 +456,7 @@ mod tests { for (k, v) in routes { r.insert((*k).into(), (*v).into()); } + Node { id: id.into(), description: String::new(), @@ -506,6 +491,7 @@ mod tests { m.insert("ctx".into(), "{{output.context}}".into()); m }); + Node { id: id.into(), description: String::new(), @@ -556,7 +542,9 @@ mod tests { ], "l", ); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!(result.errors.iter().any(|e| e.message.contains("ghost"))); } @@ -575,6 +563,7 @@ mod tests { n.tools = tools.map(|t| t.iter().map(|s| s.to_string()).collect()); n.model = model.map(String::from); } + node } @@ -587,9 +576,11 @@ mod tests { ], "l", ); + let result = validator() .with_agent_context(agent_ctx(&["read_query"], &[])) .validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -608,9 +599,11 @@ mod tests { ], "l", ); + let result = validator() .with_agent_context(agent_ctx(&["read_query"], &[])) .validate(&graph); + assert!(result.is_valid()); } @@ -623,9 +616,11 @@ mod tests { ], "l", ); + let result = validator() .with_agent_context(agent_ctx(&[], &["pubmed-search"])) .validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -647,9 +642,11 @@ mod tests { ], "l", ); + let result = validator() .with_agent_context(agent_ctx(&[], &["pubmed-search"])) .validate(&graph); + assert!(result.is_valid()); } @@ -662,9 +659,11 @@ mod tests { ], "l", ); + let result = validator() .with_agent_context(agent_ctx(&[], &[])) .validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -683,7 +682,9 @@ mod tests { ], "l", ); + let result = validator().validate(&graph); + assert!(result.is_valid()); } @@ -693,7 +694,9 @@ mod tests { vec![("r", rag_node("r", &[], true)), ("end", end_node("end"))], "r", ); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -712,7 +715,9 @@ mod tests { ], "r", ); + let result = validator().validate(&graph); + assert!(result.is_valid()); assert!( result @@ -731,7 +736,9 @@ mod tests { ], "r", ); + let result = validator().validate(&graph); + assert!(result.is_valid()); assert!( !result @@ -765,7 +772,9 @@ mod tests { let mut start = end_node("start"); start.next = Some("end".into()); let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start"); + let result = validator().validate(&graph); + assert!(result.is_valid(), "errors: {:?}", result.errors); } @@ -774,7 +783,9 @@ mod tests { let mut n = end_node("n1"); n.next = Some("nope".into()); let graph = graph_with(vec![("n1", n), ("end", end_node("end"))], "n1"); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -794,7 +805,9 @@ mod tests { "end", ); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -808,7 +821,9 @@ mod tests { fn flags_missing_approval_on_other_target() { let approval = approval_node("ap", &["yes"], &[("yes", "end")], "missing"); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -823,7 +838,9 @@ mod tests { fn flags_missing_script_fallback_target() { let scr = script_node("s", "does-not-exist.py", Some("nowhere")); let graph = graph_with(vec![("s", scr), ("end", end_node("end"))], "s"); + let result = validator().validate(&graph); + assert!( result .errors @@ -839,7 +856,9 @@ mod tests { let mut b = end_node("b"); b.next = Some("a".into()); let graph = graph_with(vec![("a", a), ("b", b)], "a"); + let result = validator().validate(&graph); + assert!(!result.is_valid()); assert!( result @@ -854,7 +873,9 @@ mod tests { let mut a = end_node("a"); a.next = Some("a".into()); let graph = graph_with(vec![("a", a)], "a"); + let result = validator().validate(&graph); + assert!( result .errors @@ -869,7 +890,9 @@ mod tests { vec![("start", end_node("start")), ("orphan", end_node("orphan"))], "start", ); + let result = validator().validate(&graph); + assert!( result.warnings.iter().any( |w| w.node_id.as_deref() == Some("orphan") && w.message.contains("unreachable") @@ -883,7 +906,9 @@ mod tests { let b = agent_node("b", "__no_such_agent__", None); a.next = Some("b".into()); let graph = graph_with(vec![("a", a), ("b", b)], "a"); + let result = validator().validate(&graph); + assert!( result .errors @@ -911,7 +936,9 @@ mod tests { vec![("start", start), ("orphan_end", end_node("orphan_end"))], "start", ); + let result = validator().validate(&graph); + assert!(result.is_valid(), "unexpected errors: {:?}", result.errors); assert!( result @@ -930,7 +957,9 @@ mod tests { vec![("start", start), ("s", scr), ("end", end_node("end"))], "start", ); + let result = validator().validate(&graph); + assert!( result .errors @@ -944,7 +973,9 @@ mod tests { fn errors_when_referenced_agent_missing() { let agent = agent_node("a", "__definitely_no_such_agent__", Some("end")); let graph = graph_with(vec![("a", agent), ("end", end_node("end"))], "a"); + let result = validator().validate(&graph); + assert!(result.errors.iter().any(|e| { e.message .contains("Agent '__definitely_no_such_agent__' not found") @@ -955,7 +986,9 @@ mod tests { fn errors_when_approval_option_has_no_route() { let approval = approval_node("ap", &["yes", "no"], &[("yes", "end")], "end"); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); + let result = validator().validate(&graph); + assert!( result .errors @@ -968,7 +1001,9 @@ mod tests { fn warns_when_approval_has_extra_route() { let approval = approval_node("ap", &["yes"], &[("yes", "end"), ("maybe", "end")], "end"); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); + let result = validator().validate(&graph); + assert!(result.warnings.iter().any(|w| { w.message .contains("Route 'maybe' has no corresponding option") @@ -982,11 +1017,13 @@ mod tests { let mut b = end_node("b"); b.next = Some("missing2".into()); let graph = graph_with(vec![("a", a), ("b", b)], "a"); + let err = validator() .validate(&graph) .into_result() .unwrap_err() .to_string(); + assert!(err.contains("missing1"), "got: {err}"); assert!(err.contains("missing2"), "got: {err}"); assert!(err.contains("validation failed"), "got: {err}"); @@ -996,7 +1033,9 @@ mod tests { fn into_result_returns_ok_when_no_errors() { let mut start = end_node("start"); start.next = Some("end".into()); + let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start"); + assert!(validator().validate(&graph).into_result().is_ok()); } } diff --git a/src/repl/mod.rs b/src/repl/mod.rs index bbe020d..2d11d35 100644 --- a/src/repl/mod.rs +++ b/src/repl/mod.rs @@ -22,6 +22,7 @@ use anyhow::{Context, Result, bail}; use crossterm::cursor::SetCursorStyle; use fancy_regex::Regex; use indoc::indoc; +use log::warn; use parking_lot::RwLock; use reedline::CursorConfig; use reedline::{ @@ -32,7 +33,6 @@ use reedline::{ use reedline::{MenuBuilder, Signal}; use std::sync::LazyLock; use std::{env, process, sync::Arc}; -use log::warn; const MENU_NAME: &str = "completion_menu";