use super::state::template_root_keys; use super::types::{Graph, Node, NodeType}; use crate::client::{Model, ModelType}; use crate::config::{Agent, AppConfig, paths}; use anyhow::{Result, bail}; use std::collections::{BTreeMap, HashSet, VecDeque}; use std::path::PathBuf; use std::sync::Arc; #[derive(Debug, Clone)] pub struct ValidationError { pub node_id: Option, pub message: String, } impl ValidationError { fn new(message: impl Into) -> Self { Self { node_id: None, message: message.into(), } } fn with_node(node_id: impl Into, message: impl Into) -> Self { Self { node_id: Some(node_id.into()), message: message.into(), } } } #[derive(Debug, Default)] pub struct ValidationResult { pub errors: Vec, pub warnings: Vec, } impl ValidationResult { pub fn is_valid(&self) -> bool { self.errors.is_empty() } fn error(&mut self, e: ValidationError) { self.errors.push(e); } fn warning(&mut self, w: ValidationError) { self.warnings.push(w); } pub fn into_result(self) -> Result<()> { if self.is_valid() { return Ok(()); } let lines: Vec = self .errors .iter() .map(|e| match &e.node_id { Some(id) => format!(" [{id}] {}", e.message), None => format!(" {}", e.message), }) .collect(); bail!( "Graph validation failed with {} error(s):\n{}", self.errors.len(), lines.join("\n") ); } } pub struct AgentValidationContext { pub tool_names: HashSet, pub mcp_servers: HashSet, pub app_config: Arc, } impl AgentValidationContext { pub fn from_agent(agent: &Agent, app_config: Arc) -> Self { Self { tool_names: agent .functions() .declarations() .iter() .map(|d| d.name.clone()) .collect(), mcp_servers: agent.mcp_server_names().iter().cloned().collect(), app_config, } } } pub struct GraphValidator { base_dir: PathBuf, agent_ctx: Option, } impl GraphValidator { pub fn new(base_dir: impl Into) -> Self { Self { base_dir: base_dir.into(), agent_ctx: None, } } pub fn with_agent_context(mut self, ctx: AgentValidationContext) -> Self { self.agent_ctx = Some(ctx); self } pub fn validate(&self, graph: &Graph) 
-> ValidationResult { let mut result = ValidationResult::default(); self.validate_node_references(graph, &mut result); self.validate_cycles(graph, &mut result); self.validate_reachability(graph, &mut result); self.validate_terminal_nodes(graph, &mut result); self.validate_scripts(graph, &mut result); self.validate_agents(graph, &mut result); self.validate_approval_routes(graph, &mut result); self.validate_rag_nodes(graph, &mut result); self.validate_llm_nodes(graph, &mut result); self.validate_max_concurrency(graph, &mut result); self.validate_map_branches(graph, &mut result); self.validate_parallel_user_interaction(graph, &mut result); self.validate_parallel_writes(graph, &mut result); self.validate_parallel_reads(graph, &mut result); result } fn validate_rag_nodes(&self, graph: &Graph, result: &mut ValidationResult) { for (node_id, node) in &graph.nodes { if let NodeType::Rag(r) = &node.node_type { if r.documents.is_empty() { result.error(ValidationError::with_node( node_id, "RAG node has no 'documents'; at least one knowledge source \ is required", )); } if r.state_updates.is_none() { result.warning(ValidationError::with_node( node_id, "RAG node has no 'state_updates'; its retrieval result will \ not be written to state", )); } } } } fn validate_llm_nodes(&self, graph: &Graph, result: &mut ValidationResult) { let Some(ctx) = &self.agent_ctx else { return; }; for (node_id, node) in &graph.nodes { let NodeType::Llm(llm) = &node.node_type else { continue; }; if let Some(tools) = &llm.tools { for entry in tools { if let Some(server) = entry.strip_prefix("mcp:") { if !ctx.mcp_servers.contains(server) { result.error(ValidationError::with_node( node_id, format!("llm node references unknown MCP server 'mcp:{server}'"), )); } } else if !ctx.tool_names.contains(entry) { result.error(ValidationError::with_node( node_id, format!("llm node references unknown tool '{entry}'"), )); } } } if let Some(model_id) = &llm.model && Model::retrieve_model(ctx.app_config.as_ref(), 
model_id, ModelType::Chat) .is_err() { result.error(ValidationError::with_node( node_id, format!("llm node references unknown model '{model_id}'"), )); } } } fn validate_node_references(&self, graph: &Graph, result: &mut ValidationResult) { for (node_id, node) in &graph.nodes { for (target, label) in declared_targets(node) { if !graph.has_node(&target) { result.error(ValidationError::with_node( node_id, format!("References non-existent node '{target}' in {label}"), )); } } } } fn validate_cycles(&self, graph: &Graph, result: &mut ValidationResult) { let mut visited: HashSet = HashSet::new(); let mut rec_stack: HashSet = HashSet::new(); let mut path: Vec = Vec::new(); for node_id in graph.node_ids() { if !visited.contains(node_id) && let Some(cycle) = detect_cycle_dfs(graph, node_id, &mut visited, &mut rec_stack, &mut path) { result.error(ValidationError::new(format!( "Cycle detected: {}", cycle.join(" -> ") ))); return; } } } fn validate_reachability(&self, graph: &Graph, result: &mut ValidationResult) { let reachable = find_reachable_nodes(graph); for node_id in graph.node_ids() { if !reachable.contains(node_id) { result.warning(ValidationError::with_node( node_id, "Node is unreachable from the start node via declared edges \ (script `_next` routing is not analyzed)", )); } } } fn validate_terminal_nodes(&self, graph: &Graph, result: &mut ValidationResult) { let has_any_end = graph .nodes .values() .any(|n| matches!(n.node_type, NodeType::End(_))); if !has_any_end { result.error(ValidationError::new( "Graph has no end nodes; execution would never terminate", )); return; } let reachable = find_reachable_nodes(graph); let reachable_end = graph .nodes .iter() .any(|(id, n)| matches!(n.node_type, NodeType::End(_)) && reachable.contains(id)); if !reachable_end { result.warning(ValidationError::new( "No end node is reachable from the start node via declared edges \ (a script's `_next` may still route to one)", )); } } fn validate_scripts(&self, graph: &Graph, result: 
&mut ValidationResult) { for (node_id, node) in &graph.nodes { if let NodeType::Script(s) = &node.node_type { let script_path = self.base_dir.join(&s.script); if !script_path.exists() { result.error(ValidationError::with_node( node_id, format!("Script file not found: '{}'", script_path.display()), )); } } } } fn validate_agents(&self, graph: &Graph, result: &mut ValidationResult) { for (node_id, node) in &graph.nodes { if let NodeType::Agent(a) = &node.node_type { let agent_dir = paths::agent_data_dir(&a.agent); let has_config = paths::agent_config_file(&a.agent).exists(); let has_graph = paths::agent_graph_file(&a.agent).exists(); if !agent_dir.exists() { result.error(ValidationError::with_node( node_id, format!("Agent '{}' not found (directory missing)", a.agent), )); } else if !has_config && !has_graph { result.error(ValidationError::with_node( node_id, format!( "Agent '{}' has neither a config.yaml nor a graph.yaml", a.agent ), )); } } } } fn validate_approval_routes(&self, graph: &Graph, result: &mut ValidationResult) { for (node_id, node) in &graph.nodes { if let NodeType::Approval(a) = &node.node_type { for option in &a.options { if !a.routes.contains_key(option) { result.error(ValidationError::with_node( node_id, format!("Approval option '{option}' has no route defined"), )); } } for key in a.routes.keys() { if !a.options.contains(key) { result.warning(ValidationError::with_node( node_id, format!("Route '{key}' has no corresponding option"), )); } } } } } // Parallel-execution validation. // // The v1 algorithm uses immediate-successor analysis only: a parallel group is the set of `next:` targets of a // single fan-out node. Map nodes are checked separately by `validate_map_branches` (the branch is self-parallel, // but enforcement comes from strict-mode rules on the branch node, not from group membership). Transitive parallel // groups (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather than under-reports. 
A false positive // forces an unneeded reducer (mild annoyance); a false negative allows silent data races (catastrophic). fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) { if graph.settings.max_concurrency == 0 { result.error(ValidationError::new( "settings.max_concurrency must be >= 1 (got 0); a zero cap would \ deadlock the executor", )); } for (node_id, node) in &graph.nodes { if let NodeType::Map(m) = &node.node_type && let Some(0) = m.max_concurrency { result.error(ValidationError::with_node( node_id, "map node's `max_concurrency` must be >= 1 (got 0); a zero cap \ would deadlock the executor", )); } } } fn validate_map_branches(&self, graph: &Graph, result: &mut ValidationResult) { for (map_id, node) in &graph.nodes { let NodeType::Map(m) = &node.node_type else { continue; }; let Some(branch) = graph.get_node(&m.branch) else { continue; }; match &branch.node_type { NodeType::Approval(_) => { result.error(ValidationError::with_node( map_id, format!( "map node points to branch '{}' which is an approval node; \ approval/input nodes cannot run inside a parallel map branch \ (the CLI would prompt the user N times concurrently)", m.branch ), )); continue; } NodeType::Input(_) => { result.error(ValidationError::with_node( map_id, format!( "map node points to branch '{}' which is an input node; \ input nodes cannot run inside a parallel map branch", m.branch ), )); continue; } NodeType::End(_) => { result.error(ValidationError::with_node( map_id, format!( "map node points to branch '{}' which is an end node; \ map branches terminate via the map's collect mechanism, \ not via end nodes", m.branch ), )); continue; } NodeType::Map(_) => { result.error(ValidationError::with_node( map_id, format!( "map node points to branch '{}' which is itself a map node; \ nested map fan-outs are not supported in v1", m.branch ), )); continue; } _ => {} } if branch.next.is_some() { result.error(ValidationError::with_node( m.branch.clone(), format!( 
"branch node '{}' has a `next` declared, but map branches must be \ atomic (one node, one execution per item). Remove `next` or \ restructure the workflow so any chaining happens after the map.", m.branch ), )); } if let Some(updates) = node_state_updates_keys(branch) { for k in &updates { if k != &m.output_key { result.error(ValidationError::with_node( m.branch.clone(), format!( "branch node '{}' writes state key '{}' via state_updates, \ but map branches may only write through their `output_key` \ ('{}'). Rename the write, or move the side effect outside \ the map.", m.branch, k, m.output_key ), )); } } } let schema_keys = output_schema_top_level_keys(branch); if !schema_keys.is_empty() { let mut keys_sorted: Vec = schema_keys.into_iter().collect(); keys_sorted.sort(); result.error(ValidationError::with_node( m.branch.clone(), format!( "branch node '{}' has an `output_schema` with top-level \ properties ({}); map branches must write only through their \ `output_key` ('{}'). Remove `output_schema`, or use state_updates \ to map the output explicitly.", m.branch, keys_sorted.join(", "), m.output_key ), )); } } } fn validate_parallel_user_interaction(&self, graph: &Graph, result: &mut ValidationResult) { for group in compute_parallel_groups(graph) { for node_id in &group { let Some(node) = graph.get_node(node_id) else { continue; }; match &node.node_type { NodeType::Approval(_) => { result.error(ValidationError::with_node( node_id, "approval node is an immediate target of a fan-out \ (`next: [...]`); approvals must run after the join, \ not inside a parallel branch", )); } NodeType::Input(_) => { result.error(ValidationError::with_node( node_id, "input node is an immediate target of a fan-out \ (`next: [...]`); input nodes must run after the join, \ not inside a parallel branch", )); } _ => {} } } } } fn validate_parallel_writes(&self, graph: &Graph, result: &mut ValidationResult) { for group in compute_parallel_groups(graph) { let mut node_writes: Vec<(String, 
HashSet)> = Vec::new(); for node_id in &group { let Some(node) = graph.get_node(node_id) else { continue; }; match write_set_of(node) { Some(set) => node_writes.push((node_id.clone(), set)), None => { result.error(ValidationError::with_node( node_id, "script node is in a parallel branch but declares no \ `state_updates`; parallel scripts must declare their writes \ explicitly to avoid silent state collisions", )); } } } let mut writers_by_key: BTreeMap> = BTreeMap::new(); for (nid, ws) in &node_writes { for k in ws { writers_by_key .entry(k.clone()) .or_default() .push(nid.clone()); } } for (key, mut writers) in writers_by_key { if writers.len() < 2 { continue; } if graph.reducers.contains_key(&key) { continue; } writers.sort(); result.error(ValidationError::new(format!( "nodes [{}] all write key '{}' in the same parallel super-step but \ no reducer is declared for '{}'. Add `reducers: {{ {}: }}` \ at the graph root (built-ins: append, extend, concat, sum, max, min, \ merge, overwrite), or rename one node's output.", writers.join(", "), key, key, key, ))); } } } fn validate_parallel_reads(&self, graph: &Graph, result: &mut ValidationResult) { for group in compute_parallel_groups(graph) { let nodes: Vec<(&String, &Node)> = group .iter() .filter_map(|id| graph.nodes.get(id).map(|n| (id, n))) .collect(); for (id_a, node_a) in &nodes { let read_set_a = read_set_of(node_a); if read_set_a.is_empty() { continue; } for (id_b, node_b) in &nodes { if id_b == id_a { continue; } let Some(write_set_b) = write_set_of(node_b) else { continue; }; let mut collisions: Vec = read_set_a.intersection(&write_set_b).cloned().collect(); if collisions.is_empty() { continue; } collisions.sort(); let keys = collisions .iter() .map(|k| format!("`{k}`")) .collect::>() .join(", "); result.error(ValidationError::with_node( id_a.as_str(), format!( "node '{id_a}' reads state key(s) {keys} which sibling parallel \ branch '{id_b}' writes in the same super-step; parallel branches \ see a state 
snapshot taken BEFORE the super-step and cannot observe \ each other's writes. Move the dependent read to a later super-step \ (or remove the cross-branch reference)." ), )); } } } } } fn declared_targets(node: &Node) -> Vec<(String, &'static str)> { let mut out = Vec::new(); if let Some(targets) = &node.next { for target in targets.as_slice() { out.push((target.clone(), "'next'")); } } match &node.node_type { NodeType::Approval(a) => { for v in a.routes.values() { out.push((v.clone(), "approval 'routes'")); } out.push((a.on_other.clone(), "approval 'on_other'")); } NodeType::Script(s) => { if let Some(t) = &s.fallback { out.push((t.clone(), "script 'fallback'")); } } NodeType::Llm(l) => { if let Some(t) = &l.fallback { out.push((t.clone(), "llm 'fallback'")); } } NodeType::Map(m) => { out.push((m.branch.clone(), "map 'branch'")); } // `agent`/`input`/`rag` route only via `next` (already collected // above); `end` is terminal. No type-specific routing edges to add. NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {} } out } fn outgoing_node_ids(node: &Node) -> Vec { declared_targets(node).into_iter().map(|(t, _)| t).collect() } fn find_reachable_nodes(graph: &Graph) -> HashSet { let mut reachable: HashSet = HashSet::new(); let mut queue: VecDeque = VecDeque::new(); if !graph.has_node(&graph.start) { return reachable; } reachable.insert(graph.start.clone()); queue.push_back(graph.start.clone()); while let Some(id) = queue.pop_front() { if let Some(node) = graph.get_node(&id) { for next in outgoing_node_ids(node) { if graph.has_node(&next) && reachable.insert(next.clone()) { queue.push_back(next); } } } } reachable } // v1 parallel-group detection: only the immediate `next` targets of a fan-out node count as a parallel group. Map // branches are handled separately by `validate_map_branches` (the branch's self-parallelism is checked via strict-mode // rules on the branch node itself, not via group membership). 
// // Returns one HashSet per fan-out source; deeper transitive parallelism is intentionally out of scope for v1. fn compute_parallel_groups(graph: &Graph) -> Vec> { let mut groups = Vec::new(); for node in graph.nodes.values() { if let Some(targets) = &node.next && targets.is_fan_out() { groups.push(targets.as_slice().iter().cloned().collect()); } } groups } // Computes the set of state keys this node can write to. // // Sources considered: // - `state_updates` keys (every node type that has them) // - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge) fn write_set_of(node: &Node) -> Option> { if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() { return None; } let mut writes = HashSet::new(); if let Some(keys) = node_state_updates_keys(node) { writes.extend(keys); } writes.extend(output_schema_top_level_keys(node)); Some(writes) } // Computes the set of root state keys this node's templated fields read from. // // "Root key" follows the same definition as `template_root_keys`: for a // reference like `{{user.name}}` or `{{items[0]}}`, the root is the bare // identifier before the first `.` or `[`. 
fn read_set_of(node: &Node) -> HashSet<String> {
    // Inside `state_updates` values, these names refer to the node's own
    // result rather than to shared state, so they are not counted as reads.
    // Primary templated fields are not filtered.
    let scoped: &[&str] = match &node.node_type {
        NodeType::Llm(_) | NodeType::Agent(_) | NodeType::Rag(_) => &["output"],
        NodeType::Approval(_) => &["choice"],
        NodeType::Input(_) => &["input"],
        NodeType::Script(_) | NodeType::End(_) | NodeType::Map(_) => &[],
    };
    let mut reads: HashSet<String> = HashSet::new();
    for s in primary_templated_fields(node) {
        reads.extend(template_root_keys(&s));
    }
    if let Some(updates) = node_state_updates_map(node) {
        for v in updates.values() {
            for k in template_root_keys(v) {
                if !scoped.contains(&k.as_str()) {
                    reads.insert(k);
                }
            }
        }
    }
    reads
}

/// Returns the raw template strings of a node's primary (non-`state_updates`)
/// fields.
///
/// A RAG node without an explicit `query` contributes the default
/// `{{initial_prompt}}` template. Script nodes contribute nothing.
fn primary_templated_fields(node: &Node) -> Vec<String> {
    match &node.node_type {
        NodeType::Llm(n) => {
            let mut v = vec![n.prompt.clone()];
            if let Some(i) = &n.instructions {
                v.push(i.clone());
            }
            v
        }
        NodeType::Agent(n) => vec![n.prompt.clone()],
        NodeType::Rag(n) => {
            vec![
                n.query
                    .clone()
                    .unwrap_or_else(|| "{{initial_prompt}}".to_string()),
            ]
        }
        NodeType::Approval(n) => vec![n.question.clone()],
        NodeType::Input(n) => {
            let mut v = vec![n.question.clone()];
            if let Some(d) = &n.default {
                v.push(d.clone());
            }
            v
        }
        NodeType::End(n) => vec![n.output.clone()],
        NodeType::Map(n) => vec![n.over.clone()],
        NodeType::Script(_) => Vec::new(),
    }
}

/// Returns the node's `state_updates` mapping (state key -> value template).
///
/// Returns `None` for `map` nodes, which have no `state_updates`, and for
/// nodes that declare none. This is the single place that knows which node
/// types carry `state_updates`.
fn node_state_updates_map(node: &Node) -> Option<&std::collections::HashMap<String, String>> {
    match &node.node_type {
        NodeType::Llm(n) => n.state_updates.as_ref(),
        NodeType::Agent(n) => n.state_updates.as_ref(),
        NodeType::Rag(n) => n.state_updates.as_ref(),
        NodeType::Approval(n) => n.state_updates.as_ref(),
        NodeType::Input(n) => n.state_updates.as_ref(),
        NodeType::Script(n) => n.state_updates.as_ref(),
        NodeType::End(n) => n.state_updates.as_ref(),
        NodeType::Map(_) => None,
    }
}

/// Returns the state keys written by the node's `state_updates`, if it has any.
///
/// Derived from [`node_state_updates_map`] so that the two stay in sync when
/// node types are added.
fn node_state_updates_keys(node: &Node) -> Option<HashSet<String>> {
    node_state_updates_map(node).map(|updates| updates.keys().cloned().collect())
}

/// Returns the top-level `properties` keys of an `llm`/`agent` node's
/// `output_schema`.
///
/// The result is empty for other node types, when no schema is set, or when
/// the schema has no object-valued `properties`.
fn output_schema_top_level_keys(node: &Node) -> HashSet<String> {
    let schema = match &node.node_type {
        NodeType::Agent(n) => n.output_schema.as_ref(),
        NodeType::Llm(n) => n.output_schema.as_ref(),
        NodeType::Script(_)
        | NodeType::Approval(_)
        | NodeType::Input(_)
        | NodeType::Rag(_)
        | NodeType::End(_)
        | NodeType::Map(_) => return HashSet::new(),
    };
    let Some(schema) = schema else {
        return HashSet::new();
    };
    let Some(properties) = schema.get("properties").and_then(|p| p.as_object()) else {
        return HashSet::new();
    };
    properties.keys().cloned().collect()
}

/// Recursive depth-first search for a cycle reachable from `node_id`.
///
/// Follows declared edges (`outgoing_node_ids`) and skips edges to
/// non-existent nodes. `visited` persists across calls from different roots.
/// `rec_stack` and `path` track the current DFS path.
///
/// Returns the first cycle found, as a path whose first and last elements are
/// the same node. When no cycle is found, `node_id` is popped from `path` and
/// `rec_stack` before returning. When a cycle is found, both are left as they
/// were at that point.
fn detect_cycle_dfs(
    graph: &Graph,
    node_id: &str,
    visited: &mut HashSet<String>,
    rec_stack: &mut HashSet<String>,
    path: &mut Vec<String>,
) -> Option<Vec<String>> {
    visited.insert(node_id.to_string());
    rec_stack.insert(node_id.to_string());
    path.push(node_id.to_string());
    if let Some(node) = graph.get_node(node_id) {
        for next in outgoing_node_ids(node) {
            if !graph.has_node(&next) {
                continue;
            }
            if !visited.contains(&next) {
                if let Some(cycle) = detect_cycle_dfs(graph, &next, visited, rec_stack, path) {
                    return Some(cycle);
                }
            } else if rec_stack.contains(&next) {
                // `next` is on the current path (it is in `rec_stack`), so the
                // cycle is the suffix of `path` starting at `next`.
                let start = path.iter().position(|n| n == &next).unwrap_or(0);
                let mut cycle: Vec<String> = path[start..].to_vec();
                cycle.push(next.clone());
                return Some(cycle);
            }
        }
    }
    path.pop();
    rec_stack.remove(node_id);
    None
}

#[cfg(test)]
mod tests {
    use super::super::types::*;
    use super::*;
    use indexmap::IndexMap;
    use std::collections::HashMap;
    use std::env;

    fn graph_with(nodes: Vec<(&str, Node)>, start: &str) -> Graph {
        let mut map: IndexMap<String, Node> = IndexMap::new();
        for (id, node) in nodes {
            map.insert(id.to_string(), node);
        }
        Graph {
            name: "t".into(),
            description: String::new(),
            version: "1.0".into(),
            model: None,
            temperature: None,
            top_p: None,
            global_tools: Vec::new(),
            mcp_servers: Vec::new(),
            conversation_starters: Vec::new(),
            variables: Vec::new(),
            settings: GraphSettings::default(),
            initial_state:
HashMap::new(), reducers: HashMap::new(), start: start.into(), nodes: map, } } fn end_node(id: &str) -> Node { Node { id: id.into(), description: String::new(), node_type: NodeType::End(EndNode { output: String::new(), state_updates: None, }), next: None, } } fn approval_node(id: &str, options: &[&str], routes: &[(&str, &str)], on_other: &str) -> Node { let mut r: HashMap = HashMap::new(); for (k, v) in routes { r.insert((*k).into(), (*v).into()); } Node { id: id.into(), description: String::new(), node_type: NodeType::Approval(ApprovalNode { question: "?".into(), options: options.iter().map(|s| (*s).into()).collect(), routes: r, on_other: on_other.into(), state_updates: None, }), next: None, } } fn script_node(id: &str, script: &str, fallback: Option<&str>) -> Node { Node { id: id.into(), description: String::new(), node_type: NodeType::Script(ScriptNode { script: script.into(), state_updates: None, fallback: fallback.map(String::from), timeout: 30, }), next: None, } } fn rag_node(id: &str, documents: &[&str], with_state_updates: bool) -> Node { let state_updates = with_state_updates.then(|| { let mut m: HashMap = HashMap::new(); m.insert("ctx".into(), "{{output.context}}".into()); m }); Node { id: id.into(), description: String::new(), node_type: NodeType::Rag(RagNode { documents: documents.iter().map(|s| (*s).into()).collect(), query: None, top_k: None, embedding_model: None, chunk_size: None, chunk_overlap: None, reranker_model: None, batch_size: None, state_updates, timeout: None, }), next: Some("end".into()), } } fn llm_node(id: &str, fallback: Option<&str>, next: Option<&str>) -> Node { Node { id: id.into(), description: String::new(), node_type: NodeType::Llm(LlmNode { instructions: None, prompt: "p".into(), tools: None, model: None, temperature: None, top_p: None, fallback: fallback.map(String::from), max_attempts: 1, max_iterations: 10, state_updates: None, output_schema: None, timeout: None, }), next: next.map(NextTargets::from), } } #[test] fn 
flags_missing_llm_fallback_target() { let graph = graph_with( vec![ ("l", llm_node("l", Some("ghost"), Some("end"))), ("end", end_node("end")), ], "l", ); let result = validator().validate(&graph); assert!(!result.is_valid()); assert!(result.errors.iter().any(|e| e.message.contains("ghost"))); } fn agent_ctx(tools: &[&str], mcp: &[&str]) -> AgentValidationContext { AgentValidationContext { tool_names: tools.iter().map(|s| s.to_string()).collect(), mcp_servers: mcp.iter().map(|s| s.to_string()).collect(), app_config: Arc::new(AppConfig::default()), } } fn llm_node_with(id: &str, tools: Option>, model: Option<&str>) -> Node { let mut node = llm_node(id, None, Some("end")); if let NodeType::Llm(ref mut n) = node.node_type { n.tools = tools.map(|t| t.iter().map(|s| s.to_string()).collect()); n.model = model.map(String::from); } node } #[test] fn llm_node_unknown_tool_is_an_error() { let graph = graph_with( vec![ ("l", llm_node_with("l", Some(vec!["bogus_tool"]), None)), ("end", end_node("end")), ], "l", ); let result = validator() .with_agent_context(agent_ctx(&["read_query"], &[])) .validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("bogus_tool")) ); } #[test] fn llm_node_known_tool_passes() { let graph = graph_with( vec![ ("l", llm_node_with("l", Some(vec!["read_query"]), None)), ("end", end_node("end")), ], "l", ); let result = validator() .with_agent_context(agent_ctx(&["read_query"], &[])) .validate(&graph); assert!(result.is_valid()); } #[test] fn llm_node_unknown_mcp_server_is_an_error() { let graph = graph_with( vec![ ("l", llm_node_with("l", Some(vec!["mcp:bogus"]), None)), ("end", end_node("end")), ], "l", ); let result = validator() .with_agent_context(agent_ctx(&[], &["pubmed-search"])) .validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("mcp:bogus")) ); } #[test] fn llm_node_known_mcp_server_passes() { let graph = graph_with( vec![ ( "l", 
llm_node_with("l", Some(vec!["mcp:pubmed-search"]), None), ), ("end", end_node("end")), ], "l", ); let result = validator() .with_agent_context(agent_ctx(&[], &["pubmed-search"])) .validate(&graph); assert!(result.is_valid()); } #[test] fn llm_node_unknown_model_is_an_error() { let graph = graph_with( vec![ ("l", llm_node_with("l", None, Some("nonexistent:model"))), ("end", end_node("end")), ], "l", ); let result = validator() .with_agent_context(agent_ctx(&[], &[])) .validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("nonexistent:model")) ); } #[test] fn llm_node_validation_skipped_without_agent_context() { let graph = graph_with( vec![ ("l", llm_node_with("l", Some(vec!["bogus_tool"]), None)), ("end", end_node("end")), ], "l", ); let result = validator().validate(&graph); assert!(result.is_valid()); } #[test] fn rag_node_without_documents_errors() { let graph = graph_with( vec![("r", rag_node("r", &[], true)), ("end", end_node("end"))], "r", ); let result = validator().validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("no 'documents'") && e.node_id.as_deref() == Some("r")) ); } #[test] fn rag_node_without_state_updates_warns() { let graph = graph_with( vec![ ("r", rag_node("r", &["./docs"], false)), ("end", end_node("end")), ], "r", ); let result = validator().validate(&graph); assert!(result.is_valid()); assert!( result .warnings .iter() .any(|w| w.message.contains("no 'state_updates'")) ); } #[test] fn valid_rag_node_produces_no_findings() { let graph = graph_with( vec![ ("r", rag_node("r", &["./docs"], true)), ("end", end_node("end")), ], "r", ); let result = validator().validate(&graph); assert!(result.is_valid()); assert!( !result .warnings .iter() .any(|w| w.message.contains("RAG node")) ); } fn agent_node(id: &str, agent: &str, next: Option<&str>) -> Node { Node { id: id.into(), description: String::new(), node_type: NodeType::Agent(AgentNode { 
agent: agent.into(), prompt: "hi".into(), state_updates: None, output_schema: None, timeout: None, }), next: next.map(NextTargets::from), } } fn validator() -> GraphValidator { GraphValidator::new(env::current_dir().unwrap()) } #[test] fn valid_simple_graph_passes() { let mut start = end_node("start"); start.next = Some("end".into()); let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start"); let result = validator().validate(&graph); assert!(result.is_valid(), "errors: {:?}", result.errors); } #[test] fn flags_missing_node_reference_in_next() { let mut n = end_node("n1"); n.next = Some("nope".into()); let graph = graph_with(vec![("n1", n), ("end", end_node("end"))], "n1"); let result = validator().validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("non-existent node 'nope'") && e.node_id.as_deref() == Some("n1")) ); } #[test] fn flags_missing_approval_route_target() { let approval = approval_node( "ap", &["yes", "no"], &[("yes", "end"), ("no", "missing")], "end", ); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); let result = validator().validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("non-existent node 'missing'")) ); } #[test] fn flags_missing_approval_on_other_target() { let approval = approval_node("ap", &["yes"], &[("yes", "end")], "missing"); let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap"); let result = validator().validate(&graph); assert!(!result.is_valid()); assert!( result .errors .iter() .any(|e| e.message.contains("non-existent node 'missing'") && e.message.contains("on_other")) ); } #[test] fn flags_missing_script_fallback_target() { let scr = script_node("s", "does-not-exist.py", Some("nowhere")); let graph = graph_with(vec![("s", scr), ("end", end_node("end"))], "s"); let result = validator().validate(&graph); assert!( result .errors .iter() .any(|e| 
e.message.contains("non-existent node 'nowhere'")) ); }

    /// A two-node loop (`a -> b -> a`) must be rejected as a cycle.
    #[test]
    fn detects_two_node_cycle() {
        let mut a = end_node("a");
        a.next = Some("b".into());
        let mut b = end_node("b");
        b.next = Some("a".into());
        let graph = graph_with(vec![("a", a), ("b", b)], "a");
        let result = validator().validate(&graph);
        assert!(!result.is_valid());
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("Cycle detected"))
        );
    }

    /// A node whose `next` points back at itself is also reported as a cycle.
    #[test]
    fn detects_self_loop_as_cycle() {
        let mut a = end_node("a");
        a.next = Some("a".into());
        let graph = graph_with(vec![("a", a)], "a");
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("Cycle detected"))
        );
    }

    /// A node that nothing points to produces an "unreachable" warning
    /// attributed to that node.
    #[test]
    fn warns_on_unreachable_node() {
        let graph = graph_with(
            vec![("start", end_node("start")), ("orphan", end_node("orphan"))],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result.warnings.iter().any(
                |w| w.node_id.as_deref() == Some("orphan") && w.message.contains("unreachable")
            )
        );
    }

    /// A graph built only from `agent_node`s (no end node anywhere) is an error.
    #[test]
    fn errors_when_graph_has_no_end_node_at_all() {
        let mut a = agent_node("a", "__no_such_agent__", Some("b"));
        let b = agent_node("b", "__no_such_agent__", None);
        // NOTE(review): `a` was already built with `Some("b")`; presumably that
        // argument is its `next`, which would make this assignment redundant —
        // confirm against `agent_node`.
        a.next = Some("b".into());
        let graph = graph_with(vec![("a", a), ("b", b)], "a");
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("no end nodes")),
            "errors: {:?}",
            result.errors
        );
    }

    /// An end node that exists but cannot be reached from `start` yields a
    /// warning, and the graph still validates.
    #[test]
    fn warns_when_end_exists_but_not_reachable() {
        // `start` is an input node with no `next`, so `orphan_end` is cut off.
        let start = Node {
            id: "start".into(),
            description: String::new(),
            node_type: NodeType::Input(InputNode {
                question: "?".into(),
                default: None,
                validation: None,
                state_updates: None,
            }),
            next: None,
        };
        let graph = graph_with(
            vec![("start", start), ("orphan_end", end_node("orphan_end"))],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(result.is_valid(), "unexpected errors: {:?}", result.errors);
        assert!(
            result
                .warnings
                .iter()
                .any(|w| w.message.contains("No end node is reachable"))
        );
    }

    /// A script node pointing at a file that should not exist is an error
    /// reported on that script node.
    #[test]
    fn errors_when_script_file_missing() {
        let scr = script_node("s", "definitely-not-here.py", None);
        let mut start = end_node("start");
        start.next = Some("s".into());
        let graph = graph_with(
            vec![("start", start), ("s", scr), ("end", end_node("end"))],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("Script file not found")
                    && e.node_id.as_deref() == Some("s"))
        );
    }

    /// An agent node naming an agent that cannot be resolved is an error.
    #[test]
    fn errors_when_referenced_agent_missing() {
        let agent = agent_node("a", "__definitely_no_such_agent__", Some("end"));
        let graph = graph_with(vec![("a", agent), ("end", end_node("end"))], "a");
        let result = validator().validate(&graph);
        assert!(result.errors.iter().any(|e| {
            e.message
                .contains("Agent '__definitely_no_such_agent__' not found")
        }));
    }

    /// Every approval option needs a route; option `no` has none here.
    #[test]
    fn errors_when_approval_option_has_no_route() {
        let approval = approval_node("ap", &["yes", "no"], &[("yes", "end")], "end");
        let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("'no' has no route defined"))
        );
    }

    /// A route keyed by something that is not one of the options produces a
    /// warning.
    #[test]
    fn warns_when_approval_has_extra_route() {
        let approval = approval_node("ap", &["yes"], &[("yes", "end"), ("maybe", "end")], "end");
        let graph = graph_with(vec![("ap", approval), ("end", end_node("end"))], "ap");
        let result = validator().validate(&graph);
        assert!(result.warnings.iter().any(|w| {
            w.message
                .contains("Route 'maybe' has no corresponding option")
        }));
    }

    /// `into_result` reports every collected error, not just the first.
    #[test]
    fn into_result_aggregates_all_errors() {
        let mut a = end_node("a");
        a.next = Some("missing1".into());
        let mut b = end_node("b");
        b.next = Some("missing2".into());
        let graph = graph_with(vec![("a", a), ("b", b)], "a");
        let err = validator()
            .validate(&graph)
            .into_result()
            .unwrap_err()
            .to_string();
        assert!(err.contains("missing1"), "got: {err}");
        assert!(err.contains("missing2"), "got: {err}");
        assert!(err.contains("validation failed"), "got: {err}");
    }

    /// A minimal `start -> end` graph converts to `Ok(())`.
    #[test]
    fn into_result_returns_ok_when_no_errors() {
        let mut start = end_node("start");
        start.next = Some("end".into());
        let graph = graph_with(vec![("start", start), ("end", end_node("end"))], "start");
        assert!(validator().validate(&graph).into_result().is_ok());
    }

    /// A fan-out that re-joins (`start -> {a, b} -> join -> end`) is a DAG and
    /// must not be flagged as a cycle.
    #[test]
    fn cycle_detector_treats_fan_out_diamond_as_a_valid_dag() {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["a".into(), "b".into()]));
        let mut a = end_node("a");
        a.next = Some("join".into());
        let mut b = end_node("b");
        b.next = Some("join".into());
        let mut join = end_node("join");
        join.next = Some("end".into());
        let graph = graph_with(
            vec![
                ("start", start),
                ("a", a),
                ("b", b),
                ("join", join),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("Cycle detected")),
            "fan-out diamond incorrectly reported as cycle: {:?}",
            result.errors
        );
    }

    /// Reachability must follow every member of a `NextTargets::Many`, not
    /// only the first one.
    #[test]
    fn reachability_visits_every_member_of_many_next_targets() {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["a".into(), "b".into(), "c".into()]));
        let graph = graph_with(
            vec![
                ("start", start),
                ("a", end_node("a")),
                ("b", end_node("b")),
                ("c", end_node("c")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        for orphan in ["a", "b", "c"] {
            assert!(
                !result
                    .warnings
                    .iter()
                    .any(|w| w.node_id.as_deref() == Some(orphan)
                        && w.message.contains("unreachable")),
                "fan-out target '{orphan}' incorrectly marked unreachable: {:?}",
                result.warnings
            );
        }
    }

    /// A dangling id inside a `Many` target list is reported against the node
    /// that declares it.
    #[test]
    fn node_reference_check_catches_missing_member_inside_many() {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["a".into(), "ghost".into()]));
        let graph = graph_with(vec![("start", start), ("a", end_node("a"))], "start");
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("non-existent node 'ghost'")
                    && e.node_id.as_deref() == Some("start")),
            "expected error for missing 'ghost'
