From a9f2a5edc2c08e0c0a55316789f0d83bdc3360bf Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Wed, 20 May 2026 12:50:29 -0600 Subject: [PATCH] feat: validation support for parallel graph execution; restricted map nodes to only run for nodes without next targets and not supporting chained map nodes --- src/graph/validator.rs | 909 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 908 insertions(+), 1 deletion(-) diff --git a/src/graph/validator.rs b/src/graph/validator.rs index 90beb96..8785b4a 100644 --- a/src/graph/validator.rs +++ b/src/graph/validator.rs @@ -2,7 +2,7 @@ use super::types::{Graph, Node, NodeType}; use crate::client::{Model, ModelType}; use crate::config::{Agent, AppConfig, paths}; use anyhow::{Result, bail}; -use std::collections::{HashSet, VecDeque}; +use std::collections::{BTreeMap, HashSet, VecDeque}; use std::path::PathBuf; use std::sync::Arc; @@ -118,6 +118,10 @@ impl GraphValidator { self.validate_approval_routes(graph, &mut result); self.validate_rag_nodes(graph, &mut result); self.validate_llm_nodes(graph, &mut result); + self.validate_max_concurrency(graph, &mut result); + self.validate_map_branches(graph, &mut result); + self.validate_parallel_user_interaction(graph, &mut result); + self.validate_parallel_writes(graph, &mut result); result } @@ -314,6 +318,227 @@ impl GraphValidator { } } } + + // Phase C — Parallel-execution validation. + // + // The v1 algorithm uses immediate-successor analysis only: a parallel + // group is the set of `next:` targets of a single fan-out node. Map nodes + // are checked separately by `validate_map_branches` (the branch is + // self-parallel, but enforcement comes from strict-mode rules on the + // branch node, not from group membership). Transitive parallel groups + // (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather + // than under-reports — a false positive forces an unneeded reducer + // (mild annoyance); a false negative allows silent data races (catastrophic). + + fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) { + if graph.settings.max_concurrency == 0 { + result.error(ValidationError::new( + "settings.max_concurrency must be >= 1 (got 0); a zero cap would \ + deadlock the executor", + )); + } + + for (node_id, node) in &graph.nodes { + if let NodeType::Map(m) = &node.node_type + && let Some(0) = m.max_concurrency + { + result.error(ValidationError::with_node( + node_id, + "map node's `max_concurrency` must be >= 1 (got 0); a zero cap \ + would deadlock the executor", + )); + } + } + } + + fn validate_map_branches(&self, graph: &Graph, result: &mut ValidationResult) { + for (map_id, node) in &graph.nodes { + let NodeType::Map(m) = &node.node_type else { + continue; + }; + let Some(branch) = graph.get_node(&m.branch) else { + continue; + }; + + match &branch.node_type { + NodeType::Approval(_) => { + result.error(ValidationError::with_node( + map_id, + format!( + "map node points to branch '{}' which is an approval node; \ + approval/input nodes cannot run inside a parallel map branch \ + (the CLI would prompt the user N times concurrently)", + m.branch + ), + )); + continue; + } + NodeType::Input(_) => { + result.error(ValidationError::with_node( + map_id, + format!( + "map node points to branch '{}' which is an input node; \ + input nodes cannot run inside a parallel map branch", + m.branch + ), + )); + continue; + } + NodeType::End(_) => { + result.error(ValidationError::with_node( + map_id, + format!( + "map node points to branch '{}' which is an end node; \ + map branches terminate via the map's collect mechanism, \ + not via end nodes", + m.branch + ), + )); + continue; + } + NodeType::Map(_) => { + result.error(ValidationError::with_node( + map_id, + format!( + "map node points to branch '{}' which is itself a map node; \ + nested map fan-outs are not supported in v1", + m.branch + ), + )); + continue; + } + _ => {} + } + + if branch.next.is_some() { + result.error(ValidationError::with_node( + m.branch.clone(), + format!( + "branch node '{}' has a `next` declared, but map branches must be \ + atomic (one node, one execution per item). Remove `next` or \ + restructure the workflow so any chaining happens after the map.", + m.branch + ), + )); + } + + if let Some(updates) = node_state_updates_keys(branch) { + for k in &updates { + if k != &m.output_key { + result.error(ValidationError::with_node( + m.branch.clone(), + format!( + "branch node '{}' writes state key '{}' via state_updates, \ + but map branches may only write through their `output_key` \ + ('{}'). Rename the write, or move the side effect outside \ + the map.", + m.branch, k, m.output_key + ), + )); + } + } + } + + let schema_keys = output_schema_top_level_keys(branch); + if !schema_keys.is_empty() { + let mut keys_sorted: Vec = schema_keys.into_iter().collect(); + keys_sorted.sort(); + result.error(ValidationError::with_node( + m.branch.clone(), + format!( + "branch node '{}' has an `output_schema` with top-level \ + properties ({}); map branches must write only through their \ + `output_key` ('{}'). Remove `output_schema`, or use state_updates \ + to map the output explicitly.", + m.branch, + keys_sorted.join(", "), + m.output_key + ), + )); + } + } + } + + fn validate_parallel_user_interaction(&self, graph: &Graph, result: &mut ValidationResult) { + for group in compute_parallel_groups(graph) { + for node_id in &group { + let Some(node) = graph.get_node(node_id) else { + continue; + }; + match &node.node_type { + NodeType::Approval(_) => { + result.error(ValidationError::with_node( + node_id, + "approval node is an immediate target of a fan-out \ + (`next: [...]`); approvals must run after the join, \ + not inside a parallel branch", + )); + } + NodeType::Input(_) => { + result.error(ValidationError::with_node( + node_id, + "input node is an immediate target of a fan-out \ + (`next: [...]`); input nodes must run after the join, \ + not inside a parallel branch", + )); + } + _ => {} + } + } + } + } + + fn validate_parallel_writes(&self, graph: &Graph, result: &mut ValidationResult) { + for group in compute_parallel_groups(graph) { + let mut node_writes: Vec<(String, HashSet)> = Vec::new(); + for node_id in &group { + let Some(node) = graph.get_node(node_id) else { + continue; + }; + match write_set_of(node) { + Some(set) => node_writes.push((node_id.clone(), set)), + None => { + result.error(ValidationError::with_node( + node_id, + "script node is in a parallel branch but declares no \ + `state_updates`; parallel scripts must declare their writes \ + explicitly to avoid silent state collisions", + )); + } + } + } + + let mut writers_by_key: BTreeMap> = BTreeMap::new(); + for (nid, ws) in &node_writes { + for k in ws { + writers_by_key + .entry(k.clone()) + .or_default() + .push(nid.clone()); + } + } + + for (key, mut writers) in writers_by_key { + if writers.len() < 2 { + continue; + } + if graph.reducers.contains_key(&key) { + continue; + } + writers.sort(); + result.error(ValidationError::new(format!( + "nodes [{}] all write key '{}' in the same parallel super-step but \ + no reducer is declared for '{}'. Add `reducers: {{ {}: }}` \ + at the graph root (built-ins: append, extend, concat, sum, max, min, \ + merge, overwrite), or rename one node's output.", + writers.join(", "), + key, + key, + key, + ))); + } + } + } } fn declared_targets(node: &Node) -> Vec<(String, &'static str)> { @@ -381,6 +606,75 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet { reachable } +// v1 parallel-group detection: only the immediate `next` targets of a fan-out +// node count as a parallel group. Map branches are handled separately by +// `validate_map_branches` (the branch's self-parallelism is checked via +// strict-mode rules on the branch node itself, not via group membership). +// +// Returns one HashSet per fan-out source; deeper transitive parallelism is +// intentionally out of scope for v1. +fn compute_parallel_groups(graph: &Graph) -> Vec> { + let mut groups = Vec::new(); + for node in graph.nodes.values() { + if let Some(targets) = &node.next + && targets.is_fan_out() + { + groups.push(targets.as_slice().iter().cloned().collect()); + } + } + groups +} + +// Computes the set of state keys this node can write to. +// +// Sources considered: +// - `state_updates` keys (every node type that has them) +// - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge) +// +// Returns `None` only for script nodes with no declared `state_updates` — their +// emitted JSON is opaque to static analysis. The validator treats `None` as a +// load-time error when the script appears in a parallel group (C.6). +fn write_set_of(node: &Node) -> Option> { + if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() { + return None; + } + let mut writes = HashSet::new(); + if let Some(keys) = node_state_updates_keys(node) { + writes.extend(keys); + } + writes.extend(output_schema_top_level_keys(node)); + Some(writes) +} + +fn node_state_updates_keys(node: &Node) -> Option> { + let updates = match &node.node_type { + NodeType::Agent(n) => n.state_updates.as_ref(), + NodeType::Script(n) => n.state_updates.as_ref(), + NodeType::Approval(n) => n.state_updates.as_ref(), + NodeType::Input(n) => n.state_updates.as_ref(), + NodeType::Llm(n) => n.state_updates.as_ref(), + NodeType::Rag(n) => n.state_updates.as_ref(), + NodeType::End(n) => n.state_updates.as_ref(), + NodeType::Map(_) => return None, + }; + updates.map(|m| m.keys().cloned().collect()) +} + +fn output_schema_top_level_keys(node: &Node) -> HashSet { + let schema = match &node.node_type { + NodeType::Agent(n) => n.output_schema.as_ref(), + NodeType::Llm(n) => n.output_schema.as_ref(), + _ => return HashSet::new(), + }; + let Some(schema) = schema else { + return HashSet::new(); + }; + let Some(properties) = schema.get("properties").and_then(|p| p.as_object()) else { + return HashSet::new(); + }; + properties.keys().cloned().collect() +} + fn detect_cycle_dfs( graph: &Graph, node_id: &str, @@ -1157,4 +1451,617 @@ mod tests { result.errors ); } + + fn map_node_basic(id: &str, branch: &str, next: Option<&str>) -> Node { + Node { + id: id.into(), + description: String::new(), + node_type: NodeType::Map(MapNode { + over: "{{items}}".into(), + as_name: "item".into(), + branch: branch.into(), + output_key: "output".into(), + collect_into: "results".into(), + max_concurrency: None, + }), + next: next.map(NextTargets::from), + } + } + + fn llm_with_state_updates(id: &str, updates: &[(&str, &str)], next: Option<&str>) -> Node { + let mut node = llm_node(id, None, next); + if let NodeType::Llm(ref mut n) = node.node_type { + let mut map: HashMap = HashMap::new(); + for (k, v) in updates { + map.insert((*k).into(), (*v).into()); + } + n.state_updates = Some(map); + } + node + } + + fn llm_with_output_schema(id: &str, properties: &[&str], next: Option<&str>) -> Node { + let mut node = llm_node(id, None, next); + if let NodeType::Llm(ref mut n) = node.node_type { + let mut props = serde_json::Map::new(); + for k in properties { + props.insert((*k).to_string(), serde_json::json!({ "type": "string" })); + } + n.output_schema = Some(serde_json::json!({ + "type": "object", + "properties": props, + })); + } + node + } + + fn script_with_state_updates(id: &str, updates: &[(&str, &str)]) -> Node { + let mut node = script_node(id, "Cargo.toml", None); + if let NodeType::Script(ref mut n) = node.node_type { + let mut map: HashMap = HashMap::new(); + for (k, v) in updates { + map.insert((*k).into(), (*v).into()); + } + n.state_updates = Some(map); + } + node + } + + fn fan_out_graph_with_two_workers(worker_a: Node, worker_b: Node) -> Graph { + let mut start = end_node("start"); + start.next = Some(NextTargets::Many(vec![ + "worker_a".into(), + "worker_b".into(), + ])); + graph_with( + vec![ + ("start", start), + ("worker_a", worker_a), + ("worker_b", worker_b), + ("end", end_node("end")), + ], + "start", + ) + } + + #[test] + fn parallel_writes_to_same_key_without_reducer_errors() { + let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end")); + let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end")); + let graph = fan_out_graph_with_two_workers(a, b); + + let result = validator().validate(&graph); + + assert!( + result.errors.iter().any(|e| e + .message + .contains("nodes [worker_a, worker_b] all write key 'summary'")), + "expected reducer-collision error for `summary`: {:?}", + result.errors + ); + } + + #[test] + fn parallel_writes_to_same_key_with_reducer_pass() { + let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end")); + let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end")); + let mut graph = fan_out_graph_with_two_workers(a, b); + graph.reducers.insert("summary".into(), Reducer::Concat); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("no reducer is declared")), + "expected no reducer-collision error when reducer is declared: {:?}", + result.errors + ); + } + + #[test] + fn parallel_writes_to_disjoint_keys_pass() { + let a = llm_with_state_updates("worker_a", &[("a_out", "{{output}}")], Some("end")); + let b = llm_with_state_updates("worker_b", &[("b_out", "{{output}}")], Some("end")); + let graph = fan_out_graph_with_two_workers(a, b); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("no reducer is declared")), + "expected no collision for disjoint keys: {:?}", + result.errors + ); + } + + #[test] + fn output_schema_top_level_keys_count_as_parallel_writes() { + let a = llm_with_output_schema("worker_a", &["summary"], Some("end")); + let b = llm_with_output_schema("worker_b", &["summary"], Some("end")); + let graph = fan_out_graph_with_two_workers(a, b); + + let result = validator().validate(&graph); + + assert!( + result.errors.iter().any(|e| e + .message + .contains("nodes [worker_a, worker_b] all write key 'summary'")), + "output_schema top-level keys should count as writes: {:?}", + result.errors + ); + } + + #[test] + fn three_parallel_writers_collision_lists_all_writers() { + let mut start = end_node("start"); + start.next = Some(NextTargets::Many(vec!["a".into(), "b".into(), "c".into()])); + let graph = graph_with( + vec![ + ("start", start), + ( + "a", + llm_with_state_updates("a", &[("k", "{{output}}")], Some("end")), + ), + ( + "b", + llm_with_state_updates("b", &[("k", "{{output}}")], Some("end")), + ), + ( + "c", + llm_with_state_updates("c", &[("k", "{{output}}")], Some("end")), + ), + ("end", end_node("end")), + ], + "start", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("nodes [a, b, c]") && e.message.contains("'k'")), + "expected error listing all three writers a, b, c: {:?}", + result.errors + ); + } + + #[test] + fn approval_node_as_immediate_fan_out_target_errors() { + let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end"); + let other = end_node("other"); + let mut start = end_node("start"); + start.next = Some(NextTargets::Many(vec!["ap".into(), "other".into()])); + let graph = graph_with( + vec![ + ("start", start), + ("ap", approval), + ("other", other), + ("end", end_node("end")), + ], + "start", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("approval node") + && e.message.contains("fan-out") + && e.node_id.as_deref() == Some("ap")), + "expected approval-in-fan-out error: {:?}", + result.errors + ); + } + + #[test] + fn input_node_as_immediate_fan_out_target_errors() { + let input = Node { + id: "in".into(), + description: String::new(), + node_type: NodeType::Input(InputNode { + question: "?".into(), + default: None, + validation: None, + state_updates: None, + }), + next: Some("end".into()), + }; + let other = end_node("other"); + let mut start = end_node("start"); + start.next = Some(NextTargets::Many(vec!["in".into(), "other".into()])); + let graph = graph_with( + vec![ + ("start", start), + ("in", input), + ("other", other), + ("end", end_node("end")), + ], + "start", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("input node") + && e.message.contains("fan-out") + && e.node_id.as_deref() == Some("in")), + "expected input-in-fan-out error: {:?}", + result.errors + ); + } + + #[test] + fn approval_after_join_passes() { + let mut start = end_node("start"); + start.next = Some(NextTargets::Many(vec!["a".into(), "b".into()])); + let mut a = end_node("a"); + a.next = Some("ap".into()); + let mut b = end_node("b"); + b.next = Some("ap".into()); + let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end"); + let graph = graph_with( + vec![ + ("start", start), + ("a", a), + ("b", b), + ("ap", approval), + ("end", end_node("end")), + ], + "start", + ); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("fan-out") && e.node_id.as_deref() == Some("ap")), + "approval AFTER join should be fine (v1 only checks immediate successors): {:?}", + result.errors + ); + } + + #[test] + fn map_branch_cannot_be_approval() { + let map = map_node_basic("m", "br", Some("end")); + let branch = approval_node("br", &["yes"], &[("yes", "end")], "end"); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("approval node") + && e.message.contains("map branch") + && e.node_id.as_deref() == Some("m")), + "expected map-branch-is-approval error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_cannot_be_input() { + let map = map_node_basic("m", "br", Some("end")); + let branch = Node { + id: "br".into(), + description: String::new(), + node_type: NodeType::Input(InputNode { + question: "?".into(), + default: None, + validation: None, + state_updates: None, + }), + next: Some("end".into()), + }; + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("input node") + && e.message.contains("map branch") + && e.node_id.as_deref() == Some("m")), + "expected map-branch-is-input error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_cannot_be_end() { + let map = map_node_basic("m", "br", Some("done")); + let graph = graph_with( + vec![ + ("m", map), + ("br", end_node("br")), + ("done", end_node("done")), + ], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result.errors.iter().any(|e| e.message.contains("end node") + && e.message.contains("collect mechanism") + && e.node_id.as_deref() == Some("m")), + "expected map-branch-is-end error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_cannot_be_another_map() { + let outer = map_node_basic("outer", "inner", Some("end")); + let inner = map_node_basic("inner", "end", Some("end")); + let graph = graph_with( + vec![("outer", outer), ("inner", inner), ("end", end_node("end"))], + "outer", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("itself a map node") + && e.node_id.as_deref() == Some("outer")), + "expected nested-map error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_cannot_have_next_declared() { + let map = map_node_basic("m", "br", Some("end")); + let branch = llm_with_state_updates("br", &[("output", "{{output}}")], Some("somewhere")); + let graph = graph_with( + vec![ + ("m", map), + ("br", branch), + ("somewhere", end_node("somewhere")), + ("end", end_node("end")), + ], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("has a `next` declared") + && e.message.contains("atomic") + && e.node_id.as_deref() == Some("br")), + "expected branch-has-next error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_state_updates_matching_output_key_passes() { + let map = map_node_basic("m", "br", Some("end")); + let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + !result.errors.iter().any(|e| e + .message + .contains("map branches may only write through their `output_key`")), + "valid map branch should not error on writes: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_state_updates_wrong_key_errors() { + let map = map_node_basic("m", "br", Some("end")); + let branch = llm_with_state_updates("br", &[("not_output", "{{output}}")], None); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("writes state key 'not_output'") + && e.message.contains("'output'") + && e.node_id.as_deref() == Some("br")), + "expected wrong-key error: {:?}", + result.errors + ); + } + + #[test] + fn map_branch_with_output_schema_errors() { + let map = map_node_basic("m", "br", Some("end")); + let branch = llm_with_output_schema("br", &["foo", "bar"], None); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("output_schema") + && e.message.contains("top-level properties") + && e.node_id.as_deref() == Some("br")), + "expected output_schema-forbidden error: {:?}", + result.errors + ); + } + + #[test] + fn script_in_fan_out_without_state_updates_errors() { + let a = script_node("worker_a", "Cargo.toml", None); + let b = end_node("worker_b"); + let graph = fan_out_graph_with_two_workers(a, b); + + let result = validator().validate(&graph); + + assert!( + result.errors.iter().any( + |e| e.message.contains("script node is in a parallel branch") + && e.message.contains("no `state_updates`") + && e.node_id.as_deref() == Some("worker_a") + ), + "expected script-no-state-updates-in-parallel error: {:?}", + result.errors + ); + } + + #[test] + fn script_in_fan_out_with_state_updates_passes() { + let a = script_with_state_updates("worker_a", &[("result_a", "{{output.x}}")]); + let b = end_node("worker_b"); + let graph = fan_out_graph_with_two_workers(a, b); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("script node is in a parallel branch")), + "script with state_updates should pass C.6: {:?}", + result.errors + ); + } + + #[test] + fn script_outside_fan_out_without_state_updates_passes() { + let mut start = end_node("start"); + start.next = Some("s".into()); + let s = script_node("s", "Cargo.toml", None); + let graph = graph_with( + vec![("start", start), ("s", s), ("end", end_node("end"))], + "start", + ); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("parallel branch")), + "script outside fan-out should not trigger C.6: {:?}", + result.errors + ); + } + + #[test] + fn settings_max_concurrency_zero_errors() { + let mut graph = graph_with(vec![("e", end_node("e"))], "e"); + graph.settings.max_concurrency = 0; + + let result = validator().validate(&graph); + + assert!( + result + .errors + .iter() + .any(|e| e.message.contains("settings.max_concurrency must be >= 1")), + "expected graph-level max_concurrency=0 error: {:?}", + result.errors + ); + } + + #[test] + fn settings_max_concurrency_default_is_valid() { + let graph = graph_with(vec![("e", end_node("e"))], "e"); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("max_concurrency")), + "default max_concurrency should not error: {:?}", + result.errors + ); + } + + #[test] + fn map_max_concurrency_zero_errors() { + let mut map = map_node_basic("m", "br", Some("end")); + if let NodeType::Map(ref mut mm) = map.node_type { + mm.max_concurrency = Some(0); + } + let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + result.errors.iter().any(|e| e + .message + .contains("map node's `max_concurrency` must be >= 1") + && e.node_id.as_deref() == Some("m")), + "expected map max_concurrency=0 error: {:?}", + result.errors + ); + } + + #[test] + fn map_max_concurrency_none_is_valid() { + let map = map_node_basic("m", "br", Some("end")); + let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None); + let graph = graph_with( + vec![("m", map), ("br", branch), ("end", end_node("end"))], + "m", + ); + + let result = validator().validate(&graph); + + assert!( + !result + .errors + .iter() + .any(|e| e.message.contains("max_concurrency")), + "map without max_concurrency should not error: {:?}", + result.errors + ); + } }