style: Cleaned up all graph agent code

This commit is contained in:
2026-05-18 13:46:52 -06:00
parent fce08140bf
commit f14c006d28
23 changed files with 560 additions and 652 deletions
+40 -49
View File
@@ -782,9 +782,6 @@ pub struct AgentVariable {
pub value: String,
}
/// Resolve document path specs (URLs, loader-protocol paths, relative or
/// absolute file paths) into the concrete paths `Rag::init` expects.
/// Relative paths are joined against the agent's data directory.
fn resolve_document_paths(
documents: &[String],
loaders: &HashMap<String, String>,
@@ -817,10 +814,6 @@ fn resolve_document_paths(
Ok(document_paths)
}
/// Build or load a knowledge base for every `rag` node in the graph. Each
/// node's RAG lives in `<agent>/<node-id>.yaml`. A missing knowledge base is
/// a hard error (interactive: after a declined confirm; non-interactive:
/// immediately) — a graph with an uninitialized `rag` node cannot run.
#[allow(clippy::too_many_arguments)]
async fn init_graph_rags(
app: &AppConfig,
@@ -876,11 +869,13 @@ async fn init_graph_rags(
on the node, or run the agent once interactively."
);
}
let ans = Confirm::new(&format!(
"Initialize RAG knowledge base for rag node '{node_id}'?"
))
.with_default(true)
.prompt()?;
if !ans {
bail!(
"Agent '{agent_name}' has rag node '{node_id}' but its RAG was not \
@@ -888,6 +883,7 @@ async fn init_graph_rags(
);
}
}
let document_paths =
resolve_document_paths(&rag_node.documents, loaders, agent_data_dir)?;
let app_clone = app.clone();
@@ -1043,34 +1039,31 @@ variables:
#[test]
fn from_graph_maps_agent_level_fields() {
let yaml = r#"
name: graph_name_ignored
description: A graph agent
model: anthropic:claude-sonnet-4-6
temperature: 0.3
top_p: 0.8
global_tools:
- fetch_pdf.sh
mcp_servers:
- pubmed-search
conversation_starters:
- "Start here"
start: e
nodes:
e:
id: e
type: end
output: done
"#;
let graph: Graph = serde_yaml::from_str(yaml).unwrap();
let yaml = formatdoc! {r#"
name: graph_name_ignored
description: A graph agent
model: claude:claude-sonnet-4-6
temperature: 0.3
top_p: 0.8
global_tools:
- fetch_pdf.sh
mcp_servers:
- pubmed-search
conversation_starters:
- "Start here"
start: e
nodes:
e:
id: e
type: end
output: done
"#};
let graph: Graph = serde_yaml::from_str(&yaml).unwrap();
let config = AgentConfig::from_graph("my-agent-dir", &graph);
assert_eq!(config.name, "my-agent-dir");
assert_eq!(config.description, "A graph agent");
assert_eq!(
config.model_id.as_deref(),
Some("anthropic:claude-sonnet-4-6")
);
assert_eq!(config.model_id.as_deref(), Some("claude:claude-sonnet-4-6"));
assert_eq!(config.temperature, Some(0.3));
assert_eq!(config.top_p, Some(0.8));
assert_eq!(config.global_tools, vec!["fetch_pdf.sh"]);
@@ -1080,22 +1073,22 @@ nodes:
#[test]
fn from_graph_derives_can_spawn_agents_from_agent_nodes() {
let with_agent = r#"
name: g
start: a
nodes:
a:
id: a
type: agent
agent: helper
prompt: hi
next: e
e:
id: e
type: end
output: done
"#;
let graph: Graph = serde_yaml::from_str(with_agent).unwrap();
let with_agent = formatdoc! {r#"
name: g
start: a
nodes:
a:
id: a
type: agent
agent: helper
prompt: hi
next: e
e:
id: e
type: end
output: done
"#};
let graph: Graph = serde_yaml::from_str(&with_agent).unwrap();
assert!(AgentConfig::from_graph("d", &graph).can_spawn_agents);
let no_agent =
@@ -1110,7 +1103,6 @@ nodes:
let graph: Graph = serde_yaml::from_str(yaml).unwrap();
let config = AgentConfig::from_graph("d", &graph);
// LLM-loop concepts a graph agent does not have: left at Default.
assert!(!config.auto_continue);
assert!(config.instructions.is_empty());
assert!(config.documents.is_empty());
@@ -1119,7 +1111,6 @@ nodes:
assert_eq!(config.max_auto_continues, 0);
assert_eq!(config.summarization_threshold, 0);
// Consumed by graph execution: kept at their real defaults.
assert_eq!(
config.max_concurrent_agents,
default_max_concurrent_agents()
+1 -1
View File
@@ -3,7 +3,7 @@ use super::{
AGENT_GRAPH_FILE_NAME, AGENTS_DIR_NAME, BASH_PROMPT_UTILS_FILE_NAME, CONFIG_FILE_NAME,
ENV_FILE_NAME, FUNCTIONS_BIN_DIR_NAME, FUNCTIONS_DIR_NAME, GLOBAL_TOOLS_DIR_NAME,
GLOBAL_TOOLS_UTILS_DIR_NAME, MACROS_DIR_NAME, MCP_FILE_NAME, ModelsOverride, RAGS_DIR_NAME,
ROLES_DIR_NAME, paths,
ROLES_DIR_NAME,
};
use crate::client::ProviderModels;
use crate::utils::{get_env_name, list_file_names, normalize_env_name};
+1 -6
View File
@@ -9,12 +9,7 @@ use std::sync::{Arc, Weak};
pub enum RagKey {
Named(String),
Agent(String),
/// A `rag` node's per-node knowledge base, keyed by owning agent name
/// and node id.
GraphNode {
agent: String,
node: String,
},
GraphNode { agent: String, node: String },
}
#[derive(Default)]
+1 -1
View File
@@ -27,6 +27,7 @@ use crate::utils::{
list_file_names, now, render_prompt, temp_file,
};
use crate::graph;
use anyhow::{Context, Error, Result, bail};
use indoc::formatdoc;
use inquire::{Confirm, MultiSelect, Text, list_option::ListOption, validator::Validation};
@@ -37,7 +38,6 @@ use std::fs::{File, OpenOptions, read_dir, read_to_string, remove_dir_all, remov
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::graph;
pub struct AutoContinueConfig {
pub enabled: bool,
-3
View File
@@ -50,9 +50,6 @@ enum BinaryType<'a> {
Agent,
}
/// Canonical set of script languages that Loki can execute. This is the
/// single source of truth for both function-tool scripts and graph script
/// nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr)]
pub enum Language {
Bash,
+3 -2
View File
@@ -5,6 +5,7 @@ use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox};
use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult, Supervisor};
use crate::utils::{AbortSignal, create_abort_signal};
use crate::graph;
use anyhow::{Context, Result, anyhow, bail};
use chrono::Utc;
use indexmap::IndexMap;
@@ -332,8 +333,8 @@ pub fn run_child_agent(
abort_signal: AbortSignal,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send>> {
Box::pin(async move {
if crate::graph::active_agent_graph_name(&child_ctx).is_some() {
return crate::graph::run_active_agent_graph(
if graph::active_agent_graph_name(&child_ctx).is_some() {
return graph::run_active_agent_graph(
&mut child_ctx,
&initial_input.text(),
abort_signal,
+20 -17
View File
@@ -1,10 +1,3 @@
//! Execution of `agent`-type graph nodes.
//!
//! Spawns a child agent via `function::supervisor::run_agent_for_graph`,
//! interpolating the prompt against current graph state. After the agent
//! finishes, applies the node's `state_updates` (templates can reference
//! `{{output}}` for the agent's stdout).
use super::state::StateManager;
use super::structured;
use super::types::AgentNode;
@@ -22,8 +15,6 @@ const DEFAULT_TIMEOUT_SECS: u64 = 300;
pub struct AgentNodeExecutor;
impl AgentNodeExecutor {
/// Interpolate the node's prompt, spawn the agent, wait for it to
/// finish, then apply `state_updates`. Returns the agent's full output.
pub async fn execute(
node: &AgentNode,
state_manager: &mut StateManager,
@@ -90,14 +81,6 @@ fn indent_prompt(prompt: &str, prefix_spaces: usize) -> Vec<String> {
out
}
/// Exposes the agent's output as `{{output}}` for template evaluation, then
/// applies every key/template in `state_updates`. The temporary `output`
/// state key is removed at the end so it doesn't leak into subsequent
/// nodes' templates.
///
/// When `node.output_schema` is set AND the output is a JSON object, its
/// top-level keys are also auto-merged into state permanently (before
/// state_updates evaluation, so explicit state_updates can override).
fn apply_state_updates(node: &AgentNode, state_manager: &mut StateManager, output: &Value) {
if node.output_schema.is_some()
&& let Some(obj) = output.as_object()
@@ -165,7 +148,9 @@ mod tests {
node_with("hi", Some(u))
};
let mut state = manager_with(&[]);
apply_state_updates(&node, &mut state, &json!("agent finished its work"));
assert_eq!(
state.state().get("findings"),
Some(&json!("agent finished its work"))
@@ -180,7 +165,9 @@ mod tests {
node_with("hi", Some(u))
};
let mut state = manager_with(&[("topic", json!("auth"))]);
apply_state_updates(&node, &mut state, &json!("JWT vs sessions"));
assert_eq!(
state.state().get("summary"),
Some(&json!("auth: JWT vs sessions"))
@@ -195,7 +182,9 @@ mod tests {
node_with("hi", Some(u))
};
let mut state = manager_with(&[]);
apply_state_updates(&node, &mut state, &json!("anything"));
assert_eq!(state.state().get("output"), Some(&Value::Null));
}
@@ -207,7 +196,9 @@ mod tests {
node_with("hi", Some(u))
};
let mut state = manager_with(&[("output", json!("preserved"))]);
apply_state_updates(&node, &mut state, &json!("new agent output"));
assert_eq!(
state.state().get("greeting"),
Some(&json!("new agent output"))
@@ -219,7 +210,9 @@ mod tests {
fn no_state_updates_is_a_noop() {
let node = node_with("hi", None);
let mut state = manager_with(&[("k", json!("v"))]);
apply_state_updates(&node, &mut state, &json!("ignored"));
assert_eq!(state.state().get("k"), Some(&json!("v")));
assert!(state.state().get("output").is_none());
}
@@ -232,7 +225,9 @@ mod tests {
node_with("hi", Some(u))
};
let mut state = manager_with(&[]);
apply_state_updates(&node, &mut state, &json!("DATA"));
assert_eq!(state.state().get("decorated"), Some(&json!("[] DATA")));
}
@@ -251,7 +246,9 @@ mod tests {
let node = node_with_schema("hi", None, json!({"type": "object"}));
let mut state = manager_with(&[]);
let output = json!({"goal": "do X", "summary": "details"});
apply_state_updates(&node, &mut state, &output);
assert_eq!(state.state().get("goal"), Some(&json!("do X")));
assert_eq!(state.state().get("summary"), Some(&json!("details")));
}
@@ -265,7 +262,9 @@ mod tests {
"config": { "key": "value" },
"count": 42
});
apply_state_updates(&node, &mut state, &output);
assert_eq!(state.state().get("tags"), Some(&json!(["a", "b"])));
assert_eq!(state.state().get("config"), Some(&json!({"key": "value"})));
assert_eq!(state.state().get("count"), Some(&json!(42)));
@@ -278,7 +277,9 @@ mod tests {
let node = node_with_schema("hi", Some(u), json!({"type": "object"}));
let mut state = manager_with(&[]);
let output = json!({"goal": "do X"});
apply_state_updates(&node, &mut state, &output);
assert_eq!(state.state().get("goal"), Some(&json!("renamed-do X")));
}
@@ -287,7 +288,9 @@ mod tests {
let node = node_with("hi", None);
let mut state = manager_with(&[]);
let output = json!({"goal": "do X"});
apply_state_updates(&node, &mut state, &output);
assert!(state.state().get("goal").is_none());
}
}
+5 -17
View File
@@ -1,37 +1,25 @@
//! Helpers for running the active agent through its `graph.yaml` instead
//! of the LLM loop. Used at every agent-execution entry point: top-level
//! CLI (`start_directive`), REPL (`ask`), and child-agent spawn
//! (`run_child_agent`).
use super::{GraphExecutor, GraphParser, agent_has_graph};
use crate::config::RequestContext;
use crate::config::paths;
use crate::utils::AbortSignal;
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use log::info;
use serde_json::Value;
/// If the active agent owns a `graph.yaml`, returns its name. Lets
/// callers branch between graph and LLM-loop execution without
/// re-implementing the lookup.
pub fn active_agent_graph_name(ctx: &RequestContext) -> Option<String> {
let name = ctx.agent.as_ref()?.name().to_string();
agent_has_graph(&name).then_some(name)
}
/// Run the active agent's graph end-to-end and return the resolved
/// End-node output. The caller's prompt is seeded into the graph state
/// as `initial_prompt` so nodes can reference it via
/// `{{initial_prompt}}`. Any sub-agents the graph spawned via the
/// supervisor are cancelled on return.
pub async fn run_active_agent_graph(
ctx: &mut RequestContext,
prompt: &str,
abort_signal: AbortSignal,
) -> Result<String> {
let agent_name = active_agent_graph_name(ctx)
.ok_or_else(|| anyhow::anyhow!("Active agent has no graph.yaml"))?;
let agent_name =
active_agent_graph_name(ctx).ok_or_else(|| anyhow!("Active agent has no graph.yaml"))?;
log::info!("Agent '{agent_name}' has graph.yaml; routing to graph executor");
info!("Agent '{agent_name}' has graph.yaml; routing to graph executor");
let agent_dir = paths::agent_data_dir(&agent_name);
let graph_path = paths::agent_graph_file(&agent_name);
+9 -50
View File
@@ -1,15 +1,6 @@
//! Main execution loop for graph workflows.
//!
//! Dispatches each node to its type-specific executor, handles routing
//! (static `Node.next`, script `_next` override, approval `routes` and
//! `on_other`), enforces `max_loop_iterations` and an optional
//! whole-graph timeout, and resolves the final `End` node's `output`
//! template as the graph's return value.
use super::agent::AgentNodeExecutor;
use super::llm::LlmNodeExecutor;
use super::logging::GraphLogger;
use super::parser::GraphParser;
use super::rag::RagNodeExecutor;
use super::script::ScriptExecutor;
use super::state::StateManager;
@@ -21,7 +12,7 @@ use crate::utils::AbortSignal;
use anyhow::{Context, Result, anyhow, bail};
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -38,18 +29,6 @@ impl GraphExecutor {
}
}
/// Load a graph from disk and construct the executor in one step.
/// `base_dir` is also used to resolve relative script paths.
pub fn from_path(graph_path: impl AsRef<Path>, base_dir: impl Into<PathBuf>) -> Result<Self> {
let base_dir = base_dir.into();
let parser = GraphParser::new(&base_dir);
let graph = parser.load_from_file(graph_path)?;
Ok(Self::new(graph, base_dir))
}
/// Run the graph to completion. Returns the resolved `output` template
/// of the terminal `End` node. Any failure is logged via the
/// `GraphLogger` before being propagated.
pub async fn execute(
self,
ctx: &mut RequestContext,
@@ -222,9 +201,6 @@ async fn step(
}
}
/// Apply the end node's `state_updates`, then interpolate its `output`
/// template against the resulting state. Both use lenient interpolation
/// so the graph still produces a result even when some keys are absent.
fn resolve_end_output(end_node: &EndNode, state: &mut StateManager) -> String {
apply_simple_state_updates(end_node.state_updates.as_ref(), state);
state.interpolate_lenient(&end_node.output)
@@ -264,6 +240,7 @@ mod tests {
fn resolve_end_output_interpolates_template_against_state() {
let mut state = state_with(&[("name", json!("alice"))]);
let node = end_node("done: {{name}}", None);
assert_eq!(resolve_end_output(&node, &mut state), "done: alice");
}
@@ -273,6 +250,7 @@ mod tests {
updates.insert("summary".into(), "completed for {{user}}".into());
let node = end_node("RESULT: {{summary}}", Some(updates));
let mut state = state_with(&[("user", json!("bob"))]);
assert_eq!(
resolve_end_output(&node, &mut state),
"RESULT: completed for bob"
@@ -287,6 +265,7 @@ mod tests {
fn resolve_end_output_with_empty_template_returns_empty_string() {
let mut state = state_with(&[]);
let node = end_node("", None);
assert_eq!(resolve_end_output(&node, &mut state), "");
}
@@ -294,13 +273,16 @@ mod tests {
fn resolve_end_output_lenient_on_missing_keys() {
let mut state = state_with(&[]);
let node = end_node("hello {{unknown}}!", None);
assert_eq!(resolve_end_output(&node, &mut state), "hello !");
}
#[test]
fn apply_simple_state_updates_does_nothing_when_none() {
let mut state = state_with(&[("k", json!("v"))]);
apply_simple_state_updates(None, &mut state);
assert_eq!(state.state().get("k"), Some(&json!("v")));
}
@@ -309,32 +291,9 @@ mod tests {
let mut updates = HashMap::new();
updates.insert("k".into(), "new-{{k}}".into());
let mut state = state_with(&[("k", json!("old"))]);
apply_simple_state_updates(Some(&updates), &mut state);
assert_eq!(state.state().get("k"), Some(&json!("new-old")));
}
#[test]
fn from_path_loads_and_constructs_executor() {
use std::io::Write;
let path = std::env::temp_dir().join(format!(
"loki-graph-executor-test-{}.yaml",
std::process::id()
));
let yaml = r#"
name: test_graph
start: only
nodes:
only:
type: end
output: hello
"#;
std::fs::write(&path, yaml).unwrap();
let parent = path.parent().unwrap().to_path_buf();
let executor = GraphExecutor::from_path(&path, &parent).unwrap();
assert_eq!(executor.graph.name, "test_graph");
assert_eq!(executor.graph.start, "only");
let _ = std::fs::remove_file(&path);
}
}
+30 -26
View File
@@ -1,14 +1,8 @@
//! Execution of `llm`-type graph nodes — one-shot LLM calls with a
//! bounded tool-call loop, an opt-in tool whitelist (delegated to
//! `Role.enabled_tools` / `Role.enabled_mcp_servers`), and per-node
//! overrides for model/temperature/top_p. See
//! `docs/implementation/graph-agents/10.5-llm-nodes.md` for the design.
use super::state::StateManager;
use super::structured;
use super::types::LlmNode;
use crate::client::{Model, ModelType, call_chat_completions};
use crate::config::{RequestContext, Role, RoleLike};
use crate::config::{Input, RequestContext, Role, RoleLike};
use crate::utils::{create_abort_signal, dimmed_text};
use anyhow::{Context, Error, Result, anyhow, bail};
use serde_json::Value;
@@ -22,12 +16,6 @@ const OUTPUT_KEY: &str = "output";
pub struct LlmNodeExecutor;
impl LlmNodeExecutor {
/// Run the LLM call and resolve routing. Returns the next node ID
/// to visit. Handles the tolerant-fail contract internally:
/// success → `node_next`; failure with `fallback` → `fallback`;
/// failure without `fallback` → `node_next`. State updates are
/// applied in both success and failure paths, with `{{output}}`
/// resolving to the LLM's response or an error description.
pub async fn execute(
node: &LlmNode,
node_next: Option<&str>,
@@ -54,6 +42,7 @@ impl LlmNodeExecutor {
(Value::String(format!("LLM node failed: {e}")), true)
}
};
apply_state_updates_with_output(node, state_manager, &output);
next_for_llm_node(node_next, failed, node.fallback.as_deref())
}
@@ -151,7 +140,7 @@ async fn run_chat_loop(node: &LlmNode, prompt: &str, ctx: &mut RequestContext) -
let abort = create_abort_signal();
let app_cfg = Arc::clone(&ctx.app.config);
let role_for_input = ctx.role.clone();
let mut input = crate::config::Input::from_str(ctx, prompt, role_for_input);
let mut input = Input::from_str(ctx, prompt, role_for_input);
let mut accumulated = String::new();
for turn in 0..node.max_iterations {
@@ -299,9 +288,6 @@ fn validate_tools_subset(
Ok(())
}
/// Loose substring match against the transient error messages we expect
/// from network/API failures or empty-output cases. Brittle by nature;
/// a typed error enum would be better long-term.
fn is_transient(err: &Error) -> bool {
let s = format!("{err:#}");
s.contains("timed out")
@@ -322,17 +308,9 @@ fn next_for_llm_node(
}
node_next
.map(String::from)
.ok_or_else(|| anyhow::anyhow!("llm node has no `next` set; llm nodes need static routing"))
.ok_or_else(|| anyhow!("llm node has no `next` set; llm nodes need static routing"))
}
/// Expose the LLM call's final output as `{{output}}` for the duration
/// of `state_updates` evaluation, then restore the prior value (or set
/// it to `Null` if there wasn't one). Same pattern as
/// `AgentNodeExecutor`'s `{{output}}` scoping.
///
/// When `node.output_schema` is set AND the output is a JSON object, its
/// top-level keys are also auto-merged into state permanently (before
/// state_updates evaluation, so explicit state_updates can override).
fn apply_state_updates_with_output(
node: &LlmNode,
state_manager: &mut StateManager,
@@ -424,7 +402,9 @@ mod tests {
u.insert("response".into(), "{{output}}".into());
let node = node_with(Some(u));
let mut state = manager_with(&[]);
apply_state_updates_with_output(&node, &mut state, &json!("the answer"));
assert_eq!(state.state().get("response"), Some(&json!("the answer")));
}
@@ -434,7 +414,9 @@ mod tests {
u.insert("summary".into(), "{{topic}}: {{output}}".into());
let node = node_with(Some(u));
let mut state = manager_with(&[("topic", json!("LOINC"))]);
apply_state_updates_with_output(&node, &mut state, &json!("abc"));
assert_eq!(state.state().get("summary"), Some(&json!("LOINC: abc")));
}
@@ -444,7 +426,9 @@ mod tests {
u.insert("k".into(), "{{output}}".into());
let node = node_with(Some(u));
let mut state = manager_with(&[]);
apply_state_updates_with_output(&node, &mut state, &json!("anything"));
assert_eq!(state.state().get(OUTPUT_KEY), Some(&json!(null)));
}
@@ -454,7 +438,9 @@ mod tests {
u.insert("greeting".into(), "{{output}}".into());
let node = node_with(Some(u));
let mut state = manager_with(&[("output", json!("preserved"))]);
apply_state_updates_with_output(&node, &mut state, &json!("new"));
assert_eq!(state.state().get("greeting"), Some(&json!("new")));
assert_eq!(state.state().get(OUTPUT_KEY), Some(&json!("preserved")));
}
@@ -463,7 +449,9 @@ mod tests {
fn no_state_updates_is_a_noop() {
let node = node_with(None);
let mut state = manager_with(&[("k", json!("v"))]);
apply_state_updates_with_output(&node, &mut state, &json!("x"));
assert_eq!(state.state().get("k"), Some(&json!("v")));
assert!(state.state().get(OUTPUT_KEY).is_none());
}
@@ -506,7 +494,9 @@ mod tests {
let node = node_with_schema(None, json!({"type": "object"}));
let mut state = manager_with(&[]);
let output = json!({"goal": "do X", "summary": "details"});
apply_state_updates_with_output(&node, &mut state, &output);
assert_eq!(state.state().get("goal"), Some(&json!("do X")));
assert_eq!(state.state().get("summary"), Some(&json!("details")));
}
@@ -520,7 +510,9 @@ mod tests {
"config": { "key": "value" },
"count": 42
});
apply_state_updates_with_output(&node, &mut state, &output);
assert_eq!(state.state().get("tags"), Some(&json!(["a", "b"])));
assert_eq!(state.state().get("config"), Some(&json!({"key": "value"})));
assert_eq!(state.state().get("count"), Some(&json!(42)));
@@ -533,7 +525,9 @@ mod tests {
let node = node_with_schema(Some(u), json!({"type": "object"}));
let mut state = manager_with(&[]);
let output = json!({"goal": "do X"});
apply_state_updates_with_output(&node, &mut state, &output);
assert_eq!(state.state().get("goal"), Some(&json!("renamed-do X")));
}
@@ -542,7 +536,9 @@ mod tests {
let node = node_with_schema(None, json!({"type": "array"}));
let mut state = manager_with(&[]);
let output = json!([1, 2, 3]);
apply_state_updates_with_output(&node, &mut state, &output);
assert!(state.state().get("0").is_none());
assert!(state.state().get(OUTPUT_KEY).is_none());
}
@@ -552,14 +548,18 @@ mod tests {
let node = node_with(None);
let mut state = manager_with(&[]);
let output = json!({"goal": "do X"});
apply_state_updates_with_output(&node, &mut state, &output);
assert!(state.state().get("goal").is_none());
}
#[test]
fn format_schema_hint_includes_schema_and_instruction() {
let schema = json!({"type": "object", "properties": {"goal": {"type": "string"}}});
let hint = format_schema_hint(&schema);
assert!(hint.contains("Schema:"));
assert!(hint.contains("\"goal\""));
assert!(hint.contains("JSON"));
@@ -582,7 +582,9 @@ mod tests {
"web_search_loki".to_string(),
"mcp:github".to_string(),
];
let (regular, mcp) = categorize_tools(Some(&entries));
assert_eq!(regular, vec!["read_query", "web_search_loki"]);
assert_eq!(mcp, vec!["pubmed-search", "github"]);
}
@@ -590,6 +592,7 @@ mod tests {
#[test]
fn categorize_tools_with_none_returns_empty() {
let (regular, mcp) = categorize_tools(None);
assert!(regular.is_empty());
assert!(mcp.is_empty());
}
@@ -597,6 +600,7 @@ mod tests {
#[test]
fn categorize_tools_with_empty_returns_empty() {
let (regular, mcp) = categorize_tools(Some(&[]));
assert!(regular.is_empty());
assert!(mcp.is_empty());
}
+6 -15
View File
@@ -1,14 +1,3 @@
//! Structured logging and per-node timing for graph execution.
//!
//! Two output channels, both owned by [`GraphLogger`]:
//! - **`tracing`** (`info!`/`debug!`/`warn!`/`error!`) — respects
//! `RUST_LOG`; this is the developer-facing channel.
//! - **stderr narration** — the dimmed `▸` lines the user follows along
//! with during execution.
//!
//! The logger also accumulates per-node wall-clock timings and emits a
//! performance summary (slowest-first) when the graph completes.
use super::state::StateManager;
use super::types::{Node, NodeType};
use crate::utils::dimmed_text;
@@ -107,10 +96,6 @@ impl GraphLogger {
}
}
/// Log a state snapshot before a node runs. No-op unless the graph's
/// `log_state_snapshots` setting is enabled. Keys + byte size go to
/// `debug`; the full state goes to `trace` (it may contain secrets,
/// so it is never logged at a more visible level).
pub fn state_snapshot(&self, node_id: &str, state: &StateManager) {
if !self.log_state_snapshots {
return;
@@ -118,6 +103,7 @@ impl GraphLogger {
let snapshot = state.snapshot();
let mut keys: Vec<&str> = snapshot.keys().map(String::as_str).collect();
keys.sort_unstable();
debug!(
"[graph:{}] [{node_id}] state: {} bytes, keys={:?}",
self.graph_name,
@@ -136,10 +122,12 @@ impl GraphLogger {
}
let mut rows: Vec<(&String, &NodeTiming)> = self.timings.iter().collect();
rows.sort_by_key(|b| Reverse(b.1.total));
info!(
"[graph:{}] performance summary (slowest first):",
self.graph_name
);
for (node_id, t) in rows {
let avg = t.total / t.count.max(1) as u32;
info!(
@@ -190,9 +178,11 @@ mod tests {
#[test]
fn node_timing_max_tracks_largest() {
let mut t = NodeTiming::default();
t.record(Duration::from_millis(10));
t.record(Duration::from_millis(80));
t.record(Duration::from_millis(40));
assert_eq!(t.max, Duration::from_millis(80));
assert_eq!(t.count, 3);
assert_eq!(t.total, Duration::from_millis(130));
@@ -201,6 +191,7 @@ mod tests {
#[test]
fn new_logger_has_no_timings() {
let logger = GraphLogger::new("g", true);
assert!(logger.timings.is_empty());
assert!(logger.log_state_snapshots);
}
+1 -17
View File
@@ -1,6 +1,3 @@
//! Graph-based agent orchestration. Declarative YAML workflows over a shared
//! JSON state, composed of agent/script/approval/input/end nodes.
pub mod agent;
pub mod dispatch;
pub mod executor;
@@ -15,26 +12,13 @@ pub mod types;
pub mod user_interaction;
pub mod validator;
pub use agent::AgentNodeExecutor;
pub use dispatch::{active_agent_graph_name, run_active_agent_graph};
pub use executor::GraphExecutor;
pub use llm::LlmNodeExecutor;
pub use logging::GraphLogger;
pub use parser::{GraphParser, agent_has_graph};
pub use rag::RagNodeExecutor;
pub use script::ScriptExecutor;
pub use state::{StateManager, StateRepresentation};
pub use types::{
AgentNode, ApprovalNode, EndNode, Graph, GraphSettings, GraphState, InputNode, LlmNode, Node,
NodeType, RagNode, ScriptNode,
};
pub use user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
pub use validator::{GraphValidator, ValidationError, ValidationResult};
pub use types::{Graph, NodeType};
pub const GRAPH_SCHEMA_VERSION: &str = "1.0";
pub const DEFAULT_MAX_LOOP_ITERATIONS: usize = 100;
/// Serialized-state size above which scripts receive state via a temp file
/// instead of an env var.
pub const MAX_STATE_SIZE_BYTES: usize = 32 * 1024;
+197 -175
View File
@@ -1,5 +1,3 @@
//! YAML parsing for graph definitions.
use super::types::Graph;
use crate::config::paths;
use anyhow::{Context, Error, Result, anyhow, bail};
@@ -8,9 +6,6 @@ use std::path::{Path, PathBuf};
const SUPPORTED_VERSIONS: &[&str] = &["1.0"];
/// Parser for graph YAML files. The `base_dir` is used to resolve relative
/// paths passed to [`GraphParser::load_from_file`], and is typically an
/// agent directory.
pub struct GraphParser {
base_dir: PathBuf,
}
@@ -22,8 +17,6 @@ impl GraphParser {
}
}
/// Load and validate a graph from a YAML file. Relative paths are
/// resolved against `base_dir`.
pub fn load_from_file(&self, path: impl AsRef<Path>) -> Result<Graph> {
let path = path.as_ref();
let full_path = if path.is_absolute() {
@@ -39,7 +32,6 @@ impl GraphParser {
.with_context(|| format!("Failed to parse graph file at '{}'", full_path.display()))
}
/// Load and validate a graph from a YAML string.
pub fn load_from_string(&self, yaml: &str) -> Result<Graph> {
let mut graph: Graph = serde_yaml::from_str(yaml).map_err(enhance_yaml_error)?;
@@ -71,6 +63,7 @@ fn validate_schema_version(version: &str) -> Result<()> {
SUPPORTED_VERSIONS.join(", ")
);
}
Ok(())
}
@@ -125,16 +118,19 @@ fn enhance_yaml_error(error: serde_yaml::Error) -> Error {
anyhow!("YAML parsing error: {}{}", msg, hint)
}
/// Returns true if the named agent has a `graph.yaml` in its data directory.
pub fn agent_has_graph(agent_name: &str) -> bool {
paths::agent_graph_file(agent_name).exists()
}
#[cfg(test)]
mod tests {
use super::super::GRAPH_SCHEMA_VERSION;
use super::super::types::NodeType;
use super::*;
use std::env;
use indoc::formatdoc;
use std::fs::File;
use std::io::Write;
use std::{env, fs, process};
fn parser() -> GraphParser {
GraphParser::new(env::current_dir().unwrap())
@@ -142,23 +138,25 @@ mod tests {
#[test]
fn parses_a_simple_graph() {
let yaml = r#"
name: simple_graph
version: "1.0"
start: node1
nodes:
node1:
id: node1
type: agent
agent: test_agent
prompt: "Hello world"
next: node2
node2:
id: node2
type: end
output: done
"#;
let graph = parser().load_from_string(yaml).unwrap();
let yaml = formatdoc! {r#"
name: simple_graph
version: "1.0"
start: node1
nodes:
node1:
id: node1
type: agent
agent: test_agent
prompt: "Hello world"
next: node2
node2:
id: node2
type: end
output: done
"#};
let graph = parser().load_from_string(&yaml).unwrap();
assert_eq!(graph.name, "simple_graph");
assert_eq!(graph.start, "node1");
assert_eq!(graph.nodes.len(), 2);
@@ -170,36 +168,40 @@ nodes:
#[test]
fn auto_fills_node_ids_from_keys() {
let yaml = r#"
name: auto_id_graph
version: "1.0"
start: node1
nodes:
node1:
type: agent
agent: test_agent
prompt: Test
next: node2
node2:
type: end
output: done
"#;
let graph = parser().load_from_string(yaml).unwrap();
let yaml = formatdoc! {r#"
name: auto_id_graph
version: "1.0"
start: node1
nodes:
node1:
type: agent
agent: test_agent
prompt: Test
next: node2
node2:
type: end
output: done
"#};
let graph = parser().load_from_string(&yaml).unwrap();
assert_eq!(graph.nodes.get("node1").unwrap().id, "node1");
assert_eq!(graph.nodes.get("node2").unwrap().id, "node2");
}
#[test]
fn rejects_missing_start_node() {
let yaml = r#"
name: bad_graph
version: "1.0"
start: nonexistent
nodes:
node1:
type: end
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: bad_graph
version: "1.0"
start: nonexistent
nodes:
node1:
type: end
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(
err.contains("Start node 'nonexistent' not found"),
"got: {err}"
@@ -208,41 +210,47 @@ nodes:
#[test]
fn rejects_empty_graph_name() {
let yaml = r#"
name: ""
version: "1.0"
start: node1
nodes:
node1:
type: end
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: ""
version: "1.0"
start: node1
nodes:
node1:
type: end
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(err.contains("non-empty 'name'"), "got: {err}");
}
#[test]
fn rejects_no_nodes() {
let yaml = r#"
name: empty_graph
version: "1.0"
start: node1
nodes: {}
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: empty_graph
version: "1.0"
start: node1
nodes: {}
"#, "{}"};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(err.contains("no nodes defined"), "got: {err}");
}
#[test]
fn rejects_unsupported_version() {
let yaml = r#"
name: future_graph
version: "2.0"
start: node1
nodes:
node1:
type: end
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: future_graph
version: "2.0"
start: node1
nodes:
node1:
type: end
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(
err.contains("Unsupported graph schema version"),
"got: {err}"
@@ -251,42 +259,46 @@ nodes:
#[test]
fn rejects_node_id_mismatch() {
let yaml = r#"
name: mismatch_graph
version: "1.0"
start: node1
nodes:
node1:
id: different_id
type: end
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: mismatch_graph
version: "1.0"
start: node1
nodes:
node1:
id: different_id
type: end
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(err.contains("Node ID mismatch"), "got: {err}");
}
#[test]
fn parses_approval_node_with_routes() {
let yaml = r#"
name: approval_graph
version: "1.0"
start: approval1
nodes:
approval1:
type: approval
question: "Proceed with deployment?"
options:
- "Yes"
- "No"
routes:
"Yes": deploy
"No": cancel
on_other: cancel
deploy:
type: end
cancel:
type: end
"#;
let graph = parser().load_from_string(yaml).unwrap();
let yaml = formatdoc! {r#"
name: approval_graph
version: "1.0"
start: approval1
nodes:
approval1:
type: approval
question: "Proceed with deployment?"
options:
- "Yes"
- "No"
routes:
"Yes": deploy
"No": cancel
on_other: cancel
deploy:
type: end
cancel:
type: end
"#};
let graph = parser().load_from_string(&yaml).unwrap();
let approval = graph.nodes.get("approval1").unwrap();
match &approval.node_type {
NodeType::Approval(a) => {
@@ -300,19 +312,21 @@ nodes:
#[test]
fn parses_settings_overrides() {
let yaml = r#"
name: settings_graph
version: "1.0"
start: node1
settings:
max_loop_iterations: 50
timeout: 300
log_state_snapshots: false
nodes:
node1:
type: end
"#;
let graph = parser().load_from_string(yaml).unwrap();
let yaml = formatdoc! {r#"
name: settings_graph
version: "1.0"
start: node1
settings:
max_loop_iterations: 50
timeout: 300
log_state_snapshots: false
nodes:
node1:
type: end
"#};
let graph = parser().load_from_string(&yaml).unwrap();
assert_eq!(graph.settings.max_loop_iterations, 50);
assert_eq!(graph.settings.timeout, Some(300));
assert!(!graph.settings.log_state_snapshots);
@@ -321,19 +335,21 @@ nodes:
#[test]
fn parses_initial_state() {
let yaml = r#"
name: state_graph
version: "1.0"
start: node1
initial_state:
user_name: "Alice"
count: 42
enabled: true
nodes:
node1:
type: end
"#;
let graph = parser().load_from_string(yaml).unwrap();
let yaml = formatdoc! {r#"
name: state_graph
version: "1.0"
start: node1
initial_state:
user_name: "Alice"
count: 42
enabled: true
nodes:
node1:
type: end
"#};
let graph = parser().load_from_string(&yaml).unwrap();
assert_eq!(graph.initial_state.len(), 3);
assert_eq!(graph.initial_state.get("user_name").unwrap(), "Alice");
assert_eq!(
@@ -348,28 +364,32 @@ nodes:
#[test]
fn uses_default_version_when_absent() {
let yaml = r#"
name: no_version
start: node1
nodes:
node1:
type: end
"#;
let graph = parser().load_from_string(yaml).unwrap();
assert_eq!(graph.version, super::super::GRAPH_SCHEMA_VERSION);
let yaml = formatdoc! {r#"
name: no_version
start: node1
nodes:
node1:
type: end
"#};
let graph = parser().load_from_string(&yaml).unwrap();
assert_eq!(graph.version, GRAPH_SCHEMA_VERSION);
}
#[test]
fn rejects_unknown_node_type_with_hint() {
let yaml = r#"
name: bad_type
version: "1.0"
start: node1
nodes:
node1:
type: nonsense
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: bad_type
version: "1.0"
start: node1
nodes:
node1:
type: nonsense
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(
err.contains("Valid node types") || err.contains("unknown variant"),
"got: {err}"
@@ -379,49 +399,50 @@ nodes:
#[test]
fn rejects_malformed_yaml() {
let yaml = "name: bad\n bad: indent\nstart: a";
let result = parser().load_from_string(yaml);
assert!(result.is_err());
}
#[test]
fn missing_required_fields_have_a_hint() {
let yaml = r#"
name: missing_start
version: "1.0"
nodes:
node1:
type: end
"#;
let err = parser().load_from_string(yaml).unwrap_err().to_string();
let yaml = formatdoc! {r#"
name: missing_start
version: "1.0"
nodes:
node1:
type: end
"#};
let err = parser().load_from_string(&yaml).unwrap_err().to_string();
assert!(err.contains("Hint"), "got: {err}");
}
#[test]
fn load_from_file_reads_disk() {
use std::io::Write;
let dir = env::temp_dir();
let path = dir.join(format!(
"loki_graph_parser_test_{}.yaml",
std::process::id()
));
let yaml = r#"
name: disk_graph
version: "1.0"
start: only
nodes:
only:
type: end
output: ok
"#;
let path = dir.join(format!("loki_graph_parser_test_{}.yaml", process::id()));
let yaml = formatdoc! {r#"
name: disk_graph
version: "1.0"
start: only
nodes:
only:
type: end
output: ok
"#};
{
let mut f = std::fs::File::create(&path).unwrap();
let mut f = File::create(&path).unwrap();
f.write_all(yaml.as_bytes()).unwrap();
}
let graph = GraphParser::new(dir).load_from_file(&path).unwrap();
assert_eq!(graph.name, "disk_graph");
let _ = std::fs::remove_file(&path);
let _ = fs::remove_file(&path);
}
#[test]
@@ -430,6 +451,7 @@ nodes:
.load_from_file("/definitely/not/a/real/path/to_any_graph.yaml")
.unwrap_err()
.to_string();
assert!(err.contains("Failed to read graph file"), "got: {err}");
}
+6 -16
View File
@@ -1,11 +1,3 @@
//! Execution of `rag`-type graph nodes.
//!
//! A `rag` node runs a hybrid (vector + keyword) retrieval against the
//! per-node knowledge base built at agent-load time, and writes the result
//! into graph state. The result is exposed to `state_updates` as
//! `{{output}}` — a JSON object `{ context, sources }` where `sources` is
//! an array of source paths.
use super::state::StateManager;
use super::types::RagNode;
use crate::config::RequestContext;
@@ -22,9 +14,6 @@ const DEFAULT_RAG_TIMEOUT_SECS: u64 = 120;
pub struct RagNodeExecutor;
impl RagNodeExecutor {
/// Interpolate the node's query, run the retrieval against this node's
/// knowledge base, expose the result as `{{output}}` for `state_updates`,
/// and return `node_next`.
pub async fn execute(
node: &RagNode,
node_id: &str,
@@ -74,8 +63,6 @@ impl RagNodeExecutor {
}
/// Assemble the `{{output}}` value as `{ "context": <ctx>, "sources": [...] }`.
/// `Rag::search` returns sources as a `- {path}` bullet list; it is split
/// into a JSON array so downstream templates can index `{{output.sources[0]}}`.
fn build_rag_output(context: String, sources_str: &str) -> Value {
let sources: Vec<Value> = sources_str
.lines()
@@ -84,14 +71,13 @@ fn build_rag_output(context: String, sources_str: &str) -> Value {
.map(|s| Value::String(s.to_string()))
.collect();
let mut obj = Map::new();
obj.insert("context".into(), Value::String(context));
obj.insert("sources".into(), Value::Array(sources));
Value::Object(obj)
}
/// Expose the retrieval result as `{{output}}` for the duration of
/// `state_updates` evaluation, then restore the prior value. Same scoping
/// pattern as `llm`/`agent` nodes.
fn apply_state_updates(node: &RagNode, state_manager: &mut StateManager, output: &Value) {
let Some(updates) = &node.state_updates else {
return;
@@ -124,6 +110,7 @@ mod tests {
#[test]
fn build_rag_output_splits_bullet_sources_into_array() {
let out = build_rag_output("ctx".into(), "- a.md\n- https://x.com/spec");
assert_eq!(out["context"], json!("ctx"));
assert_eq!(out["sources"], json!(["a.md", "https://x.com/spec"]));
}
@@ -131,18 +118,21 @@ mod tests {
#[test]
fn build_rag_output_handles_empty_sources() {
let out = build_rag_output("ctx".into(), "");
assert_eq!(out["sources"], json!([]));
}
#[test]
fn build_rag_output_ignores_blank_lines() {
let out = build_rag_output("c".into(), "- a\n\n- b\n");
assert_eq!(out["sources"], json!(["a", "b"]));
}
#[test]
fn build_rag_output_tolerates_unprefixed_lines() {
let out = build_rag_output("c".into(), "plain/path");
assert_eq!(out["sources"], json!(["plain/path"]));
}
}
+34 -23
View File
@@ -1,11 +1,3 @@
//! Script execution for `script`-type graph nodes.
//!
//! Scripts receive graph state via either `GRAPH_STATE` (inline JSON env var)
//! or `GRAPH_STATE_FILE` (path to a file containing the JSON) when state
//! exceeds [`super::MAX_STATE_SIZE_BYTES`]. Scripts MUST print a single JSON
//! object on stdout. The `_next` key (if present) is consumed for routing
//! and removed before the remaining keys are merged into state.
use super::state::{StateManager, StateRepresentation};
use super::types::ScriptNode;
use crate::function::Language;
@@ -18,9 +10,6 @@ use std::time::Duration;
use tokio::process::Command;
use tokio::time::timeout;
/// Executor for script nodes. `base_dir` is the directory script paths are
/// resolved against (typically the owning agent's data directory) and is
/// also used as the child process's working directory.
pub struct ScriptExecutor {
base_dir: PathBuf,
}
@@ -32,10 +21,6 @@ impl ScriptExecutor {
}
}
/// Run the script, merge its JSON output into state (extracting `_next`
/// for routing), and then apply any `state_updates` templates. Returns
/// the routing decision from `_next`, or `None` if the script did not
/// emit one (in which case the executor falls back to `Node.next`).
pub async fn execute(
&self,
node: &ScriptNode,
@@ -153,6 +138,7 @@ fn detect_language(script_path: &Path) -> Result<Language> {
.and_then(|e| e.to_str())
.ok_or_else(|| anyhow!("Script has no file extension: '{}'", script_path.display()))?
.to_string();
match Language::from(&ext) {
Language::Unsupported => bail!(
"Unsupported script extension '.{}' for '{}'",
@@ -171,9 +157,11 @@ fn build_command(language: Language, script_path: &Path) -> Result<Command> {
)
})?;
let mut cmd = Command::new(program);
for arg in prefix_args {
cmd.arg(arg);
}
cmd.arg(script_path);
Ok(cmd)
}
@@ -183,8 +171,10 @@ mod tests {
use super::super::MAX_STATE_SIZE_BYTES;
use super::*;
use crate::utils::temp_file;
use indoc::formatdoc;
use serde_json::json;
use std::collections::HashMap;
use std::env::temp_dir;
use std::fs;
fn cmd_available(name: &str) -> bool {
@@ -226,6 +216,7 @@ echo '{"quality": 0.85, "issues": 3, "_next": "approve"}'
);
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let next = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -233,6 +224,7 @@ echo '{"quality": 0.85, "issues": 3, "_next": "approve"}'
)
.await
.unwrap();
assert_eq!(next.as_deref(), Some("approve"));
assert_eq!(state.state().get("quality"), Some(&json!(0.85)));
assert_eq!(state.state().get("issues"), Some(&json!(3)));
@@ -257,6 +249,7 @@ printf '{"greeting": "hello %s"}' "$NAME"
initial.insert("name".into(), json!("alice"));
let mut state = StateManager::new(initial);
let executor = ScriptExecutor::new(&dir);
let _ = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -264,6 +257,7 @@ printf '{"greeting": "hello %s"}' "$NAME"
)
.await
.unwrap();
assert_eq!(state.state().get("greeting"), Some(&json!("hello alice")));
cleanup(&dir);
}
@@ -281,6 +275,7 @@ echo '{"ok": true}'
);
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let next = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -288,6 +283,7 @@ echo '{"ok": true}'
)
.await
.unwrap();
assert!(next.is_none());
assert_eq!(state.state().get("ok"), Some(&json!(true)));
cleanup(&dir);
@@ -321,12 +317,14 @@ echo '{"raw": "hello"}'
#[tokio::test]
async fn missing_script_file_errors_before_spawning() {
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(std::env::temp_dir());
let executor = ScriptExecutor::new(temp_dir());
let err = executor
.execute(&node_for("__does_not_exist__.sh", 5), &mut state)
.await
.unwrap_err()
.to_string();
assert!(err.contains("Script file not found"), "got: {err}");
}
@@ -338,6 +336,7 @@ echo '{"raw": "hello"}'
let (dir, path) = write_script("#!/bin/bash\n", "sh");
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let err = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -346,6 +345,7 @@ echo '{"raw": "hello"}'
.await
.unwrap_err()
.to_string();
assert!(err.contains("produced no output"), "got: {err}");
cleanup(&dir);
}
@@ -356,13 +356,15 @@ echo '{"raw": "hello"}'
return;
}
let (dir, path) = write_script(
r#"#!/bin/bash
echo "not json at all"
"#,
&formatdoc! {r#"
#!/bin/bash
echo "not json at all"
"#},
"sh",
);
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let err = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -371,6 +373,7 @@ echo "not json at all"
.await
.unwrap_err()
.to_string();
assert!(err.contains("merge output"), "got: {err}");
cleanup(&dir);
}
@@ -381,14 +384,16 @@ echo "not json at all"
return;
}
let (dir, path) = write_script(
r#"#!/bin/bash
echo "bad happened" >&2
exit 7
"#,
&formatdoc! {r#"
#!/bin/bash
echo "bad happened" >&2
exit 7
"#},
"sh",
);
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let err = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -397,6 +402,7 @@ exit 7
.await
.unwrap_err()
.to_string();
assert!(err.contains("exit code"), "got: {err}");
assert!(err.contains("bad happened"), "got: {err}");
cleanup(&dir);
@@ -416,6 +422,7 @@ echo '{"ok":true}'
);
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let err = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 1),
@@ -424,6 +431,7 @@ echo '{"ok":true}'
.await
.unwrap_err()
.to_string();
assert!(err.contains("timed out"), "got: {err}");
cleanup(&dir);
}
@@ -493,6 +501,7 @@ print(json.dumps({
)
.await
.unwrap();
assert_eq!(next.as_deref(), Some("next_node"));
assert_eq!(state.state().get("doubled"), Some(&json!(42)));
cleanup(&dir);
@@ -503,6 +512,7 @@ print(json.dumps({
let (dir, path) = write_script("echo hi", "xyz");
let mut state = StateManager::new(HashMap::new());
let executor = ScriptExecutor::new(&dir);
let err = executor
.execute(
&node_for(path.file_name().unwrap().to_str().unwrap(), 5),
@@ -511,6 +521,7 @@ print(json.dumps({
.await
.unwrap_err()
.to_string();
assert!(
err.contains("Unsupported script extension '.xyz'"),
"got: {err}"
+58 -31
View File
@@ -1,5 +1,3 @@
//! State management and template interpolation for graph execution.
use super::MAX_STATE_SIZE_BYTES;
use super::types::GraphState;
use crate::utils::temp_file;
@@ -8,21 +6,13 @@ use fancy_regex::Regex;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::fs::{read_to_string, write};
use std::fs::write;
use std::path::PathBuf;
use std::sync::LazyLock;
static TEMPLATE_VAR_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\{\{([a-zA-Z0-9_\.\[\]]+)\}\}").expect("invalid template regex"));
/// Wraps [`GraphState`] with template interpolation, script-output merging,
/// and a large-state temp-file fallback for use with scripts.
///
/// Template syntax: `{{key}}` for top-level keys, `{{a.b.c}}` for nested
/// JSON paths, and `{{arr[0]}}` / `{{a.b[2].c}}` / `{{matrix[0][1]}}` for
/// array indices. Use [`StateManager::interpolate`] for strict interpolation
/// (errors on missing keys) or [`StateManager::interpolate_lenient`] for
/// best-effort (missing keys become empty strings).
pub struct StateManager {
state: GraphState,
temp_file: Option<PathBuf>,
@@ -44,8 +34,6 @@ impl StateManager {
&mut self.state
}
/// Replace every `{{key}}` in `template` with its state value. Returns
/// an error if any referenced key is missing.
pub fn interpolate(&self, template: &str) -> Result<String> {
let mut missing = Vec::new();
let result = self.interpolate_inner(template, |key| {
@@ -65,8 +53,6 @@ impl StateManager {
Ok(result)
}
/// Same as [`Self::interpolate`] but missing keys are silently replaced
/// with an empty string.
pub fn interpolate_lenient(&self, template: &str) -> String {
self.interpolate_inner(template, |_| String::new())
}
@@ -96,6 +82,7 @@ impl StateManager {
for idx in root_indices {
current = current.get(idx)?;
}
for part in parts {
let (segment_key, indices) = split_indices(part)?;
if !segment_key.is_empty() {
@@ -105,12 +92,10 @@ impl StateManager {
current = current.get(idx)?;
}
}
Some(current)
}
/// Serialize the state for transport to a script. State larger than
/// [`MAX_STATE_SIZE_BYTES`] is written to a unique temp file; the file
/// is cleaned up when the `StateManager` is dropped.
pub fn serialize_state(&mut self) -> Result<StateRepresentation> {
let json = self.state.to_json()?;
if json.len() > MAX_STATE_SIZE_BYTES {
@@ -119,16 +104,14 @@ impl StateManager {
format!("Failed to write state to temp file at '{}'", path.display())
})?;
self.temp_file = Some(path.clone());
Ok(StateRepresentation::File(path))
} else {
Ok(StateRepresentation::Inline(json))
}
}
pub fn to_json_string(&self) -> Result<String> {
self.state.to_json()
}
#[cfg(test)]
pub fn from_json_string(json: &str) -> Result<Self> {
let data: HashMap<String, Value> =
serde_json::from_str(json).context("Failed to parse state JSON")?;
@@ -143,13 +126,11 @@ impl StateManager {
self.state.size_bytes()
}
#[cfg(test)]
pub fn is_large(&self) -> bool {
self.size_bytes() > MAX_STATE_SIZE_BYTES
}
/// Merge a script's JSON-object stdout into state. The reserved `_next`
/// key is extracted (used by the executor for routing) and is not stored
/// in state. Errors if the output is not a JSON object.
pub fn merge_script_output(&mut self, json_output: &str) -> Result<Option<String>> {
let value: Value =
serde_json::from_str(json_output).context("Script output must be valid JSON")?;
@@ -170,8 +151,6 @@ impl StateManager {
Ok(next_node)
}
/// Remove the temp file backing this state, if any. Called automatically
/// on drop.
pub fn cleanup(&mut self) {
if let Some(path) = self.temp_file.take() {
let _ = fs::remove_file(path);
@@ -185,19 +164,18 @@ impl Drop for StateManager {
}
}
/// How serialized state is delivered to a script: inline JSON for small
/// state, or a file path for state above [`MAX_STATE_SIZE_BYTES`].
#[derive(Debug, Clone)]
pub enum StateRepresentation {
Inline(String),
File(PathBuf),
}
#[cfg(test)]
impl StateRepresentation {
pub fn as_string(&self) -> Result<String> {
match self {
StateRepresentation::Inline(s) => Ok(s.clone()),
StateRepresentation::File(path) => read_to_string(path)
StateRepresentation::File(path) => fs::read_to_string(path)
.with_context(|| format!("Failed to read state file at '{}'", path.display())),
}
}
@@ -222,6 +200,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec<usize>)> {
};
let mut indices = Vec::new();
let mut rest = &segment[bracket_start.unwrap()..];
while !rest.is_empty() {
if !rest.starts_with('[') {
return None;
@@ -231,6 +210,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec<usize>)> {
indices.push(idx);
rest = &rest[close + 1..];
}
Some((key, indices))
}
@@ -261,9 +241,11 @@ mod tests {
#[test]
fn simple_interpolation_replaces_top_level_keys() {
let manager = manager_with(&[("name", json!("Alice")), ("age", json!(30))]);
let result = manager
.interpolate("Hello {{name}}, you are {{age}} years old")
.unwrap();
assert_eq!(result, "Hello Alice, you are 30 years old");
}
@@ -271,9 +253,11 @@ mod tests {
fn nested_interpolation_walks_objects() {
let manager =
manager_with(&[("user", json!({ "name": "Bob", "email": "bob@example.com" }))]);
let result = manager
.interpolate("User: {{user.name}} ({{user.email}})")
.unwrap();
assert_eq!(result, "User: Bob (bob@example.com)");
}
@@ -285,19 +269,23 @@ mod tests {
"api": { "key": "secret123", "endpoint": "https://api.example.com" }
}),
)]);
let result = manager
.interpolate("API: {{config.api.endpoint}} with key {{config.api.key}}")
.unwrap();
assert_eq!(result, "API: https://api.example.com with key secret123");
}
#[test]
fn strict_interpolation_errors_on_missing_keys() {
let manager = manager_with(&[]);
let err = manager
.interpolate("Hello {{name}}")
.unwrap_err()
.to_string();
assert!(err.contains("not found"), "got: {err}");
assert!(err.contains("name"), "got: {err}");
}
@@ -305,24 +293,30 @@ mod tests {
#[test]
fn strict_interpolation_collects_all_missing_keys() {
let manager = manager_with(&[]);
let err = manager
.interpolate("{{a}} and {{b}}")
.unwrap_err()
.to_string();
assert!(err.contains("'a'") && err.contains("'b'"), "got: {err}");
}
#[test]
fn lenient_interpolation_substitutes_empty_for_missing() {
let manager = manager_with(&[("name", json!("Alice"))]);
let result = manager.interpolate_lenient("Hello {{name}}, age: {{age}}");
assert_eq!(result, "Hello Alice, age: ");
}
#[test]
fn lenient_interpolation_handles_missing_intermediate() {
let manager = manager_with(&[("user", json!({ "name": "Bob" }))]);
let result = manager.interpolate_lenient("email: {{user.email}}");
assert_eq!(result, "email: ");
}
@@ -333,6 +327,7 @@ mod tests {
("count", json!(42)),
("nothing", json!(null)),
]);
assert_eq!(manager.interpolate("{{on}}").unwrap(), "true");
assert_eq!(manager.interpolate("{{count}}").unwrap(), "42");
assert_eq!(manager.interpolate("{{nothing}}").unwrap(), "null");
@@ -341,20 +336,25 @@ mod tests {
#[test]
fn interpolates_arrays_as_json() {
let manager = manager_with(&[("items", json!(["a", "b", "c"]))]);
let result = manager.interpolate("{{items}}").unwrap();
assert_eq!(result, r#"["a","b","c"]"#);
}
#[test]
fn interpolates_objects_as_json() {
let manager = manager_with(&[("data", json!({ "key": "value" }))]);
let result = manager.interpolate("{{data}}").unwrap();
assert_eq!(result, r#"{"key":"value"}"#);
}
#[test]
fn interpolates_array_indices() {
let manager = manager_with(&[("items", json!(["a", "b", "c"]))]);
assert_eq!(manager.interpolate("{{items[0]}}").unwrap(), "a");
assert_eq!(manager.interpolate("{{items[2]}}").unwrap(), "c");
}
@@ -362,24 +362,29 @@ mod tests {
#[test]
fn interpolates_array_indices_inside_nested_paths() {
let manager = manager_with(&[("outer", json!({ "inner": { "arr": ["x", "y", "z"] } }))]);
let result = manager
.interpolate("first={{outer.inner.arr[0]}} last={{outer.inner.arr[2]}}")
.unwrap();
assert_eq!(result, "first=x last=z");
}
#[test]
fn interpolates_object_fields_after_array_index() {
let manager = manager_with(&[("users", json!([{ "name": "Alice" }, { "name": "Bob" }]))]);
let result = manager
.interpolate("{{users[0].name}} and {{users[1].name}}")
.unwrap();
assert_eq!(result, "Alice and Bob");
}
#[test]
fn interpolates_nested_array_indices() {
let manager = manager_with(&[("matrix", json!([[1, 2], [3, 4]]))]);
assert_eq!(manager.interpolate("{{matrix[0][1]}}").unwrap(), "2");
assert_eq!(manager.interpolate("{{matrix[1][0]}}").unwrap(), "3");
}
@@ -387,21 +392,27 @@ mod tests {
#[test]
fn out_of_bounds_array_index_is_missing() {
let manager = manager_with(&[("items", json!(["a", "b"]))]);
let err = manager.interpolate("{{items[5]}}").unwrap_err().to_string();
assert!(err.contains("not found"), "got: {err}");
}
#[test]
fn replaces_all_occurrences_of_same_key() {
let manager = manager_with(&[("n", json!("Alice"))]);
let result = manager.interpolate("{{n}} and {{n}} again").unwrap();
assert_eq!(result, "Alice and Alice again");
}
#[test]
fn passes_through_templates_with_no_variables() {
let manager = manager_with(&[]);
let result = manager.interpolate("No variables here").unwrap();
assert_eq!(result, "No variables here");
}
@@ -409,14 +420,18 @@ mod tests {
fn from_json_string_round_trips() {
let json = r#"{"name": "Alice", "age": 30}"#;
let manager = StateManager::from_json_string(json).unwrap();
let result = manager.interpolate("{{name}} is {{age}}").unwrap();
assert_eq!(result, "Alice is 30");
}
#[test]
fn snapshot_clones_state_data() {
let manager = manager_with(&[("k1", json!("v1")), ("k2", json!(42))]);
let snap = manager.snapshot();
assert_eq!(snap.len(), 2);
assert_eq!(snap.get("k1"), Some(&json!("v1")));
assert_eq!(snap.get("k2"), Some(&json!(42)));
@@ -425,7 +440,9 @@ mod tests {
#[test]
fn small_state_serializes_inline() {
let mut manager = manager_with(&[("key", json!("value"))]);
let repr = manager.serialize_state().unwrap();
assert!(matches!(repr, StateRepresentation::Inline(_)));
assert!(!manager.is_large());
assert!(!repr.is_file());
@@ -443,7 +460,7 @@ mod tests {
assert!(path.exists());
let contents = repr.as_string().unwrap();
let parsed: serde_json::Value = serde_json::from_str(&contents).unwrap();
let parsed: Value = serde_json::from_str(&contents).unwrap();
assert_eq!(
parsed.get("blob").unwrap().as_str().unwrap().len(),
big.len()
@@ -457,7 +474,9 @@ mod tests {
fn merge_script_output_merges_keys_into_state() {
let mut manager = manager_with(&[]);
let output = r#"{"quality_score": 0.85, "issues_found": 3, "status": "complete"}"#;
let next = manager.merge_script_output(output).unwrap();
assert_eq!(next, None);
assert_eq!(manager.state().get("quality_score"), Some(&json!(0.85)));
assert_eq!(manager.state().get("issues_found"), Some(&json!(3)));
@@ -468,7 +487,9 @@ mod tests {
fn merge_script_output_extracts_next_key_for_routing() {
let mut manager = manager_with(&[]);
let output = r#"{"_next": "approval_gate", "quality_score": 0.85}"#;
let next = manager.merge_script_output(output).unwrap();
assert_eq!(next.as_deref(), Some("approval_gate"));
assert_eq!(manager.state().get("quality_score"), Some(&json!(0.85)));
assert!(
@@ -480,29 +501,35 @@ mod tests {
#[test]
fn merge_script_output_rejects_invalid_json() {
let mut manager = manager_with(&[]);
let err = manager
.merge_script_output("not json")
.unwrap_err()
.to_string();
assert!(err.contains("valid JSON"), "got: {err}");
}
#[test]
fn merge_script_output_rejects_non_object() {
let mut manager = manager_with(&[]);
let err = manager
.merge_script_output("[1, 2, 3]")
.unwrap_err()
.to_string();
assert!(err.contains("must be a JSON object"), "got: {err}");
}
#[test]
fn merge_script_output_overwrites_existing_state_keys() {
let mut manager = manager_with(&[("status", json!("pending"))]);
let _ = manager
.merge_script_output(r#"{"status": "complete"}"#)
.unwrap();
assert_eq!(manager.state().get("status"), Some(&json!("complete")));
}
}
+15 -9
View File
@@ -1,12 +1,3 @@
//! Structured-output extraction for `llm` and `agent` nodes. Takes the
//! raw final text of a node and a user-supplied JSON Schema, and returns
//! a parsed [`serde_json::Value`] conforming to that schema (best-effort).
//!
//! Strategy: try to parse `raw` directly first (with light cleanup of
//! markdown fences), and only invoke a follow-up LLM call against the
//! built-in `structured-output` role if direct parsing fails. On
//! extractor-output parse failure, perform one repair retry.
use crate::client::call_chat_completions;
use crate::config::{Input, RequestContext, Role, RoleLike};
use crate::utils::{create_abort_signal, dimmed_text};
@@ -67,6 +58,7 @@ async fn extract_via_extractor(
"{}",
dimmed_text("▸ structured-output: extractor returned invalid JSON, retrying")
);
Box::pin(extract_via_extractor(&output, schema, parent_ctx, true)).await
}
}
@@ -101,11 +93,13 @@ async fn run_one_shot(prompt: &str, ctx: &mut RequestContext) -> Result<String>
let (output, tool_results) =
call_chat_completions(&input, false, false, client.as_ref(), ctx, abort).await?;
ctx.after_chat_completion(app_cfg.as_ref(), &input, &output, &tool_results)?;
Ok(output)
}
fn try_parse_json(raw: &str) -> Option<Value> {
let cleaned = strip_code_fences(raw.trim());
serde_json::from_str(cleaned).ok()
}
@@ -129,26 +123,32 @@ mod tests {
#[test]
fn try_parse_json_accepts_plain_object() {
let v = try_parse_json(r#"{"a": 1}"#).unwrap();
assert_eq!(v, json!({"a": 1}));
}
#[test]
fn try_parse_json_strips_json_fences() {
let raw = "```json\n{\"a\": 1}\n```";
let v = try_parse_json(raw).unwrap();
assert_eq!(v, json!({"a": 1}));
}
#[test]
fn try_parse_json_strips_bare_fences() {
let raw = "```\n{\"a\": 1}\n```";
let v = try_parse_json(raw).unwrap();
assert_eq!(v, json!({"a": 1}));
}
#[test]
fn try_parse_json_tolerates_whitespace() {
let v = try_parse_json(" \n {\"x\": true}\n\n").unwrap();
assert_eq!(v, json!({"x": true}));
}
@@ -165,13 +165,16 @@ mod tests {
#[test]
fn try_parse_json_accepts_arrays() {
let v = try_parse_json("[1, 2, 3]").unwrap();
assert_eq!(v, json!([1, 2, 3]));
}
#[test]
fn build_extractor_prompt_includes_schema_and_input() {
let schema = json!({"type": "object"});
let prompt = build_extractor_prompt("hello", &schema, false);
assert!(prompt.contains("Schema:"));
assert!(prompt.contains("Input:"));
assert!(prompt.contains("hello"));
@@ -180,7 +183,9 @@ mod tests {
#[test]
fn build_extractor_prompt_repair_includes_repair_instruction() {
let schema = json!({"type": "object"});
let prompt = build_extractor_prompt("oops", &schema, true);
assert!(prompt.contains("previous response"));
assert!(prompt.contains("oops"));
}
@@ -188,6 +193,7 @@ mod tests {
#[test]
fn build_extractor_role_disables_tools_and_mcp() {
let role = build_extractor_role().expect("builtin role must exist");
assert_eq!(role.enabled_tools().as_deref(), Some(""));
assert_eq!(role.enabled_mcp_servers().as_deref(), Some(""));
}
+3 -108
View File
@@ -1,12 +1,9 @@
//! Core data structures for graph-based agent orchestration.
use anyhow::Result;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// A graph definition loaded from YAML.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Graph {
pub name: String,
@@ -17,29 +14,21 @@ pub struct Graph {
#[serde(default = "default_schema_version")]
pub version: String,
/// Default chat model for the agent. Used when an `llm` node does not
/// set its own `model`. Consulted in single-file mode (an agent with
/// a `graph.yaml` and no `config.yaml`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
/// Default sampling temperature. Single-file mode only.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub temperature: Option<f64>,
/// Default sampling top-p. Single-file mode only.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub top_p: Option<f64>,
/// Global tools available to the agent's nodes. Single-file mode only.
#[serde(default)]
pub global_tools: Vec<String>,
/// MCP servers available to the agent's nodes. Single-file mode only.
#[serde(default)]
pub mcp_servers: Vec<String>,
/// Suggested prompts surfaced in the UI. Single-file mode only.
#[serde(default)]
pub conversation_starters: Vec<String>,
@@ -67,9 +56,6 @@ impl Graph {
self.nodes.keys().map(|s| s.as_str()).collect()
}
/// Returns true if any node is an `agent`-type node. Used to derive
/// `can_spawn_agents` when synthesizing an agent config from a
/// single-file graph.
pub fn has_agent_node(&self) -> bool {
self.nodes
.values()
@@ -81,7 +67,6 @@ fn default_schema_version() -> String {
super::GRAPH_SCHEMA_VERSION.to_string()
}
/// Graph-level settings.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GraphSettings {
#[serde(default = "default_max_loop_iterations")]
@@ -116,12 +101,8 @@ fn default_true() -> bool {
true
}
/// A node in the graph. `node_type` is flattened into the YAML, so a node's
/// variant-specific fields live alongside `id`, `description`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Node {
/// Unique node identifier. May be omitted in YAML; the parser fills it
/// in from the surrounding `nodes:` map key.
#[serde(default)]
pub id: String,
@@ -131,15 +112,10 @@ pub struct Node {
#[serde(flatten)]
pub node_type: NodeType,
/// Static next-node routing. Used by agent/input nodes.
/// Approval nodes use their `routes` map instead.
/// Script nodes: this is populated by `_next` in JSON output.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next: Option<String>,
}
/// The supported node variants. YAML uses an internal `type` tag in lowercase
/// (e.g. `type: agent`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum NodeType {
@@ -152,12 +128,6 @@ pub enum NodeType {
End(EndNode),
}
/// `agent`-type node: spawn an agent with a templated prompt. The agent
/// uses the full tool stack from its own directory (`global_tools` and
/// `mcp_servers` in `config.yaml` plus any per-agent `tools.{sh,py,ts,js}`
/// script); there is no per-node tool override here. For tool-filtered
/// one-shot LLM steps, use an `llm`-type node (future). To use different
/// tool sets via agent variants, see `docs/graph-agents/agent-tools.md`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentNode {
pub agent: String,
@@ -167,10 +137,6 @@ pub struct AgentNode {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_updates: Option<HashMap<String, String>>,
/// JSON Schema describing the expected shape of the agent's final
/// output. When set, the agent's raw text is post-processed through
/// a built-in structured-output extractor and parsed as JSON. Top-
/// level keys of the parsed object are auto-merged into state.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_schema: Option<Value>,
@@ -178,9 +144,6 @@ pub struct AgentNode {
pub timeout: Option<u64>,
}
/// `script`-type node: run a Python/TypeScript/Bash script that prints a
/// JSON object on stdout. Keys merge into state; the special `_next` key
/// overrides routing and is not merged.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ScriptNode {
pub script: String,
@@ -188,7 +151,6 @@ pub struct ScriptNode {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_updates: Option<HashMap<String, String>>,
/// Fallback node to route to if the script fails to run or returns empty
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fallback: Option<String>,
@@ -200,8 +162,6 @@ fn default_script_timeout() -> u64 {
30
}
/// `approval`-type node: prompt the user with `options` and route based on
/// their choice via the `routes` map.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ApprovalNode {
pub question: String,
@@ -210,20 +170,12 @@ pub struct ApprovalNode {
pub routes: HashMap<String, String>,
/// REQUIRED. The user_ask tool always permits a free-form "type your
/// own answer" response in addition to the listed `options`. When the
/// user supplies any answer that does NOT match a key in `routes`,
/// execution routes to this node. The free-form text is available to
/// downstream nodes via `state_updates` (e.g. `clarification: "{{choice}}"`).
pub on_other: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_updates: Option<HashMap<String, String>>,
}
/// `input`-type node: collect free-form text from the user. Routes via the
/// top-level `next` field; the user's text is exposed to templates as
/// `{{input}}` in `state_updates`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct InputNode {
pub question: String,
@@ -238,36 +190,13 @@ pub struct InputNode {
pub state_updates: Option<HashMap<String, String>>,
}
/// `llm`-type node: a one-shot LLM call (with bounded tool-call loop)
/// against a caller-supplied system prompt + user prompt. Unlike
/// `agent`-type nodes, this does NOT spawn a sub-agent; it runs in a
/// fresh isolated context. Tool access is opt-in via the `tools`
/// whitelist (no tools when unset).
///
/// Routing (tolerant-fail):
/// - success → `Node.next`
/// - failure WITH fallback → `fallback`
/// - failure WITHOUT fallback → `Node.next`
///
/// `state_updates` are always applied. `{{output}}` resolves to the
/// LLM's response on success, or to an error description on failure.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LlmNode {
/// User-turn prompt. Templated against state. REQUIRED.
pub prompt: String,
/// Optional system prompt. When set, the LLM call uses an inline
/// Role with `instructions` as `Role.prompt`. Templated against
/// state.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub instructions: Option<String>,
/// Whitelist narrowing the active agent's tool universe.
/// Each entry is either an exact function name (`global_tools`
/// entry or `tools.{sh,py,ts}` subcommand) or the shorthand
/// `mcp:<server>` (where `<server>` must be in the agent's
/// `mcp_servers`). Unset or `[]` = no tools — tools are strictly
/// opt-in.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<String>>,
@@ -283,21 +212,15 @@ pub struct LlmNode {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fallback: Option<String>,
/// Number of attempts on transient errors. Default 1 = no retries.
#[serde(default = "default_llm_max_attempts")]
pub max_attempts: u32,
/// Hard cap on tool-call-loop turns within a single attempt.
#[serde(default = "default_llm_max_iterations")]
pub max_iterations: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_updates: Option<HashMap<String, String>>,
/// JSON Schema (as parsed JSON) describing the expected shape of the
/// node's output. When set, the raw LLM response is post-processed
/// through a built-in structured-output extractor and parsed as JSON.
/// Top-level keys of the parsed object are auto-merged into state.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_schema: Option<Value>,
@@ -313,47 +236,28 @@ fn default_llm_max_iterations() -> u32 {
10
}
/// `rag`-type node: run a hybrid (vector + keyword) retrieval against a
/// per-node knowledge base and write the result into state. The retrieved
/// context and the list of source paths are exposed to `state_updates` via
/// `{{output.context}}` and `{{output.sources}}` (the whole result is
/// `{{output}}`, a JSON object). The knowledge base is built once at agent
/// load time into `<agent>/<node-id>.yaml`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RagNode {
/// Knowledge sources (files, directories, URLs, loader-protocol paths).
/// REQUIRED — this is what makes the node a RAG node.
pub documents: Vec<String>,
/// Retrieval query, templated against state. Defaults to
/// `{{initial_prompt}}` when omitted.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
/// Number of chunks to retrieve. Defaults to the knowledge base's own
/// configured `top_k` when omitted.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub top_k: Option<usize>,
/// Embedding model for building the knowledge base. When this plus
/// `chunk_size` and `chunk_overlap` are all set, knowledge-base
/// construction runs non-interactively (no prompts).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding_model: Option<String>,
/// Chunk size for splitting documents at build time.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub chunk_size: Option<usize>,
/// Chunk overlap for splitting documents at build time.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub chunk_overlap: Option<usize>,
/// Reranker model applied to hybrid-search results.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reranker_model: Option<String>,
/// Embedding-request batch size at build time.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub batch_size: Option<usize>,
@@ -364,8 +268,6 @@ pub struct RagNode {
pub timeout: Option<u64>,
}
/// `end`-type node: terminate execution; `output` (templated) is returned
/// as the graph's final result.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EndNode {
#[serde(default)]
@@ -375,7 +277,6 @@ pub struct EndNode {
pub state_updates: Option<HashMap<String, String>>,
}
/// Runtime state for a graph execution: KV store plus visit history.
#[derive(Debug, Clone, Default)]
pub struct GraphState {
data: HashMap<String, Value>,
@@ -400,7 +301,6 @@ impl GraphState {
self.data.insert(key, value);
}
/// Merge a JSON object into state. Existing keys are overwritten.
pub fn merge(&mut self, json_obj: &serde_json::Map<String, Value>) {
for (key, value) in json_obj {
self.data.insert(key.clone(), value.clone());
@@ -411,8 +311,6 @@ impl GraphState {
&self.data
}
/// Record that a node has been entered. Updates both history and loop
/// counts.
pub fn visit_node(&mut self, node_id: &str) {
self.history.push(node_id.to_string());
*self.loop_counts.entry(node_id.to_string()).or_insert(0) += 1;
@@ -422,10 +320,7 @@ impl GraphState {
self.loop_counts.get(node_id).copied().unwrap_or(0)
}
pub fn history(&self) -> &[String] {
&self.history
}
#[cfg(test)]
pub fn current_node(&self) -> Option<&str> {
self.history.last().map(|s| s.as_str())
}
@@ -667,7 +562,7 @@ on_other: edit_loop
assert_eq!(state.loop_count("node1"), 2);
assert_eq!(state.loop_count("node2"), 1);
assert_eq!(state.loop_count("never"), 0);
assert_eq!(state.history().len(), 3);
assert_eq!(state.history.len(), 3);
assert_eq!(state.current_node(), Some("node1"));
}
@@ -707,7 +602,7 @@ on_other: edit_loop
initial.insert("user".to_string(), json!("alice"));
let state = GraphState::new(initial);
assert_eq!(state.get("user"), Some(&json!("alice")));
assert!(state.history().is_empty());
assert!(state.history.is_empty());
}
#[test]
+23 -17
View File
@@ -1,12 +1,3 @@
//! Execution of `approval` and `input` graph nodes via Loki's existing
//! user-interaction system (`user__ask`, `user__input`).
//!
//! Both delegate to [`crate::function::user_interaction::handle_user_tool`],
//! which prompts the user directly at depth 0 (via `inquire`) and escalates
//! to the parent through the escalation queue at depth > 0. We interpret the
//! returned JSON's `answer` field for the user's response and an `error`
//! field for escalation timeout/cancellation.
use super::state::StateManager;
use super::types::{ApprovalNode, InputNode};
use crate::config::RequestContext;
@@ -21,9 +12,6 @@ const INPUT_KEY: &str = "input";
pub struct ApprovalNodeExecutor;
impl ApprovalNodeExecutor {
/// Prompt the user with the (templated) question and routes the
/// selected option through the node's `routes` map. Returns the next
/// node ID. An escalation timeout/error propagates as a failure.
pub async fn execute(
node: &ApprovalNode,
state_manager: &mut StateManager,
@@ -60,11 +48,6 @@ impl ApprovalNodeExecutor {
pub struct InputNodeExecutor;
impl InputNodeExecutor {
/// Prompt the user for free-form text. If a `default` is configured
/// and the user submits an empty response, the default is substituted.
/// Optional `validation` is evaluated against the final value. Returns
/// `node_next` (the parent `Node.next`) on success; an escalation
/// timeout/error propagates as a failure.
pub async fn execute(
node: &InputNode,
node_next: Option<&str>,
@@ -122,12 +105,14 @@ fn build_input_question(node: &InputNode, state_manager: &StateManager) -> Resul
let mut question = state_manager
.interpolate(&node.question)
.context("Failed to interpolate input question")?;
if let Some(default_template) = &node.default {
let default = state_manager.interpolate_lenient(default_template);
if !default.is_empty() {
question = format!("{question} [default: {default}]");
}
}
Ok(question)
}
@@ -135,6 +120,7 @@ fn resolve_approval_route(node: &ApprovalNode, choice: &str) -> Result<String> {
if let Some(target) = node.routes.get(choice) {
return Ok(target.clone());
}
Ok(node.on_other.clone())
}
@@ -151,12 +137,14 @@ fn apply_state_updates_with_var(
state_manager
.state_mut()
.set(var_name.into(), Value::String(var_value.to_string()));
for (key, template) in updates {
let value = state_manager.interpolate_lenient(template);
state_manager
.state_mut()
.set(key.clone(), Value::String(value));
}
match prev {
Some(v) => state_manager.state_mut().set(var_name.into(), v),
None => {
@@ -220,6 +208,7 @@ mod tests {
for (k, v) in pairs {
map.insert((*k).into(), v.clone());
}
StateManager::new(map)
}
@@ -228,6 +217,7 @@ mod tests {
for (k, v) in routes {
r.insert((*k).into(), (*v).into());
}
ApprovalNode {
question: "?".into(),
options: options.iter().map(|s| (*s).into()).collect(),
@@ -280,6 +270,7 @@ mod tests {
&[("yes", "deploy"), ("no", "cancel")],
"clarify",
);
assert_eq!(resolve_approval_route(&node, "yes").unwrap(), "deploy");
assert_eq!(resolve_approval_route(&node, "no").unwrap(), "cancel");
}
@@ -291,6 +282,7 @@ mod tests {
&[("yes", "deploy"), ("no", "cancel")],
"clarify",
);
assert_eq!(resolve_approval_route(&node, "maybe").unwrap(), "clarify");
assert_eq!(
resolve_approval_route(&node, "free-form text").unwrap(),
@@ -303,7 +295,9 @@ mod tests {
let mut updates = HashMap::new();
updates.insert("decision".into(), "{{choice}}".into());
let mut state = manager_with(&[]);
apply_state_updates_with_var(&Some(updates), &mut state, CHOICE_KEY, "approve");
assert_eq!(state.state().get("decision"), Some(&json!("approve")));
assert_eq!(state.state().get(CHOICE_KEY), Some(&Value::Null));
}
@@ -313,7 +307,9 @@ mod tests {
let mut updates = HashMap::new();
updates.insert("decision".into(), "{{choice}}".into());
let mut state = manager_with(&[("choice", json!("preserved"))]);
apply_state_updates_with_var(&Some(updates), &mut state, CHOICE_KEY, "approve");
assert_eq!(state.state().get("decision"), Some(&json!("approve")));
assert_eq!(state.state().get(CHOICE_KEY), Some(&json!("preserved")));
}
@@ -323,7 +319,9 @@ mod tests {
let mut updates = HashMap::new();
updates.insert("api_key".into(), "{{input}}".into());
let mut state = manager_with(&[]);
apply_state_updates_with_var(&Some(updates), &mut state, INPUT_KEY, "sk-12345");
assert_eq!(state.state().get("api_key"), Some(&json!("sk-12345")));
assert_eq!(state.state().get(INPUT_KEY), Some(&Value::Null));
}
@@ -333,7 +331,9 @@ mod tests {
let state = manager_with(&[("name", json!("alice"))]);
let mut node = input("Hi, what's your name?");
node.default = Some("{{name}}".into());
let q = build_input_question(&node, &state).unwrap();
assert_eq!(q, "Hi, what's your name? [default: alice]");
}
@@ -342,7 +342,9 @@ mod tests {
let state = manager_with(&[]);
let mut node = input("Enter value:");
node.default = Some("{{missing}}".into());
let q = build_input_question(&node, &state).unwrap();
assert_eq!(q, "Enter value:");
}
@@ -350,14 +352,18 @@ mod tests {
fn input_question_uses_no_default_when_field_absent() {
let state = manager_with(&[]);
let node = input("Enter value:");
let q = build_input_question(&node, &state).unwrap();
assert_eq!(q, "Enter value:");
}
#[test]
fn no_state_updates_means_var_never_appears_in_state() {
let mut state = manager_with(&[]);
apply_state_updates_with_var(&None, &mut state, CHOICE_KEY, "approve");
assert!(state.state().get(CHOICE_KEY).is_none());
}
}
+63 -24
View File
@@ -1,14 +1,3 @@
//! Static validation for graph definitions: reference integrity, cycles,
//! reachability, terminal nodes, script/agent existence, and approval
//! routes-vs-options consistency.
//!
//! The validator only follows **declared static edges** (`next`, approval
//! `routes` and `on_other`, script/llm `fallback`). Script nodes can also route
//! dynamically via `_next` in their JSON output at runtime; those edges are
//! invisible here. As a result, unreachable-node detection and "no reachable
//! End node" are reported as warnings (not errors) to avoid false positives
//! against dynamically-routed graphs.
use super::types::{Graph, Node, NodeType};
use crate::client::{Model, ModelType};
use crate::config::{Agent, AppConfig, paths};
@@ -17,7 +6,6 @@ use std::collections::{HashSet, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
/// A single validation finding, optionally scoped to a node.
#[derive(Debug, Clone)]
pub struct ValidationError {
pub node_id: Option<String>,
@@ -40,8 +28,6 @@ impl ValidationError {
}
}
/// Aggregated validation findings: blocking `errors` and informational
/// `warnings`.
#[derive(Debug, Default)]
pub struct ValidationResult {
pub errors: Vec<ValidationError>,
@@ -61,8 +47,6 @@ impl ValidationResult {
self.warnings.push(w);
}
/// Consume into a `Result`, aggregating all errors into a single message.
/// Warnings are dropped.
pub fn into_result(self) -> Result<()> {
if self.is_valid() {
return Ok(());
@@ -75,6 +59,7 @@ impl ValidationResult {
None => format!(" {}", e.message),
})
.collect();
bail!(
"Graph validation failed with {} error(s):\n{}",
self.errors.len(),
@@ -83,9 +68,6 @@ impl ValidationResult {
}
}
/// Agent-level context that `llm`-node `tools` and `model` references are
/// validated against. Supplying it lets a typo in a node's `tools` or
/// `model` be caught at graph load time instead of when the node runs.
pub struct AgentValidationContext {
pub tool_names: HashSet<String>,
pub mcp_servers: HashSet<String>,
@@ -107,8 +89,6 @@ impl AgentValidationContext {
}
}
/// Validator for graph structures. `base_dir` is used to resolve relative
/// script paths (typically the owning agent's data directory).
pub struct GraphValidator {
base_dir: PathBuf,
agent_ctx: Option<AgentValidationContext>,
@@ -138,6 +118,7 @@ impl GraphValidator {
self.validate_approval_routes(graph, &mut result);
self.validate_rag_nodes(graph, &mut result);
self.validate_llm_nodes(graph, &mut result);
result
}
@@ -166,10 +147,12 @@ impl GraphValidator {
let Some(ctx) = &self.agent_ctx else {
return;
};
for (node_id, node) in &graph.nodes {
let NodeType::Llm(llm) = &node.node_type else {
continue;
};
if let Some(tools) = &llm.tools {
for entry in tools {
if let Some(server) = entry.strip_prefix("mcp:") {
@@ -187,6 +170,7 @@ impl GraphValidator {
}
}
}
if let Some(model_id) = &llm.model
&& Model::retrieve_model(ctx.app_config.as_ref(), model_id, ModelType::Chat)
.is_err()
@@ -332,14 +316,12 @@ impl GraphValidator {
}
}
/// All declared outgoing targets from a node, paired with a human-readable
/// label for use in error messages. Used both for cycle detection and
/// reference validation.
fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
let mut out = Vec::new();
if let Some(n) = &node.next {
out.push((n.clone(), "'next'"));
}
match &node.node_type {
NodeType::Approval(a) => {
for v in a.routes.values() {
@@ -407,6 +389,7 @@ fn detect_cycle_dfs(
if !graph.has_node(&next) {
continue;
}
if !visited.contains(&next) {
if let Some(cycle) = detect_cycle_dfs(graph, &next, visited, rec_stack, path) {
return Some(cycle);
@@ -438,6 +421,7 @@ mod tests {
for (id, node) in nodes {
map.insert(id.to_string(), node);
}
Graph {
name: "t".into(),
description: String::new(),
@@ -472,6 +456,7 @@ mod tests {
for (k, v) in routes {
r.insert((*k).into(), (*v).into());
}
Node {
id: id.into(),
description: String::new(),
@@ -506,6 +491,7 @@ mod tests {
m.insert("ctx".into(), "{{output.context}}".into());
m
});
Node {
id: id.into(),
description: String::new(),
@@ -556,7 +542,9 @@ mod tests {
],
"l",
);
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(result.errors.iter().any(|e| e.message.contains("ghost")));
}
@@ -575,6 +563,7 @@ mod tests {
n.tools = tools.map(|t| t.iter().map(|s| s.to_string()).collect());
n.model = model.map(String::from);
}
node
}
@@ -587,9 +576,11 @@ mod tests {
],
"l",
);
let result = validator()
.with_agent_context(agent_ctx(&["read_query"], &[]))
.validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -608,9 +599,11 @@ mod tests {
],
"l",
);
let result = validator()
.with_agent_context(agent_ctx(&["read_query"], &[]))
.validate(&graph);
assert!(result.is_valid());
}
@@ -623,9 +616,11 @@ mod tests {
],
"l",
);
let result = validator()
.with_agent_context(agent_ctx(&[], &["pubmed-search"]))
.validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -647,9 +642,11 @@ mod tests {
],
"l",
);
let result = validator()
.with_agent_context(agent_ctx(&[], &["pubmed-search"]))
.validate(&graph);
assert!(result.is_valid());
}
@@ -662,9 +659,11 @@ mod tests {
],
"l",
);
let result = validator()
.with_agent_context(agent_ctx(&[], &[]))
.validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -683,7 +682,9 @@ mod tests {
],
"l",
);
let result = validator().validate(&graph);
assert!(result.is_valid());
}
@@ -693,7 +694,9 @@ mod tests {
vec![("r", rag_node("r", &[], true)), ("end", end_node("end"))],
"r",
);
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -712,7 +715,9 @@ mod tests {
],
"r",
);
let result = validator().validate(&graph);
assert!(result.is_valid());
assert!(
result
@@ -731,7 +736,9 @@ mod tests {
],
"r",
);
let result = validator().validate(&graph);
assert!(result.is_valid());
assert!(
!result
@@ -765,7 +772,9 @@ mod tests {
let mut start = end_node("start");
start.next = Some("end".into());
let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start");
let result = validator().validate(&graph);
assert!(result.is_valid(), "errors: {:?}", result.errors);
}
@@ -774,7 +783,9 @@ mod tests {
let mut n = end_node("n1");
n.next = Some("nope".into());
let graph = graph_with(vec![("n1", n), ("end", end_node("end"))], "n1");
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -794,7 +805,9 @@ mod tests {
"end",
);
let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -808,7 +821,9 @@ mod tests {
fn flags_missing_approval_on_other_target() {
let approval = approval_node("ap", &["yes"], &[("yes", "end")], "missing");
let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -823,7 +838,9 @@ mod tests {
fn flags_missing_script_fallback_target() {
let scr = script_node("s", "does-not-exist.py", Some("nowhere"));
let graph = graph_with(vec![("s", scr), ("end", end_node("end"))], "s");
let result = validator().validate(&graph);
assert!(
result
.errors
@@ -839,7 +856,9 @@ mod tests {
let mut b = end_node("b");
b.next = Some("a".into());
let graph = graph_with(vec![("a", a), ("b", b)], "a");
let result = validator().validate(&graph);
assert!(!result.is_valid());
assert!(
result
@@ -854,7 +873,9 @@ mod tests {
let mut a = end_node("a");
a.next = Some("a".into());
let graph = graph_with(vec![("a", a)], "a");
let result = validator().validate(&graph);
assert!(
result
.errors
@@ -869,7 +890,9 @@ mod tests {
vec![("start", end_node("start")), ("orphan", end_node("orphan"))],
"start",
);
let result = validator().validate(&graph);
assert!(
result.warnings.iter().any(
|w| w.node_id.as_deref() == Some("orphan") && w.message.contains("unreachable")
@@ -883,7 +906,9 @@ mod tests {
let b = agent_node("b", "__no_such_agent__", None);
a.next = Some("b".into());
let graph = graph_with(vec![("a", a), ("b", b)], "a");
let result = validator().validate(&graph);
assert!(
result
.errors
@@ -911,7 +936,9 @@ mod tests {
vec![("start", start), ("orphan_end", end_node("orphan_end"))],
"start",
);
let result = validator().validate(&graph);
assert!(result.is_valid(), "unexpected errors: {:?}", result.errors);
assert!(
result
@@ -930,7 +957,9 @@ mod tests {
vec![("start", start), ("s", scr), ("end", end_node("end"))],
"start",
);
let result = validator().validate(&graph);
assert!(
result
.errors
@@ -944,7 +973,9 @@ mod tests {
fn errors_when_referenced_agent_missing() {
let agent = agent_node("a", "__definitely_no_such_agent__", Some("end"));
let graph = graph_with(vec![("a", agent), ("end", end_node("end"))], "a");
let result = validator().validate(&graph);
assert!(result.errors.iter().any(|e| {
e.message
.contains("Agent '__definitely_no_such_agent__' not found")
@@ -955,7 +986,9 @@ mod tests {
fn errors_when_approval_option_has_no_route() {
let approval = approval_node("ap", &["yes", "no"], &[("yes", "end")], "end");
let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
let result = validator().validate(&graph);
assert!(
result
.errors
@@ -968,7 +1001,9 @@ mod tests {
fn warns_when_approval_has_extra_route() {
let approval = approval_node("ap", &["yes"], &[("yes", "end"), ("maybe", "end")], "end");
let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
let result = validator().validate(&graph);
assert!(result.warnings.iter().any(|w| {
w.message
.contains("Route 'maybe' has no corresponding option")
@@ -982,11 +1017,13 @@ mod tests {
let mut b = end_node("b");
b.next = Some("missing2".into());
let graph = graph_with(vec![("a", a), ("b", b)], "a");
let err = validator()
.validate(&graph)
.into_result()
.unwrap_err()
.to_string();
assert!(err.contains("missing1"), "got: {err}");
assert!(err.contains("missing2"), "got: {err}");
assert!(err.contains("validation failed"), "got: {err}");
@@ -996,7 +1033,9 @@ mod tests {
fn into_result_returns_ok_when_no_errors() {
let mut start = end_node("start");
start.next = Some("end".into());
let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start");
assert!(validator().validate(&graph).into_result().is_ok());
}
}
+1 -1
View File
@@ -22,6 +22,7 @@ use anyhow::{Context, Result, bail};
use crossterm::cursor::SetCursorStyle;
use fancy_regex::Regex;
use indoc::indoc;
use log::warn;
use parking_lot::RwLock;
use reedline::CursorConfig;
use reedline::{
@@ -32,7 +33,6 @@ use reedline::{
use reedline::{MenuBuilder, Signal};
use std::sync::LazyLock;
use std::{env, process, sync::Arc};
use log::warn;
const MENU_NAME: &str = "completion_menu";