target in Many: {:?}",
            result.errors
        );
    }

    /// A map node whose `branch` names a missing node is reported with a
    /// message that mentions the map's `branch` field.
    #[test]
    fn node_reference_check_catches_missing_map_branch_target() {
        let map = Node {
            id: "fan".into(),
            description: String::new(),
            node_type: NodeType::Map(MapNode {
                over: "{{items}}".into(),
                as_name: "item".into(),
                branch: "no_such_node".into(),
                output_key: "output".into(),
                collect_into: "results".into(),
                max_concurrency: None,
            }),
            next: Some("end".into()),
        };
        let graph = graph_with(vec![("fan", map), ("end", end_node("end"))], "fan");
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("non-existent node 'no_such_node'")
                    && e.message.contains("map 'branch'")),
            "expected error for missing map branch: {:?}",
            result.errors
        );
    }

    /// Builds a map node that fans out to `branch` with fixed settings:
    /// `over: "{{items}}"`, `as_name: "item"`, `output_key: "output"`,
    /// `collect_into: "results"`, and no `max_concurrency`.
    fn map_node_basic(id: &str, branch: &str, next: Option<&str>) -> Node {
        Node {
            id: id.into(),
            description: String::new(),
            node_type: NodeType::Map(MapNode {
                over: "{{items}}".into(),
                as_name: "item".into(),
                branch: branch.into(),
                output_key: "output".into(),
                collect_into: "results".into(),
                max_concurrency: None,
            }),
            next: next.map(NextTargets::from),
        }
    }

    /// Builds an LLM node (via `llm_node`) whose `state_updates` map holds the
    /// given `(key, template)` pairs.
    fn llm_with_state_updates(id: &str, updates: &[(&str, &str)], next: Option<&str>) -> Node {
        let mut node = llm_node(id, None, next);
        if let NodeType::Llm(ref mut n) = node.node_type {
            // Key/value types are inferred from `n.state_updates` below.
            let mut map: HashMap<_, _> = HashMap::new();
            for (k, v) in updates {
                map.insert((*k).into(), (*v).into());
            }
            n.state_updates = Some(map);
        }
        node
    }

    /// Builds an LLM node whose `output_schema` is a JSON object schema with
    /// one `"type": "string"` property per name in `properties`.
    fn llm_with_output_schema(id: &str, properties: &[&str], next: Option<&str>) -> Node {
        let mut node = llm_node(id, None, next);
        if let NodeType::Llm(ref mut n) = node.node_type {
            let mut props = serde_json::Map::new();
            for k in properties {
                props.insert((*k).to_string(), serde_json::json!({ "type": "string" }));
            }
            n.output_schema = Some(serde_json::json!({
                "type": "object",
                "properties": props,
            }));
        }
        node
    }

    /// Builds a script node whose `state_updates` map holds the given
    /// `(key, template)` pairs. The script file is `Cargo.toml` — presumably
    /// chosen because it exists relative to the validator's base dir; confirm
    /// against `validator()`.
    fn script_with_state_updates(id: &str, updates: &[(&str, &str)]) -> Node {
        let mut node = script_node(id, "Cargo.toml", None);
        if let NodeType::Script(ref mut n) = node.node_type {
            // Key/value types are inferred from `n.state_updates` below.
            let mut map: HashMap<_, _> = HashMap::new();
            for (k, v) in updates {
                map.insert((*k).into(), (*v).into());
            }
            n.state_updates = Some(map);
        }
        node
    }

    /// Builds `start -> {worker_a, worker_b}` plus a separate `end` node. The
    /// workers are inserted under the ids `worker_a` / `worker_b`, so callers
    /// should construct them with matching ids; their own `next` decides
    /// whether `end` is reached.
    fn fan_out_graph_with_two_workers(worker_a: Node, worker_b: Node) -> Graph {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec![
            "worker_a".into(),
            "worker_b".into(),
        ]));
        graph_with(
            vec![
                ("start", start),
                ("worker_a", worker_a),
                ("worker_b", worker_b),
                ("end", end_node("end")),
            ],
            "start",
        )
    }

    /// Two parallel branches writing the same key with no reducer is an error
    /// that names both writers and the key.
    #[test]
    fn parallel_writes_to_same_key_without_reducer_errors() {
        let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
        let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
        let graph = fan_out_graph_with_two_workers(a, b);
        let result = validator().validate(&graph);
        assert!(
            result.errors.iter().any(|e| e
                .message
                .contains("nodes [worker_a, worker_b] all write key 'summary'")),
            "expected reducer-collision error for `summary`: {:?}",
            result.errors
        );
    }

    /// Declaring a reducer for the shared key removes the collision error.
    #[test]
    fn parallel_writes_to_same_key_with_reducer_pass() {
        let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
        let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
        let mut graph = fan_out_graph_with_two_workers(a, b);
        graph.reducers.insert("summary".into(), Reducer::Concat);
        let result = validator().validate(&graph);
        // Also reject the wording pinned by
        // `parallel_writes_to_same_key_without_reducer_errors`, so this check
        // cannot pass just because the collision message is phrased differently.
        assert!(
            !result.errors.iter().any(|e| {
                e.message.contains("no reducer is declared")
                    || e.message.contains("all write key")
            }),
            "expected no reducer-collision error when reducer is declared: {:?}",
            result.errors
        );
    }

    /// Parallel branches writing different keys do not collide.
    #[test]
    fn parallel_writes_to_disjoint_keys_pass() {
        let a = llm_with_state_updates("worker_a", &[("a_out", "{{output}}")], Some("end"));
        let b = llm_with_state_updates("worker_b", &[("b_out", "{{output}}")], Some("end"));
        let graph = fan_out_graph_with_two_workers(a, b);
        let result = validator().validate(&graph);
        // Same two phrasings as the reducer test above, for the same reason.
        assert!(
            !result.errors.iter().any(|e| {
                e.message.contains("no reducer is declared")
                    || e.message.contains("all write key")
            }),
            "expected no collision
for disjoint keys: {:?}",
            result.errors
        );
    }

    /// Top-level `output_schema` properties count as state writes for the
    /// parallel-collision check.
    #[test]
    fn output_schema_top_level_keys_count_as_parallel_writes() {
        let a = llm_with_output_schema("worker_a", &["summary"], Some("end"));
        let b = llm_with_output_schema("worker_b", &["summary"], Some("end"));
        let graph = fan_out_graph_with_two_workers(a, b);
        let result = validator().validate(&graph);
        assert!(
            result.errors.iter().any(|e| e
                .message
                .contains("nodes [worker_a, worker_b] all write key 'summary'")),
            "output_schema top-level keys should count as writes: {:?}",
            result.errors
        );
    }

    /// With three colliding writers, the error lists every one of them.
    #[test]
    fn three_parallel_writers_collision_lists_all_writers() {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["a".into(), "b".into(), "c".into()]));
        let graph = graph_with(
            vec![
                ("start", start),
                (
                    "a",
                    llm_with_state_updates("a", &[("k", "{{output}}")], Some("end")),
                ),
                (
                    "b",
                    llm_with_state_updates("b", &[("k", "{{output}}")], Some("end")),
                ),
                (
                    "c",
                    llm_with_state_updates("c", &[("k", "{{output}}")], Some("end")),
                ),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("nodes [a, b, c]") && e.message.contains("'k'")),
            "expected error listing all three writers a, b, c: {:?}",
            result.errors
        );
    }

    /// An approval node may not be an immediate target of a fan-out; the
    /// error is reported on the approval node.
    #[test]
    fn approval_node_as_immediate_fan_out_target_errors() {
        let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
        let other = end_node("other");
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["ap".into(), "other".into()]));
        let graph = graph_with(
            vec![
                ("start", start),
                ("ap", approval),
                ("other", other),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("approval node")
                    && e.message.contains("fan-out")
                    && e.node_id.as_deref() == Some("ap")),
            "expected approval-in-fan-out error: {:?}",
            result.errors
        );
    }

    /// An input node may not be an immediate target of a fan-out; the error
    /// is reported on the input node.
    #[test]
    fn input_node_as_immediate_fan_out_target_errors() {
        let input = Node {
            id: "in".into(),
            description: String::new(),
            node_type: NodeType::Input(InputNode {
                question: "?".into(),
                default: None,
                validation: None,
                state_updates: None,
            }),
            next: Some("end".into()),
        };
        let other = end_node("other");
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["in".into(), "other".into()]));
        let graph = graph_with(
            vec![
                ("start", start),
                ("in", input),
                ("other", other),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("input node")
                    && e.message.contains("fan-out")
                    && e.node_id.as_deref() == Some("in")),
            "expected input-in-fan-out error: {:?}",
            result.errors
        );
    }

    /// An approval node reached only after the branches re-join is allowed;
    /// only immediate fan-out successors are checked.
    #[test]
    fn approval_after_join_passes() {
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["a".into(), "b".into()]));
        let mut a = end_node("a");
        a.next = Some("ap".into());
        let mut b = end_node("b");
        b.next = Some("ap".into());
        let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
        let graph = graph_with(
            vec![
                ("start", start),
                ("a", a),
                ("b", b),
                ("ap", approval),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("fan-out") && e.node_id.as_deref() == Some("ap")),
            "approval AFTER join should be fine (v1 only checks immediate successors): {:?}",
            result.errors
        );
    }

    /// A map branch may not be an approval node; the error is on the map.
    #[test]
    fn map_branch_cannot_be_approval() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = approval_node("br", &["yes"], &[("yes", "end")], "end");
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("approval node")
                    && e.message.contains("map branch")
                    && e.node_id.as_deref() == Some("m")),
            "expected map-branch-is-approval error: {:?}",
            result.errors
        );
    }

    /// A map branch may not be an input node; the error is on the map.
    #[test]
    fn map_branch_cannot_be_input() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = Node {
            id: "br".into(),
            description: String::new(),
            node_type: NodeType::Input(InputNode {
                question: "?".into(),
                default: None,
                validation: None,
                state_updates: None,
            }),
            next: Some("end".into()),
        };
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("input node")
                    && e.message.contains("map branch")
                    && e.node_id.as_deref() == Some("m")),
            "expected map-branch-is-input error: {:?}",
            result.errors
        );
    }

    /// A map branch may not be an end node; the error is on the map.
    #[test]
    fn map_branch_cannot_be_end() {
        let map = map_node_basic("m", "br", Some("done"));
        let graph = graph_with(
            vec![
                ("m", map),
                ("br", end_node("br")),
                ("done", end_node("done")),
            ],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result.errors.iter().any(|e| e.message.contains("end node")
                && e.message.contains("collect mechanism")
                && e.node_id.as_deref() == Some("m")),
            "expected map-branch-is-end error: {:?}",
            result.errors
        );
    }

    /// Maps cannot be nested: a map's branch may not itself be a map node.
    #[test]
    fn map_branch_cannot_be_another_map() {
        let outer = map_node_basic("outer", "inner", Some("end"));
        let inner = map_node_basic("inner", "end", Some("end"));
        let graph = graph_with(
            vec![("outer", outer), ("inner", inner), ("end", end_node("end"))],
            "outer",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("itself a map node")
                    && e.node_id.as_deref() == Some("outer")),
            "expected nested-map error: {:?}",
            result.errors
        );
    }

    /// Declaring a `next` on a map branch is an error reported on the branch
    /// node (the message describes branches as atomic).
    #[test]
    fn map_branch_cannot_have_next_declared() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = llm_with_state_updates("br", &[("output", "{{output}}")], Some("somewhere"));
        let graph = graph_with(
            vec![
                ("m", map),
                ("br", branch),
                ("somewhere", end_node("somewhere")),
                ("end", end_node("end")),
            ],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("has a `next` declared")
                    && e.message.contains("atomic")
                    && e.node_id.as_deref() == Some("br")),
            "expected branch-has-next error: {:?}",
            result.errors
        );
    }

    /// A map branch that writes only the map's `output_key` (`output`) is
    /// accepted.
    #[test]
    fn map_branch_state_updates_matching_output_key_passes() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        // NOTE(review): the paired failure test pins "writes state key '…'"
        // rather than this phrase; confirm the wording actually emitted by
        // `validate_map_branches`, or this check may pass vacuously.
        assert!(
            !result.errors.iter().any(|e| e
                .message
                .contains("map branches may only write through their `output_key`")),
            "valid map branch should not error on writes: {:?}",
            result.errors
        );
    }

    /// A map branch writing any key other than the map's `output_key` is an
    /// error on the branch naming both keys.
    #[test]
    fn map_branch_state_updates_wrong_key_errors() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = llm_with_state_updates("br", &[("not_output", "{{output}}")], None);
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("writes state key 'not_output'")
                    && e.message.contains("'output'")
                    && e.node_id.as_deref() == Some("br")),
            "expected wrong-key error: {:?}",
            result.errors
        );
    }

    /// A map branch may not declare an `output_schema` with top-level
    /// properties.
    #[test]
    fn map_branch_with_output_schema_errors() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = llm_with_output_schema("br", &["foo", "bar"], None);
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("output_schema")
                    && e.message.contains("top-level properties")
                    && e.node_id.as_deref() == Some("br")),
            "expected output_schema-forbidden error: {:?}",
            result.errors
        );
    }

    /// A script node in a parallel branch without `state_updates` is an error
    /// on that script node.
    #[test]
    fn script_in_fan_out_without_state_updates_errors() {
        let a = script_node("worker_a", "Cargo.toml", None);
        let b = end_node("worker_b");
        let graph = fan_out_graph_with_two_workers(a, b);
        let result = validator().validate(&graph);
        assert!(
            result.errors.iter().any(
                |e| e.message.contains("script node is in a parallel branch")
                    && e.message.contains("no `state_updates`")
                    && e.node_id.as_deref() == Some("worker_a")
            ),
            "expected script-no-state-updates-in-parallel error: {:?}",
            result.errors
        );
    }

    /// The same parallel script passes once it declares `state_updates`.
    #[test]
    fn script_in_fan_out_with_state_updates_passes() {
        let a = script_with_state_updates("worker_a", &[("result_a", "{{output.x}}")]);
        let b = end_node("worker_b");
        let graph = fan_out_graph_with_two_workers(a, b);
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("script node is in a parallel branch")),
            "script with state_updates should pass C.6: {:?}",
            result.errors
        );
    }

    /// Outside a fan-out, a script without `state_updates` raises no
    /// parallel-branch error.
    #[test]
    fn script_outside_fan_out_without_state_updates_passes() {
        let mut start = end_node("start");
        start.next = Some("s".into());
        let s = script_node("s", "Cargo.toml", None);
        let graph = graph_with(
            vec![("start", start), ("s", s), ("end", end_node("end"))],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("parallel branch")),
            "script outside fan-out should not trigger C.6: {:?}",
            result.errors
        );
    }

    /// A graph-level `settings.max_concurrency` of 0 is rejected.
    #[test]
    fn settings_max_concurrency_zero_errors() {
        let mut graph = graph_with(vec![("e", end_node("e"))], "e");
        graph.settings.max_concurrency = 0;
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("settings.max_concurrency must be >= 1")),
            "expected graph-level max_concurrency=0 error: {:?}",
            result.errors
        );
    }

    /// The settings produced by `graph_with` raise no `max_concurrency` error.
    #[test]
    fn settings_max_concurrency_default_is_valid() {
        let graph = graph_with(vec![("e", end_node("e"))], "e");
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("max_concurrency")),
            "default max_concurrency should not error: {:?}",
            result.errors
        );
    }

    /// A map node with `max_concurrency: Some(0)` is rejected on that node.
    #[test]
    fn map_max_concurrency_zero_errors() {
        let mut map = map_node_basic("m", "br", Some("end"));
        if let NodeType::Map(ref mut mm) = map.node_type {
            mm.max_concurrency = Some(0);
        }
        let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            result.errors.iter().any(|e| e
                .message
                .contains("map node's `max_concurrency` must be >= 1")
                && e.node_id.as_deref() == Some("m")),
            "expected map max_concurrency=0 error: {:?}",
            result.errors
        );
    }

    /// A map node without `max_concurrency` raises no such error.
    #[test]
    fn map_max_concurrency_none_is_valid() {
        let map = map_node_basic("m", "br", Some("end"));
        let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
        let graph = graph_with(
            vec![("m", map), ("br", branch), ("end", end_node("end"))],
            "m",
        );
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("max_concurrency")),
            "map without max_concurrency should not error: {:?}",
            result.errors
        );
    }

    /// Builds an LLM node (via `llm_node`) with its `prompt` set to `prompt`.
    fn llm_with_prompt(id: &str, prompt: &str, next: Option<&str>) -> Node {
        let mut node = llm_node(id, None, next);
        if let NodeType::Llm(ref mut n) = node.node_type {
            n.prompt = prompt.into();
        }
        node
    }

    /// Reading a key that a sibling parallel branch writes is an error naming
    /// the key and the sibling writer.
    #[test]
    fn parallel_read_of_sibling_write_errors() {
        let reader = llm_with_prompt("worker_a", "Hello {{summary}}!", Some("end"));
        let writer = llm_with_state_updates("worker_b", &[("summary", "static")], Some("end"));
        let graph = fan_out_graph_with_two_workers(reader, writer);
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("reads state key(s) `summary`")
                    && e.message.contains("'worker_b'")),
            "expected cross-branch read error mentioning `summary` and sibling writer: {:?}",
            result.errors
        );
    }

    /// Two branches that only read a key no branch writes are accepted.
    #[test]
    fn parallel_read_of_upstream_key_passes() {
        let reader_a = llm_with_prompt("worker_a", "Topic is {{topic}}", Some("end"));
        let reader_b = llm_with_prompt("worker_b", "Also {{topic}}", Some("end"));
        let graph = fan_out_graph_with_two_workers(reader_a, reader_b);
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("reads state key")),
            "upstream `topic` shouldn't trigger cross-branch read error: {:?}",
            result.errors
        );
    }

    /// `{{output}}` inside a `state_updates` value is scoped to the node
    /// itself and must not count as reading the sibling-written `output` key.
    #[test]
    fn scoped_output_var_in_state_updates_not_treated_as_read() {
        let scoped_user =
            llm_with_state_updates("worker_a", &[("a_key", "{{output}}")], Some("end"));
        let writes_output =
            llm_with_state_updates("worker_b", &[("output", "{{output}}")], Some("end"));
        let graph = fan_out_graph_with_two_workers(scoped_user, writes_output);
        let result = validator().validate(&graph);
        assert!(
            !result
                .errors
                .iter()
                .any(|e| e.message.contains("reads state key(s) `output`")
                    && e.message.contains("worker_a")),
            "scoped `{{{{output}}}}` inside state_updates value should NOT be treated as a read: {:?}",
            result.errors
        );
    }

    /// A RAG node's `query` template that references a key written by a
    /// sibling script branch is a cross-branch read error.
    #[test]
    fn rag_query_reading_sibling_script_write_errors() {
        let mut rag = rag_node("worker_a", &["./k"], true);
        if let NodeType::Rag(ref mut n) = rag.node_type {
            n.query = Some("codes: {{loinc_codes}}\n{{db_result}}".into());
            if let Some(m) = n.state_updates.as_mut() {
                m.insert("rag_ctx".into(), "{{output.context}}".into());
            }
        }
        rag.next = Some("end".into());
        let mut script = script_with_state_updates("worker_b", &[("db_result", "{{output}}")]);
        script.next = Some("end".into());
        let graph = fan_out_graph_with_two_workers(rag, script);
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("reads state key(s) `db_result`")
                    && e.message.contains("'worker_b'")),
            "expected cross-branch read error for rag query reading db_result: {:?}",
            result.errors
        );
    }

    /// A map node's `over` template counts as a read: it conflicts with a
    /// sibling branch (`producer`) that writes `items`.
    #[test]
    fn map_over_reading_sibling_write_errors() {
        let map_n = Node {
            id: "fan".into(),
            description: String::new(),
            node_type: NodeType::Map(MapNode {
                over: "{{items}}".into(),
                as_name: "item".into(),
                branch: "branch_n".into(),
                output_key: "output".into(),
                collect_into: "results".into(),
                max_concurrency: None,
            }),
            next: Some("end".into()),
        };
        let branch_n = llm_with_prompt("branch_n", "Process {{item}}", None);
        let producer = llm_with_state_updates("producer", &[("items", "[1,2,3]")], Some("end"));
        let mut start = end_node("start");
        start.next = Some(NextTargets::Many(vec!["fan".into(), "producer".into()]));
        let graph = graph_with(
            vec![
                ("start", start),
                ("fan", map_n),
                ("branch_n", branch_n),
                ("producer", producer),
                ("end", end_node("end")),
            ],
            "start",
        );
        let result = validator().validate(&graph);
        assert!(
            result
                .errors
                .iter()
                .any(|e| e.message.contains("reads state key(s) `items`")
                    && e.message.contains("'producer'")),
            "expected cross-branch read error for map `over` reading sibling write: {:?}",
            result.errors
        );
    }
}