feat: validation support for parallel graph execution; restricted map nodes to only run for nodes without next targets and not supporting chained map nodes
This commit is contained in:
+908
-1
@@ -2,7 +2,7 @@ use super::types::{Graph, Node, NodeType};
|
|||||||
use crate::client::{Model, ModelType};
|
use crate::client::{Model, ModelType};
|
||||||
use crate::config::{Agent, AppConfig, paths};
|
use crate::config::{Agent, AppConfig, paths};
|
||||||
use anyhow::{Result, bail};
|
use anyhow::{Result, bail};
|
||||||
use std::collections::{HashSet, VecDeque};
|
use std::collections::{BTreeMap, HashSet, VecDeque};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -118,6 +118,10 @@ impl GraphValidator {
|
|||||||
self.validate_approval_routes(graph, &mut result);
|
self.validate_approval_routes(graph, &mut result);
|
||||||
self.validate_rag_nodes(graph, &mut result);
|
self.validate_rag_nodes(graph, &mut result);
|
||||||
self.validate_llm_nodes(graph, &mut result);
|
self.validate_llm_nodes(graph, &mut result);
|
||||||
|
self.validate_max_concurrency(graph, &mut result);
|
||||||
|
self.validate_map_branches(graph, &mut result);
|
||||||
|
self.validate_parallel_user_interaction(graph, &mut result);
|
||||||
|
self.validate_parallel_writes(graph, &mut result);
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
@@ -314,6 +318,227 @@ impl GraphValidator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase C — Parallel-execution validation.
|
||||||
|
//
|
||||||
|
// The v1 algorithm uses immediate-successor analysis only: a parallel
|
||||||
|
// group is the set of `next:` targets of a single fan-out node. Map nodes
|
||||||
|
// are checked separately by `validate_map_branches` (the branch is
|
||||||
|
// self-parallel, but enforcement comes from strict-mode rules on the
|
||||||
|
// branch node, not from group membership). Transitive parallel groups
|
||||||
|
// (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather
|
||||||
|
// than under-reports — a false positive forces an unneeded reducer
|
||||||
|
// (mild annoyance); a false negative allows silent data races (catastrophic).
|
||||||
|
|
||||||
|
fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||||
|
if graph.settings.max_concurrency == 0 {
|
||||||
|
result.error(ValidationError::new(
|
||||||
|
"settings.max_concurrency must be >= 1 (got 0); a zero cap would \
|
||||||
|
deadlock the executor",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (node_id, node) in &graph.nodes {
|
||||||
|
if let NodeType::Map(m) = &node.node_type
|
||||||
|
&& let Some(0) = m.max_concurrency
|
||||||
|
{
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
node_id,
|
||||||
|
"map node's `max_concurrency` must be >= 1 (got 0); a zero cap \
|
||||||
|
would deadlock the executor",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_map_branches(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||||
|
for (map_id, node) in &graph.nodes {
|
||||||
|
let NodeType::Map(m) = &node.node_type else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Some(branch) = graph.get_node(&m.branch) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
match &branch.node_type {
|
||||||
|
NodeType::Approval(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
map_id,
|
||||||
|
format!(
|
||||||
|
"map node points to branch '{}' which is an approval node; \
|
||||||
|
approval/input nodes cannot run inside a parallel map branch \
|
||||||
|
(the CLI would prompt the user N times concurrently)",
|
||||||
|
m.branch
|
||||||
|
),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
NodeType::Input(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
map_id,
|
||||||
|
format!(
|
||||||
|
"map node points to branch '{}' which is an input node; \
|
||||||
|
input nodes cannot run inside a parallel map branch",
|
||||||
|
m.branch
|
||||||
|
),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
NodeType::End(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
map_id,
|
||||||
|
format!(
|
||||||
|
"map node points to branch '{}' which is an end node; \
|
||||||
|
map branches terminate via the map's collect mechanism, \
|
||||||
|
not via end nodes",
|
||||||
|
m.branch
|
||||||
|
),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
NodeType::Map(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
map_id,
|
||||||
|
format!(
|
||||||
|
"map node points to branch '{}' which is itself a map node; \
|
||||||
|
nested map fan-outs are not supported in v1",
|
||||||
|
m.branch
|
||||||
|
),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if branch.next.is_some() {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
m.branch.clone(),
|
||||||
|
format!(
|
||||||
|
"branch node '{}' has a `next` declared, but map branches must be \
|
||||||
|
atomic (one node, one execution per item). Remove `next` or \
|
||||||
|
restructure the workflow so any chaining happens after the map.",
|
||||||
|
m.branch
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(updates) = node_state_updates_keys(branch) {
|
||||||
|
for k in &updates {
|
||||||
|
if k != &m.output_key {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
m.branch.clone(),
|
||||||
|
format!(
|
||||||
|
"branch node '{}' writes state key '{}' via state_updates, \
|
||||||
|
but map branches may only write through their `output_key` \
|
||||||
|
('{}'). Rename the write, or move the side effect outside \
|
||||||
|
the map.",
|
||||||
|
m.branch, k, m.output_key
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let schema_keys = output_schema_top_level_keys(branch);
|
||||||
|
if !schema_keys.is_empty() {
|
||||||
|
let mut keys_sorted: Vec<String> = schema_keys.into_iter().collect();
|
||||||
|
keys_sorted.sort();
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
m.branch.clone(),
|
||||||
|
format!(
|
||||||
|
"branch node '{}' has an `output_schema` with top-level \
|
||||||
|
properties ({}); map branches must write only through their \
|
||||||
|
`output_key` ('{}'). Remove `output_schema`, or use state_updates \
|
||||||
|
to map the output explicitly.",
|
||||||
|
m.branch,
|
||||||
|
keys_sorted.join(", "),
|
||||||
|
m.output_key
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_parallel_user_interaction(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||||
|
for group in compute_parallel_groups(graph) {
|
||||||
|
for node_id in &group {
|
||||||
|
let Some(node) = graph.get_node(node_id) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
match &node.node_type {
|
||||||
|
NodeType::Approval(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
node_id,
|
||||||
|
"approval node is an immediate target of a fan-out \
|
||||||
|
(`next: [...]`); approvals must run after the join, \
|
||||||
|
not inside a parallel branch",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
NodeType::Input(_) => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
node_id,
|
||||||
|
"input node is an immediate target of a fan-out \
|
||||||
|
(`next: [...]`); input nodes must run after the join, \
|
||||||
|
not inside a parallel branch",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_parallel_writes(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||||
|
for group in compute_parallel_groups(graph) {
|
||||||
|
let mut node_writes: Vec<(String, HashSet<String>)> = Vec::new();
|
||||||
|
for node_id in &group {
|
||||||
|
let Some(node) = graph.get_node(node_id) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
match write_set_of(node) {
|
||||||
|
Some(set) => node_writes.push((node_id.clone(), set)),
|
||||||
|
None => {
|
||||||
|
result.error(ValidationError::with_node(
|
||||||
|
node_id,
|
||||||
|
"script node is in a parallel branch but declares no \
|
||||||
|
`state_updates`; parallel scripts must declare their writes \
|
||||||
|
explicitly to avoid silent state collisions",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut writers_by_key: BTreeMap<String, Vec<String>> = BTreeMap::new();
|
||||||
|
for (nid, ws) in &node_writes {
|
||||||
|
for k in ws {
|
||||||
|
writers_by_key
|
||||||
|
.entry(k.clone())
|
||||||
|
.or_default()
|
||||||
|
.push(nid.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (key, mut writers) in writers_by_key {
|
||||||
|
if writers.len() < 2 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if graph.reducers.contains_key(&key) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
writers.sort();
|
||||||
|
result.error(ValidationError::new(format!(
|
||||||
|
"nodes [{}] all write key '{}' in the same parallel super-step but \
|
||||||
|
no reducer is declared for '{}'. Add `reducers: {{ {}: <reducer> }}` \
|
||||||
|
at the graph root (built-ins: append, extend, concat, sum, max, min, \
|
||||||
|
merge, overwrite), or rename one node's output.",
|
||||||
|
writers.join(", "),
|
||||||
|
key,
|
||||||
|
key,
|
||||||
|
key,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
|
fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
|
||||||
@@ -381,6 +606,75 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet<String> {
|
|||||||
reachable
|
reachable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v1 parallel-group detection: only the immediate `next` targets of a fan-out
|
||||||
|
// node count as a parallel group. Map branches are handled separately by
|
||||||
|
// `validate_map_branches` (the branch's self-parallelism is checked via
|
||||||
|
// strict-mode rules on the branch node itself, not via group membership).
|
||||||
|
//
|
||||||
|
// Returns one HashSet per fan-out source; deeper transitive parallelism is
|
||||||
|
// intentionally out of scope for v1.
|
||||||
|
fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
|
||||||
|
let mut groups = Vec::new();
|
||||||
|
for node in graph.nodes.values() {
|
||||||
|
if let Some(targets) = &node.next
|
||||||
|
&& targets.is_fan_out()
|
||||||
|
{
|
||||||
|
groups.push(targets.as_slice().iter().cloned().collect());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
groups
|
||||||
|
}
|
||||||
|
|
||||||
|
// Computes the set of state keys this node can write to.
|
||||||
|
//
|
||||||
|
// Sources considered:
|
||||||
|
// - `state_updates` keys (every node type that has them)
|
||||||
|
// - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge)
|
||||||
|
//
|
||||||
|
// Returns `None` only for script nodes with no declared `state_updates` — their
|
||||||
|
// emitted JSON is opaque to static analysis. The validator treats `None` as a
|
||||||
|
// load-time error when the script appears in a parallel group (C.6).
|
||||||
|
fn write_set_of(node: &Node) -> Option<HashSet<String>> {
|
||||||
|
if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let mut writes = HashSet::new();
|
||||||
|
if let Some(keys) = node_state_updates_keys(node) {
|
||||||
|
writes.extend(keys);
|
||||||
|
}
|
||||||
|
writes.extend(output_schema_top_level_keys(node));
|
||||||
|
Some(writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn node_state_updates_keys(node: &Node) -> Option<HashSet<String>> {
|
||||||
|
let updates = match &node.node_type {
|
||||||
|
NodeType::Agent(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Script(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Approval(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Input(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Llm(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Rag(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::End(n) => n.state_updates.as_ref(),
|
||||||
|
NodeType::Map(_) => return None,
|
||||||
|
};
|
||||||
|
updates.map(|m| m.keys().cloned().collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn output_schema_top_level_keys(node: &Node) -> HashSet<String> {
|
||||||
|
let schema = match &node.node_type {
|
||||||
|
NodeType::Agent(n) => n.output_schema.as_ref(),
|
||||||
|
NodeType::Llm(n) => n.output_schema.as_ref(),
|
||||||
|
_ => return HashSet::new(),
|
||||||
|
};
|
||||||
|
let Some(schema) = schema else {
|
||||||
|
return HashSet::new();
|
||||||
|
};
|
||||||
|
let Some(properties) = schema.get("properties").and_then(|p| p.as_object()) else {
|
||||||
|
return HashSet::new();
|
||||||
|
};
|
||||||
|
properties.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
fn detect_cycle_dfs(
|
fn detect_cycle_dfs(
|
||||||
graph: &Graph,
|
graph: &Graph,
|
||||||
node_id: &str,
|
node_id: &str,
|
||||||
@@ -1157,4 +1451,617 @@ mod tests {
|
|||||||
result.errors
|
result.errors
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn map_node_basic(id: &str, branch: &str, next: Option<&str>) -> Node {
|
||||||
|
Node {
|
||||||
|
id: id.into(),
|
||||||
|
description: String::new(),
|
||||||
|
node_type: NodeType::Map(MapNode {
|
||||||
|
over: "{{items}}".into(),
|
||||||
|
as_name: "item".into(),
|
||||||
|
branch: branch.into(),
|
||||||
|
output_key: "output".into(),
|
||||||
|
collect_into: "results".into(),
|
||||||
|
max_concurrency: None,
|
||||||
|
}),
|
||||||
|
next: next.map(NextTargets::from),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn llm_with_state_updates(id: &str, updates: &[(&str, &str)], next: Option<&str>) -> Node {
|
||||||
|
let mut node = llm_node(id, None, next);
|
||||||
|
if let NodeType::Llm(ref mut n) = node.node_type {
|
||||||
|
let mut map: HashMap<String, String> = HashMap::new();
|
||||||
|
for (k, v) in updates {
|
||||||
|
map.insert((*k).into(), (*v).into());
|
||||||
|
}
|
||||||
|
n.state_updates = Some(map);
|
||||||
|
}
|
||||||
|
node
|
||||||
|
}
|
||||||
|
|
||||||
|
fn llm_with_output_schema(id: &str, properties: &[&str], next: Option<&str>) -> Node {
|
||||||
|
let mut node = llm_node(id, None, next);
|
||||||
|
if let NodeType::Llm(ref mut n) = node.node_type {
|
||||||
|
let mut props = serde_json::Map::new();
|
||||||
|
for k in properties {
|
||||||
|
props.insert((*k).to_string(), serde_json::json!({ "type": "string" }));
|
||||||
|
}
|
||||||
|
n.output_schema = Some(serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"properties": props,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
node
|
||||||
|
}
|
||||||
|
|
||||||
|
fn script_with_state_updates(id: &str, updates: &[(&str, &str)]) -> Node {
|
||||||
|
let mut node = script_node(id, "Cargo.toml", None);
|
||||||
|
if let NodeType::Script(ref mut n) = node.node_type {
|
||||||
|
let mut map: HashMap<String, String> = HashMap::new();
|
||||||
|
for (k, v) in updates {
|
||||||
|
map.insert((*k).into(), (*v).into());
|
||||||
|
}
|
||||||
|
n.state_updates = Some(map);
|
||||||
|
}
|
||||||
|
node
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fan_out_graph_with_two_workers(worker_a: Node, worker_b: Node) -> Graph {
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some(NextTargets::Many(vec![
|
||||||
|
"worker_a".into(),
|
||||||
|
"worker_b".into(),
|
||||||
|
]));
|
||||||
|
graph_with(
|
||||||
|
vec![
|
||||||
|
("start", start),
|
||||||
|
("worker_a", worker_a),
|
||||||
|
("worker_b", worker_b),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"start",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parallel_writes_to_same_key_without_reducer_errors() {
|
||||||
|
let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
|
||||||
|
let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
|
||||||
|
let graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.errors.iter().any(|e| e
|
||||||
|
.message
|
||||||
|
.contains("nodes [worker_a, worker_b] all write key 'summary'")),
|
||||||
|
"expected reducer-collision error for `summary`: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parallel_writes_to_same_key_with_reducer_pass() {
|
||||||
|
let a = llm_with_state_updates("worker_a", &[("summary", "{{output}}")], Some("end"));
|
||||||
|
let b = llm_with_state_updates("worker_b", &[("summary", "{{output}}")], Some("end"));
|
||||||
|
let mut graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
graph.reducers.insert("summary".into(), Reducer::Concat);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("no reducer is declared")),
|
||||||
|
"expected no reducer-collision error when reducer is declared: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parallel_writes_to_disjoint_keys_pass() {
|
||||||
|
let a = llm_with_state_updates("worker_a", &[("a_out", "{{output}}")], Some("end"));
|
||||||
|
let b = llm_with_state_updates("worker_b", &[("b_out", "{{output}}")], Some("end"));
|
||||||
|
let graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("no reducer is declared")),
|
||||||
|
"expected no collision for disjoint keys: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn output_schema_top_level_keys_count_as_parallel_writes() {
|
||||||
|
let a = llm_with_output_schema("worker_a", &["summary"], Some("end"));
|
||||||
|
let b = llm_with_output_schema("worker_b", &["summary"], Some("end"));
|
||||||
|
let graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.errors.iter().any(|e| e
|
||||||
|
.message
|
||||||
|
.contains("nodes [worker_a, worker_b] all write key 'summary'")),
|
||||||
|
"output_schema top-level keys should count as writes: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn three_parallel_writers_collision_lists_all_writers() {
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some(NextTargets::Many(vec!["a".into(), "b".into(), "c".into()]));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("start", start),
|
||||||
|
(
|
||||||
|
"a",
|
||||||
|
llm_with_state_updates("a", &[("k", "{{output}}")], Some("end")),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"b",
|
||||||
|
llm_with_state_updates("b", &[("k", "{{output}}")], Some("end")),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"c",
|
||||||
|
llm_with_state_updates("c", &[("k", "{{output}}")], Some("end")),
|
||||||
|
),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("nodes [a, b, c]") && e.message.contains("'k'")),
|
||||||
|
"expected error listing all three writers a, b, c: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn approval_node_as_immediate_fan_out_target_errors() {
|
||||||
|
let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
|
||||||
|
let other = end_node("other");
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some(NextTargets::Many(vec!["ap".into(), "other".into()]));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("start", start),
|
||||||
|
("ap", approval),
|
||||||
|
("other", other),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("approval node")
|
||||||
|
&& e.message.contains("fan-out")
|
||||||
|
&& e.node_id.as_deref() == Some("ap")),
|
||||||
|
"expected approval-in-fan-out error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn input_node_as_immediate_fan_out_target_errors() {
|
||||||
|
let input = Node {
|
||||||
|
id: "in".into(),
|
||||||
|
description: String::new(),
|
||||||
|
node_type: NodeType::Input(InputNode {
|
||||||
|
question: "?".into(),
|
||||||
|
default: None,
|
||||||
|
validation: None,
|
||||||
|
state_updates: None,
|
||||||
|
}),
|
||||||
|
next: Some("end".into()),
|
||||||
|
};
|
||||||
|
let other = end_node("other");
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some(NextTargets::Many(vec!["in".into(), "other".into()]));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("start", start),
|
||||||
|
("in", input),
|
||||||
|
("other", other),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("input node")
|
||||||
|
&& e.message.contains("fan-out")
|
||||||
|
&& e.node_id.as_deref() == Some("in")),
|
||||||
|
"expected input-in-fan-out error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn approval_after_join_passes() {
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some(NextTargets::Many(vec!["a".into(), "b".into()]));
|
||||||
|
let mut a = end_node("a");
|
||||||
|
a.next = Some("ap".into());
|
||||||
|
let mut b = end_node("b");
|
||||||
|
b.next = Some("ap".into());
|
||||||
|
let approval = approval_node("ap", &["yes"], &[("yes", "end")], "end");
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("start", start),
|
||||||
|
("a", a),
|
||||||
|
("b", b),
|
||||||
|
("ap", approval),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("fan-out") && e.node_id.as_deref() == Some("ap")),
|
||||||
|
"approval AFTER join should be fine (v1 only checks immediate successors): {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_cannot_be_approval() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = approval_node("br", &["yes"], &[("yes", "end")], "end");
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("approval node")
|
||||||
|
&& e.message.contains("map branch")
|
||||||
|
&& e.node_id.as_deref() == Some("m")),
|
||||||
|
"expected map-branch-is-approval error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_cannot_be_input() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = Node {
|
||||||
|
id: "br".into(),
|
||||||
|
description: String::new(),
|
||||||
|
node_type: NodeType::Input(InputNode {
|
||||||
|
question: "?".into(),
|
||||||
|
default: None,
|
||||||
|
validation: None,
|
||||||
|
state_updates: None,
|
||||||
|
}),
|
||||||
|
next: Some("end".into()),
|
||||||
|
};
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("input node")
|
||||||
|
&& e.message.contains("map branch")
|
||||||
|
&& e.node_id.as_deref() == Some("m")),
|
||||||
|
"expected map-branch-is-input error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_cannot_be_end() {
|
||||||
|
let map = map_node_basic("m", "br", Some("done"));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("m", map),
|
||||||
|
("br", end_node("br")),
|
||||||
|
("done", end_node("done")),
|
||||||
|
],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.errors.iter().any(|e| e.message.contains("end node")
|
||||||
|
&& e.message.contains("collect mechanism")
|
||||||
|
&& e.node_id.as_deref() == Some("m")),
|
||||||
|
"expected map-branch-is-end error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_cannot_be_another_map() {
|
||||||
|
let outer = map_node_basic("outer", "inner", Some("end"));
|
||||||
|
let inner = map_node_basic("inner", "end", Some("end"));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("outer", outer), ("inner", inner), ("end", end_node("end"))],
|
||||||
|
"outer",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("itself a map node")
|
||||||
|
&& e.node_id.as_deref() == Some("outer")),
|
||||||
|
"expected nested-map error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_cannot_have_next_declared() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], Some("somewhere"));
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![
|
||||||
|
("m", map),
|
||||||
|
("br", branch),
|
||||||
|
("somewhere", end_node("somewhere")),
|
||||||
|
("end", end_node("end")),
|
||||||
|
],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("has a `next` declared")
|
||||||
|
&& e.message.contains("atomic")
|
||||||
|
&& e.node_id.as_deref() == Some("br")),
|
||||||
|
"expected branch-has-next error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_state_updates_matching_output_key_passes() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result.errors.iter().any(|e| e
|
||||||
|
.message
|
||||||
|
.contains("map branches may only write through their `output_key`")),
|
||||||
|
"valid map branch should not error on writes: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_state_updates_wrong_key_errors() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = llm_with_state_updates("br", &[("not_output", "{{output}}")], None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("writes state key 'not_output'")
|
||||||
|
&& e.message.contains("'output'")
|
||||||
|
&& e.node_id.as_deref() == Some("br")),
|
||||||
|
"expected wrong-key error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_branch_with_output_schema_errors() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = llm_with_output_schema("br", &["foo", "bar"], None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("output_schema")
|
||||||
|
&& e.message.contains("top-level properties")
|
||||||
|
&& e.node_id.as_deref() == Some("br")),
|
||||||
|
"expected output_schema-forbidden error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn script_in_fan_out_without_state_updates_errors() {
|
||||||
|
let a = script_node("worker_a", "Cargo.toml", None);
|
||||||
|
let b = end_node("worker_b");
|
||||||
|
let graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.errors.iter().any(
|
||||||
|
|e| e.message.contains("script node is in a parallel branch")
|
||||||
|
&& e.message.contains("no `state_updates`")
|
||||||
|
&& e.node_id.as_deref() == Some("worker_a")
|
||||||
|
),
|
||||||
|
"expected script-no-state-updates-in-parallel error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn script_in_fan_out_with_state_updates_passes() {
|
||||||
|
let a = script_with_state_updates("worker_a", &[("result_a", "{{output.x}}")]);
|
||||||
|
let b = end_node("worker_b");
|
||||||
|
let graph = fan_out_graph_with_two_workers(a, b);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("script node is in a parallel branch")),
|
||||||
|
"script with state_updates should pass C.6: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn script_outside_fan_out_without_state_updates_passes() {
|
||||||
|
let mut start = end_node("start");
|
||||||
|
start.next = Some("s".into());
|
||||||
|
let s = script_node("s", "Cargo.toml", None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("start", start), ("s", s), ("end", end_node("end"))],
|
||||||
|
"start",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("parallel branch")),
|
||||||
|
"script outside fan-out should not trigger C.6: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn settings_max_concurrency_zero_errors() {
|
||||||
|
let mut graph = graph_with(vec![("e", end_node("e"))], "e");
|
||||||
|
graph.settings.max_concurrency = 0;
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("settings.max_concurrency must be >= 1")),
|
||||||
|
"expected graph-level max_concurrency=0 error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn settings_max_concurrency_default_is_valid() {
|
||||||
|
let graph = graph_with(vec![("e", end_node("e"))], "e");
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("max_concurrency")),
|
||||||
|
"default max_concurrency should not error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_max_concurrency_zero_errors() {
|
||||||
|
let mut map = map_node_basic("m", "br", Some("end"));
|
||||||
|
if let NodeType::Map(ref mut mm) = map.node_type {
|
||||||
|
mm.max_concurrency = Some(0);
|
||||||
|
}
|
||||||
|
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.errors.iter().any(|e| e
|
||||||
|
.message
|
||||||
|
.contains("map node's `max_concurrency` must be >= 1")
|
||||||
|
&& e.node_id.as_deref() == Some("m")),
|
||||||
|
"expected map max_concurrency=0 error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn map_max_concurrency_none_is_valid() {
|
||||||
|
let map = map_node_basic("m", "br", Some("end"));
|
||||||
|
let branch = llm_with_state_updates("br", &[("output", "{{output}}")], None);
|
||||||
|
let graph = graph_with(
|
||||||
|
vec![("m", map), ("br", branch), ("end", end_node("end"))],
|
||||||
|
"m",
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = validator().validate(&graph);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!result
|
||||||
|
.errors
|
||||||
|
.iter()
|
||||||
|
.any(|e| e.message.contains("max_concurrency")),
|
||||||
|
"map without max_concurrency should not error: {:?}",
|
||||||
|
result.errors
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user