feat: validation support for parallel graph execution; restricted map nodes to only run for nodes without next targets and not supporting chained map nodes
This commit is contained in:
+908
-1
@@ -2,7 +2,7 @@ use super::types::{Graph, Node, NodeType};
|
||||
use crate::client::{Model, ModelType};
|
||||
use crate::config::{Agent, AppConfig, paths};
|
||||
use anyhow::{Result, bail};
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::collections::{BTreeMap, HashSet, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -118,6 +118,10 @@ impl GraphValidator {
|
||||
self.validate_approval_routes(graph, &mut result);
|
||||
self.validate_rag_nodes(graph, &mut result);
|
||||
self.validate_llm_nodes(graph, &mut result);
|
||||
self.validate_max_concurrency(graph, &mut result);
|
||||
self.validate_map_branches(graph, &mut result);
|
||||
self.validate_parallel_user_interaction(graph, &mut result);
|
||||
self.validate_parallel_writes(graph, &mut result);
|
||||
|
||||
result
|
||||
}
|
||||
@@ -314,6 +318,227 @@ impl GraphValidator {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Phase C — Parallel-execution validation.
|
||||
//
|
||||
// The v1 algorithm uses immediate-successor analysis only: a parallel
|
||||
// group is the set of `next:` targets of a single fan-out node. Map nodes
|
||||
// are checked separately by `validate_map_branches` (the branch is
|
||||
// self-parallel, but enforcement comes from strict-mode rules on the
|
||||
// branch node, not from group membership). Transitive parallel groups
|
||||
// (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather
|
||||
// than under-reports — a false positive forces an unneeded reducer
|
||||
// (mild annoyance); a false negative allows silent data races (catastrophic).
|
||||
|
||||
fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||
if graph.settings.max_concurrency == 0 {
|
||||
result.error(ValidationError::new(
|
||||
"settings.max_concurrency must be >= 1 (got 0); a zero cap would \
|
||||
deadlock the executor",
|
||||
));
|
||||
}
|
||||
|
||||
for (node_id, node) in &graph.nodes {
|
||||
if let NodeType::Map(m) = &node.node_type
|
||||
&& let Some(0) = m.max_concurrency
|
||||
{
|
||||
result.error(ValidationError::with_node(
|
||||
node_id,
|
||||
"map node's `max_concurrency` must be >= 1 (got 0); a zero cap \
|
||||
would deadlock the executor",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_map_branches(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||
for (map_id, node) in &graph.nodes {
|
||||
let NodeType::Map(m) = &node.node_type else {
|
||||
continue;
|
||||
};
|
||||
let Some(branch) = graph.get_node(&m.branch) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match &branch.node_type {
|
||||
NodeType::Approval(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
map_id,
|
||||
format!(
|
||||
"map node points to branch '{}' which is an approval node; \
|
||||
approval/input nodes cannot run inside a parallel map branch \
|
||||
(the CLI would prompt the user N times concurrently)",
|
||||
m.branch
|
||||
),
|
||||
));
|
||||
continue;
|
||||
}
|
||||
NodeType::Input(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
map_id,
|
||||
format!(
|
||||
"map node points to branch '{}' which is an input node; \
|
||||
input nodes cannot run inside a parallel map branch",
|
||||
m.branch
|
||||
),
|
||||
));
|
||||
continue;
|
||||
}
|
||||
NodeType::End(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
map_id,
|
||||
format!(
|
||||
"map node points to branch '{}' which is an end node; \
|
||||
map branches terminate via the map's collect mechanism, \
|
||||
not via end nodes",
|
||||
m.branch
|
||||
),
|
||||
));
|
||||
continue;
|
||||
}
|
||||
NodeType::Map(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
map_id,
|
||||
format!(
|
||||
"map node points to branch '{}' which is itself a map node; \
|
||||
nested map fan-outs are not supported in v1",
|
||||
m.branch
|
||||
),
|
||||
));
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if branch.next.is_some() {
|
||||
result.error(ValidationError::with_node(
|
||||
m.branch.clone(),
|
||||
format!(
|
||||
"branch node '{}' has a `next` declared, but map branches must be \
|
||||
atomic (one node, one execution per item). Remove `next` or \
|
||||
restructure the workflow so any chaining happens after the map.",
|
||||
m.branch
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(updates) = node_state_updates_keys(branch) {
|
||||
for k in &updates {
|
||||
if k != &m.output_key {
|
||||
result.error(ValidationError::with_node(
|
||||
m.branch.clone(),
|
||||
format!(
|
||||
"branch node '{}' writes state key '{}' via state_updates, \
|
||||
but map branches may only write through their `output_key` \
|
||||
('{}'). Rename the write, or move the side effect outside \
|
||||
the map.",
|
||||
m.branch, k, m.output_key
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let schema_keys = output_schema_top_level_keys(branch);
|
||||
if !schema_keys.is_empty() {
|
||||
let mut keys_sorted: Vec<String> = schema_keys.into_iter().collect();
|
||||
keys_sorted.sort();
|
||||
result.error(ValidationError::with_node(
|
||||
m.branch.clone(),
|
||||
format!(
|
||||
"branch node '{}' has an `output_schema` with top-level \
|
||||
properties ({}); map branches must write only through their \
|
||||
`output_key` ('{}'). Remove `output_schema`, or use state_updates \
|
||||
to map the output explicitly.",
|
||||
m.branch,
|
||||
keys_sorted.join(", "),
|
||||
m.output_key
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_parallel_user_interaction(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||
for group in compute_parallel_groups(graph) {
|
||||
for node_id in &group {
|
||||
let Some(node) = graph.get_node(node_id) else {
|
||||
continue;
|
||||
};
|
||||
match &node.node_type {
|
||||
NodeType::Approval(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
node_id,
|
||||
"approval node is an immediate target of a fan-out \
|
||||
(`next: [...]`); approvals must run after the join, \
|
||||
not inside a parallel branch",
|
||||
));
|
||||
}
|
||||
NodeType::Input(_) => {
|
||||
result.error(ValidationError::with_node(
|
||||
node_id,
|
||||
"input node is an immediate target of a fan-out \
|
||||
(`next: [...]`); input nodes must run after the join, \
|
||||
not inside a parallel branch",
|
||||
));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_parallel_writes(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||
for group in compute_parallel_groups(graph) {
|
||||
let mut node_writes: Vec<(String, HashSet<String>)> = Vec::new();
|
||||
for node_id in &group {
|
||||
let Some(node) = graph.get_node(node_id) else {
|
||||
continue;
|
||||
};
|
||||
match write_set_of(node) {
|
||||
Some(set) => node_writes.push((node_id.clone(), set)),
|
||||
None => {
|
||||
result.error(ValidationError::with_node(
|
||||
node_id,
|
||||
"script node is in a parallel branch but declares no \
|
||||
`state_updates`; parallel scripts must declare their writes \
|
||||
explicitly to avoid silent state collisions",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut writers_by_key: BTreeMap<String, Vec<String>> = BTreeMap::new();
|
||||
for (nid, ws) in &node_writes {
|
||||
for k in ws {
|
||||
writers_by_key
|
||||
.entry(k.clone())
|
||||
.or_default()
|
||||
.push(nid.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for (key, mut writers) in writers_by_key {
|
||||
if writers.len() < 2 {
|
||||
continue;
|
||||
}
|
||||
if graph.reducers.contains_key(&key) {
|
||||
continue;
|
||||
}
|
||||
writers.sort();
|
||||
result.error(ValidationError::new(format!(
|
||||
"nodes [{}] all write key '{}' in the same parallel super-step but \
|
||||
no reducer is declared for '{}'. Add `reducers: {{ {}: <reducer> }}` \
|
||||
at the graph root (built-ins: append, extend, concat, sum, max, min, \
|
||||
merge, overwrite), or rename one node's output.",
|
||||
writers.join(", "),
|
||||
key,
|
||||
key,
|
||||
key,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
|
||||
@@ -381,6 +606,75 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet<String> {
|
||||
reachable
|
||||
}
|
||||
|
||||
// v1 parallel-group detection: only the immediate `next` targets of a fan-out
|
||||
// node count as a parallel group. Map branches are handled separately by
|
||||
// `validate_map_branches` (the branch's self-parallelism is checked via
|
||||
// strict-mode rules on the branch node itself, not via group membership).
|
||||
//
|
||||
// Returns one HashSet per fan-out source; deeper transitive parallelism is
|
||||
// intentionally out of scope for v1.
|
||||
fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
|
||||
let mut groups = Vec::new();
|
||||
for node in graph.nodes.values() {
|
||||
if let Some(targets) = &node.next
|
||||
&& targets.is_fan_out()
|
||||
{
|
||||
groups.push(targets.as_slice().iter().cloned().collect());
|
||||
}
|
||||
}
|
||||
groups
|
||||
}
|
||||
|
||||
// Computes the set of state keys this node can write to.
|
||||
//
|
||||
// Sources considered:
|
||||
// - `state_updates` keys (every node type that has them)
|
||||
// - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge)
|
||||
//
|
||||
// Returns `None` only for script nodes with no declared `state_updates` — their
|
||||
// emitted JSON is opaque to static analysis. The validator treats `None` as a
|
||||
// load-time error when the script appears in a parallel group (C.6).
|
||||
fn write_set_of(node: &Node) -> Option<HashSet<String>> {
|
||||
if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() {
|
||||
return None;
|
||||
}
|
||||
let mut writes = HashSet::new();
|
||||
if let Some(keys) = node_state_updates_keys(node) {
|
||||
writes.extend(keys);
|
||||
}
|
||||
writes.extend(output_schema_top_level_keys(node));
|
||||
Some(writes)
|
||||
}
|
||||
|
||||
fn node_state_updates_keys(node: &Node) -> Option<HashSet<String>> {
|
||||
let updates = match &node.node_type {
|
||||
NodeType::Agent(n) => n.state_updates.as_ref(),
|
||||
NodeType::Script(n) => n.state_updates.as_ref(),
|
||||
NodeType::Approval(n) => n.state_updates.as_ref(),
|
||||
NodeType::Input(n) => n.state_updates.as_ref(),
|
||||
NodeType::Llm(n) => n.state_updates.as_ref(),
|
||||
NodeType::Rag(n) => n.state_updates.as_ref(),
|
||||
NodeType::End(n) => n.state_updates.as_ref(),
|
||||
NodeType::Map(_) => return None,
|
||||
};
|
||||
updates.map(|m| m.keys().cloned().collect())
|
||||
}
|
||||
|
||||
fn output_schema_top_level_keys(node: &Node) -> HashSet<String> {
|
||||
let schema = match &node.node_type {
|
||||
NodeType::Agent(n) => n.output_schema.as_ref(),
|
||||
NodeType::Llm(n) => n.output_schema.as_ref(),
|
||||
_ => return HashSet::new(),
|
||||
};
|
||||
let Some(schema) = schema else {
|
||||
return HashSet::new();
|
||||
};
|
||||
let Some(properties) = schema.get("properties").and_then(|p| p.as_object()) else {
|
||||
return HashSet::new();
|
||||
};
|
||||
properties.keys().cloned().collect()
|
||||
}
|
||||
|
||||
fn detect_cycle_dfs(
|
||||
graph: &Graph,
|
||||
node_id: &str,
|
||||
@@ -1157,4 +1451,617 @@ mod tests {
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
fn map_node_basic(id: &str, branch: &str, next: Option<&str>) -> Node {
|
||||
Node {
|
||||
id: id.into(),
|
||||
description: String::new(),
|
||||
node_type: NodeType::Map(MapNode {
|
||||
over: "{{items}}".into(),
|
||||
as_name: "item".into(),
|
||||
branch: branch.into(),
|
||||
output_key: "output".into(),
|
||||
collect_into: "results".into(),
|
||||
max_concurrency: None,
|
||||
}),
|
||||
next: next.map(NextTargets::from),
|
||||
}
|
||||
}
|
||||
|
||||
fn llm_with_state_updates(id: &str, updates: &[(&str, &str)], next: Option<&str>) -> Node {
|
||||
let mut node = llm_node(id, None, next);
|
||||
if let NodeType::Llm(ref mut n) = node.node_type {
|
||||
let mut map: HashMap<String, String> = HashMap::new();
|
||||
for (k, v) in updates {
|
||||
map.insert((*k).into(), (*v).into());
|
||||
}
|
||||
n.state_updates = Some(map);
|
||||
}
|
||||
node
|
||||
}
|
||||
|
||||
fn llm_with_output_schema(id: &str, properties: &[&str], next: Option<&str>) -> Node {
|
||||
let mut node = llm_node(id, None, next);
|
||||
if let NodeType::Llm(ref mut n) = node.node_type {
|
||||
let mut props = serde_json::Map::new();
|
||||
for k in properties {
|
||||
props.insert((*k).to_string(), serde_json::json!({ "type": "string" }));
|
||||
}
|
||||
n.output_schema = Some(serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": props,
|
||||
}));
|
||||
}
|
||||
node
|
||||
}
|
||||
|
||||
fn script_with_state_updates(id: &str, updates: &[(&str, &str)]) -> Node {
|
||||
let mut node = script_node(id, "Cargo.toml", None);
|
||||
if let NodeType::Script(ref mut n) = node.node_type {
|
||||
let mut map: HashMap<String, String> = HashMap::new();
|
||||
for (k, v) in updates {
|
||||
map.insert((*k).into(), (*v).into());
|
||||
}
|
||||
n.state_updates = Some(map);
|
||||
}
|
||||
node
|
||||
}
|
||||
|
||||
fn fan_out_graph_with_two_workers(worker_a: Node, worker_b: Node) -> Graph {
|
||||
let mut start = end_node("start");
|
||||
start.next = Some(NextTargets::Many(vec![
|
||||
"worker_a".into(),
|
||||
"worker_b".into(),
|
||||
]));
|
||||
graph_with(
|
||||
vec![
|
||||
("start", start),
|
||||
("worker_a", worker_a),
|
||||
("worker_b", worker_b),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"start",
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parallel_writes_to_same_key_without_reducer_errors() {
|
||||
let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
|
||||
let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
|
||||
let graph = fan_out_graph_with_two_workers(a, b);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result.errors.iter().any(|e| e
|
||||
.message
|
||||
.contains("nodes [worker_a, worker_b] all write key 'summary'")),
|
||||
"expected reducer-collision error for `summary`: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parallel_writes_to_same_key_with_reducer_pass() {
|
||||
let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
|
||||
let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
|
||||
let mut graph = fan_out_graph_with_two_workers(a, b);
|
||||
graph.reducers.insert("summary".into(), Reducer::Concat);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("no reducer is declared")),
|
||||
"expected no reducer-collision error when reducer is declared: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parallel_writes_to_disjoint_keys_pass() {
|
||||
let a = llm_with_state_updates("worker_a", &[("a_out", "{{output}}")], Some("end"));
|
||||
let b = llm_with_state_updates("worker_b", &[("b_out", "{{output}}")], Some("end"));
|
||||
let graph = fan_out_graph_with_two_workers(a, b);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("no reducer is declared")),
|
||||
"expected no collision for disjoint keys: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_schema_top_level_keys_count_as_parallel_writes() {
|
||||
let a = llm_with_output_schema("worker_a", &["summary"], Some("end"));
|
||||
let b = llm_with_output_schema("worker_b", &["summary"], Some("end"));
|
||||
let graph = fan_out_graph_with_two_workers(a, b);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result.errors.iter().any(|e| e
|
||||
.message
|
||||
.contains("nodes [worker_a, worker_b] all write key 'summary'")),
|
||||
"output_schema top-level keys should count as writes: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn three_parallel_writers_collision_lists_all_writers() {
|
||||
let mut start = end_node("start");
|
||||
start.next = Some(NextTargets::Many(vec!["a".into(), "b".into(), "c".into()]));
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("start", start),
|
||||
(
|
||||
"a",
|
||||
llm_with_state_updates("a", &[("k", "{{output}}")], Some("end")),
|
||||
),
|
||||
(
|
||||
"b",
|
||||
llm_with_state_updates("b", &[("k", "{{output}}")], Some("end")),
|
||||
),
|
||||
(
|
||||
"c",
|
||||
llm_with_state_updates("c", &[("k", "{{output}}")], Some("end")),
|
||||
),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"start",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("nodes [a, b, c]") && e.message.contains("'k'")),
|
||||
"expected error listing all three writers a, b, c: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn approval_node_as_immediate_fan_out_target_errors() {
|
||||
let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
|
||||
let other = end_node("other");
|
||||
let mut start = end_node("start");
|
||||
start.next = Some(NextTargets::Many(vec!["ap".into(), "other".into()]));
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("start", start),
|
||||
("ap", approval),
|
||||
("other", other),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"start",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("approval node")
|
||||
&& e.message.contains("fan-out")
|
||||
&& e.node_id.as_deref() == Some("ap")),
|
||||
"expected approval-in-fan-out error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn input_node_as_immediate_fan_out_target_errors() {
|
||||
let input = Node {
|
||||
id: "in".into(),
|
||||
description: String::new(),
|
||||
node_type: NodeType::Input(InputNode {
|
||||
question: "?".into(),
|
||||
default: None,
|
||||
validation: None,
|
||||
state_updates: None,
|
||||
}),
|
||||
next: Some("end".into()),
|
||||
};
|
||||
let other = end_node("other");
|
||||
let mut start = end_node("start");
|
||||
start.next = Some(NextTargets::Many(vec!["in".into(), "other".into()]));
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("start", start),
|
||||
("in", input),
|
||||
("other", other),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"start",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("input node")
|
||||
&& e.message.contains("fan-out")
|
||||
&& e.node_id.as_deref() == Some("in")),
|
||||
"expected input-in-fan-out error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn approval_after_join_passes() {
|
||||
let mut start = end_node("start");
|
||||
start.next = Some(NextTargets::Many(vec!["a".into(), "b".into()]));
|
||||
let mut a = end_node("a");
|
||||
a.next = Some("ap".into());
|
||||
let mut b = end_node("b");
|
||||
b.next = Some("ap".into());
|
||||
let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("start", start),
|
||||
("a", a),
|
||||
("b", b),
|
||||
("ap", approval),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"start",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("fan-out") && e.node_id.as_deref() == Some("ap")),
|
||||
"approval AFTER join should be fine (v1 only checks immediate successors): {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_cannot_be_approval() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = approval_node("br", &["yes"], &[("yes", "end")], "end");
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("approval node")
|
||||
&& e.message.contains("map branch")
|
||||
&& e.node_id.as_deref() == Some("m")),
|
||||
"expected map-branch-is-approval error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_cannot_be_input() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = Node {
|
||||
id: "br".into(),
|
||||
description: String::new(),
|
||||
node_type: NodeType::Input(InputNode {
|
||||
question: "?".into(),
|
||||
default: None,
|
||||
validation: None,
|
||||
state_updates: None,
|
||||
}),
|
||||
next: Some("end".into()),
|
||||
};
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("input node")
|
||||
&& e.message.contains("map branch")
|
||||
&& e.node_id.as_deref() == Some("m")),
|
||||
"expected map-branch-is-input error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_cannot_be_end() {
|
||||
let map = map_node_basic("m", "br", Some("done"));
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("m", map),
|
||||
("br", end_node("br")),
|
||||
("done", end_node("done")),
|
||||
],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result.errors.iter().any(|e| e.message.contains("end node")
|
||||
&& e.message.contains("collect mechanism")
|
||||
&& e.node_id.as_deref() == Some("m")),
|
||||
"expected map-branch-is-end error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_cannot_be_another_map() {
|
||||
let outer = map_node_basic("outer", "inner", Some("end"));
|
||||
let inner = map_node_basic("inner", "end", Some("end"));
|
||||
let graph = graph_with(
|
||||
vec![("outer", outer), ("inner", inner), ("end", end_node("end"))],
|
||||
"outer",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("itself a map node")
|
||||
&& e.node_id.as_deref() == Some("outer")),
|
||||
"expected nested-map error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_cannot_have_next_declared() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], Some("somewhere"));
|
||||
let graph = graph_with(
|
||||
vec![
|
||||
("m", map),
|
||||
("br", branch),
|
||||
("somewhere", end_node("somewhere")),
|
||||
("end", end_node("end")),
|
||||
],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("has a `next` declared")
|
||||
&& e.message.contains("atomic")
|
||||
&& e.node_id.as_deref() == Some("br")),
|
||||
"expected branch-has-next error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_state_updates_matching_output_key_passes() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result.errors.iter().any(|e| e
|
||||
.message
|
||||
.contains("map branches may only write through their `output_key`")),
|
||||
"valid map branch should not error on writes: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_state_updates_wrong_key_errors() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = llm_with_state_updates("br", &[("not_output", "{{output}}")], None);
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("writes state key 'not_output'")
|
||||
&& e.message.contains("'output'")
|
||||
&& e.node_id.as_deref() == Some("br")),
|
||||
"expected wrong-key error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_branch_with_output_schema_errors() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = llm_with_output_schema("br", &["foo", "bar"], None);
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("output_schema")
|
||||
&& e.message.contains("top-level properties")
|
||||
&& e.node_id.as_deref() == Some("br")),
|
||||
"expected output_schema-forbidden error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn script_in_fan_out_without_state_updates_errors() {
|
||||
let a = script_node("worker_a", "Cargo.toml", None);
|
||||
let b = end_node("worker_b");
|
||||
let graph = fan_out_graph_with_two_workers(a, b);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result.errors.iter().any(
|
||||
|e| e.message.contains("script node is in a parallel branch")
|
||||
&& e.message.contains("no `state_updates`")
|
||||
&& e.node_id.as_deref() == Some("worker_a")
|
||||
),
|
||||
"expected script-no-state-updates-in-parallel error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn script_in_fan_out_with_state_updates_passes() {
|
||||
let a = script_with_state_updates("worker_a", &[("result_a", "{{output.x}}")]);
|
||||
let b = end_node("worker_b");
|
||||
let graph = fan_out_graph_with_two_workers(a, b);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("script node is in a parallel branch")),
|
||||
"script with state_updates should pass C.6: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn script_outside_fan_out_without_state_updates_passes() {
|
||||
let mut start = end_node("start");
|
||||
start.next = Some("s".into());
|
||||
let s = script_node("s", "Cargo.toml", None);
|
||||
let graph = graph_with(
|
||||
vec![("start", start), ("s", s), ("end", end_node("end"))],
|
||||
"start",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("parallel branch")),
|
||||
"script outside fan-out should not trigger C.6: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn settings_max_concurrency_zero_errors() {
|
||||
let mut graph = graph_with(vec![("e", end_node("e"))], "e");
|
||||
graph.settings.max_concurrency = 0;
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("settings.max_concurrency must be >= 1")),
|
||||
"expected graph-level max_concurrency=0 error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn settings_max_concurrency_default_is_valid() {
|
||||
let graph = graph_with(vec![("e", end_node("e"))], "e");
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("max_concurrency")),
|
||||
"default max_concurrency should not error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_max_concurrency_zero_errors() {
|
||||
let mut map = map_node_basic("m", "br", Some("end"));
|
||||
if let NodeType::Map(ref mut mm) = map.node_type {
|
||||
mm.max_concurrency = Some(0);
|
||||
}
|
||||
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
result.errors.iter().any(|e| e
|
||||
.message
|
||||
.contains("map node's `max_concurrency` must be >= 1")
|
||||
&& e.node_id.as_deref() == Some("m")),
|
||||
"expected map max_concurrency=0 error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_max_concurrency_none_is_valid() {
|
||||
let map = map_node_basic("m", "br", Some("end"));
|
||||
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||
let graph = graph_with(
|
||||
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||
"m",
|
||||
);
|
||||
|
||||
let result = validator().validate(&graph);
|
||||
|
||||
assert!(
|
||||
!result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.message.contains("max_concurrency")),
|
||||
"map without max_concurrency should not error: {:?}",
|
||||
result.errors
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user