fmt: cleaned up graph implementation
This commit is contained in:
+4
-12
@@ -76,9 +76,6 @@ impl GraphExecutor {
|
||||
let max_iterations = graph.settings.max_loop_iterations;
|
||||
let graph_timeout = graph.settings.timeout.map(Duration::from_secs);
|
||||
let max_concurrency = graph.settings.max_concurrency;
|
||||
// Wrap in Arc so spawned branch tasks can cheaply share the Graph for
|
||||
// node lookup (especially the map executor, which needs to resolve its
|
||||
// `branch:` target from inside a spawned task).
|
||||
let graph = Arc::new(graph);
|
||||
let start = Instant::now();
|
||||
|
||||
@@ -297,10 +294,6 @@ fn sorted_frontier(frontier: &HashSet<String>) -> Vec<String> {
|
||||
v
|
||||
}
|
||||
|
||||
// Bundles the engine-config refs that every `step()` call needs to thread
|
||||
// through. Constructed once per spawned branch task (or once at the call site
|
||||
// for sequential paths) so step() and downstream executors (MapNodeExecutor)
|
||||
// take one parameter instead of five.
|
||||
pub(super) struct StepContext<'a> {
|
||||
pub graph: &'a Graph,
|
||||
pub script_executor: &'a ScriptExecutor,
|
||||
@@ -391,8 +384,6 @@ async fn step(
|
||||
}
|
||||
}
|
||||
|
||||
// Returns all `next:` targets from the node (handles both `One` and `Many`),
|
||||
// erroring if no `next` is set.
|
||||
fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result<Vec<String>> {
|
||||
node.next
|
||||
.as_ref()
|
||||
@@ -400,9 +391,6 @@ fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result<Vec<Str
|
||||
.ok_or_else(|| anyhow!("{kind} node '{current}' has no `next` and is not an end node"))
|
||||
}
|
||||
|
||||
// Returns the first declared `next:` target as a borrowed `&str`, or `None` if
|
||||
// no `next` is set. Used by node executors that take `Option<&str>` for their
|
||||
// primary routing argument (LLM, RAG, Input).
|
||||
fn first_next_target(node: &Node) -> Option<&str> {
|
||||
node.next
|
||||
.as_ref()
|
||||
@@ -447,6 +435,7 @@ mod tests {
|
||||
#[test]
|
||||
fn resolve_end_output_interpolates_template_against_state() {
|
||||
let mut state = state_with(&[("name", json!("alice"))]);
|
||||
|
||||
let node = end_node("done: {{name}}", None);
|
||||
|
||||
assert_eq!(resolve_end_output(&node, &mut state), "done: alice");
|
||||
@@ -457,6 +446,7 @@ mod tests {
|
||||
let mut updates = HashMap::new();
|
||||
updates.insert("summary".into(), "completed for {{user}}".into());
|
||||
let node = end_node("RESULT: {{summary}}", Some(updates));
|
||||
|
||||
let mut state = state_with(&[("user", json!("bob"))]);
|
||||
|
||||
assert_eq!(
|
||||
@@ -472,6 +462,7 @@ mod tests {
|
||||
#[test]
|
||||
fn resolve_end_output_with_empty_template_returns_empty_string() {
|
||||
let mut state = state_with(&[]);
|
||||
|
||||
let node = end_node("", None);
|
||||
|
||||
assert_eq!(resolve_end_output(&node, &mut state), "");
|
||||
@@ -480,6 +471,7 @@ mod tests {
|
||||
#[test]
|
||||
fn resolve_end_output_lenient_on_missing_keys() {
|
||||
let mut state = state_with(&[]);
|
||||
|
||||
let node = end_node("hello {{unknown}}!", None);
|
||||
|
||||
assert_eq!(resolve_end_output(&node, &mut state), "hello !");
|
||||
|
||||
+2
-10
@@ -13,12 +13,6 @@ use tokio::time::timeout;
|
||||
|
||||
const OUTPUT_KEY: &str = "output";
|
||||
|
||||
/// What happened during an LLM node's execution, from the caller's routing
|
||||
/// perspective. `Continue` means the caller should advance via the node's
|
||||
/// declared `next:` targets (whether the LLM actually succeeded or failed
|
||||
/// without a fallback — either way, the executor uses node.next). `FellBack`
|
||||
/// means the LLM failed after retries and the node had a `fallback:` declared,
|
||||
/// so routing should go to that fallback target only.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub(super) enum LlmExecutionOutcome {
|
||||
Continue,
|
||||
@@ -235,6 +229,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec<String>, Vec<String>) {
|
||||
let Some(entries) = entries else {
|
||||
return (regular, mcp);
|
||||
};
|
||||
|
||||
for e in entries {
|
||||
if let Some(server) = e.strip_prefix("mcp:") {
|
||||
mcp.push(server.to_string());
|
||||
@@ -242,6 +237,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec<String>, Vec<String>) {
|
||||
regular.push(e.clone());
|
||||
}
|
||||
}
|
||||
|
||||
(regular, mcp)
|
||||
}
|
||||
|
||||
@@ -465,10 +461,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn outcome_from_failure_without_fallback_is_continue() {
|
||||
// Failed but no fallback: caller routes via node.next as if successful.
|
||||
// The error has already been recorded to state via the OUTPUT_KEY by
|
||||
// execute(); the caller's `static_next_targets` will error if node.next
|
||||
// is also missing.
|
||||
assert_eq!(outcome_from(true, None), LlmExecutionOutcome::Continue);
|
||||
}
|
||||
|
||||
|
||||
+1
-2
@@ -125,8 +125,7 @@ impl MapNodeExecutor {
|
||||
let joined = join_all(sub_tasks).await;
|
||||
drop(progress_tracker);
|
||||
|
||||
// Collect outputs keyed by input index so order is preserved regardless
|
||||
// of finish order. This is the user-facing contract from plan E.2.
|
||||
// Collect outputs keyed by input index so order is preserved regardless of finish order.
|
||||
let mut outputs: HashMap<usize, Value> = HashMap::new();
|
||||
for join_result in joined {
|
||||
let (idx, sub_state, exec_result) =
|
||||
|
||||
@@ -9,14 +9,6 @@ static SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
|
||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏", ""])
|
||||
});
|
||||
|
||||
// Manages a set of per-branch spinners drawn side-by-side via indicatif's
|
||||
// `MultiProgress`. Created at the start of a multi-branch graph super-step
|
||||
// (or map sub-branch fan-out) and torn down at the join.
|
||||
//
|
||||
// When stdout isn't a terminal (CI, piped output), the tracker becomes a
|
||||
// no-op — `add_branch` returns a disabled handle whose methods do nothing.
|
||||
// This keeps machine-piped graph runs free of spinner garbage in their
|
||||
// captured output.
|
||||
pub(super) struct BranchProgressTracker {
|
||||
multi: Option<MultiProgress>,
|
||||
}
|
||||
|
||||
+7
-15
@@ -3,17 +3,6 @@ use crate::graph::type_name;
|
||||
use anyhow::{Result, bail};
|
||||
use serde_json::{Number, Value};
|
||||
|
||||
/// Combines a branch's incoming write with the current state value (if any)
|
||||
/// via the specified reducer. The result is what gets written back to live
|
||||
/// state during the super-step merge phase.
|
||||
///
|
||||
/// `current = None` means the key has no prior value in this super-step or in
|
||||
/// live state. Most reducers treat absent as their identity (empty array,
|
||||
/// empty string, no prior value). `Overwrite` ignores `current` entirely.
|
||||
///
|
||||
/// Errors clearly when types are incompatible with the reducer (e.g.
|
||||
/// `Sum` on a string), naming the reducer and which side (`current` / `incoming`)
|
||||
/// has the wrong type.
|
||||
pub fn apply(reducer: Reducer, current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
match reducer {
|
||||
Reducer::Append => apply_append(current, incoming),
|
||||
@@ -37,6 +26,7 @@ fn apply_append(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
),
|
||||
};
|
||||
arr.push(incoming);
|
||||
|
||||
Ok(Value::Array(arr))
|
||||
}
|
||||
|
||||
@@ -56,6 +46,7 @@ fn apply_extend(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
type_name(&other)
|
||||
),
|
||||
}
|
||||
|
||||
Ok(Value::Array(arr))
|
||||
}
|
||||
|
||||
@@ -81,6 +72,7 @@ fn apply_concat(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
type_name(other)
|
||||
),
|
||||
};
|
||||
|
||||
Ok(Value::String(result))
|
||||
}
|
||||
|
||||
@@ -90,6 +82,7 @@ fn apply_sum(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
None => 0.0,
|
||||
Some(value) => number_or_error(value, "sum", "current")?,
|
||||
};
|
||||
|
||||
Ok(json_number(c + i))
|
||||
}
|
||||
|
||||
@@ -135,6 +128,7 @@ fn apply_merge(current: Option<&Value>, incoming: Value) -> Result<Value> {
|
||||
type_name(&other)
|
||||
),
|
||||
}
|
||||
|
||||
Ok(Value::Object(map))
|
||||
}
|
||||
|
||||
@@ -148,10 +142,8 @@ fn number_or_error(value: &Value, reducer_name: &str, position: &str) -> Result<
|
||||
}
|
||||
}
|
||||
|
||||
// Numeric reducers compute in f64 for simplicity. We preserve integer typing
|
||||
// when the result is losslessly representable as i64 so `count: sum` stays an
|
||||
// integer rather than degrading to a float. Non-finite values (NaN, Inf) can't
|
||||
// arise from finite inputs to +/max/min, so the fallback never fires in practice.
|
||||
// Numeric reducers compute in f64 for simplicity. Integer typing is preserved when the result is losslessly
|
||||
// representable as i64.
|
||||
fn json_number(n: f64) -> Value {
|
||||
if n.fract() == 0.0 && n.is_finite() && n.abs() <= (i64::MAX as f64) {
|
||||
Value::Number(Number::from(n as i64))
|
||||
|
||||
@@ -102,6 +102,7 @@ fn apply_state_updates(node: &ScriptNode, state_manager: &mut StateManager) {
|
||||
let Some(updates) = &node.state_updates else {
|
||||
return;
|
||||
};
|
||||
|
||||
for (key, template) in updates {
|
||||
let value = state_manager.interpolate_lenient(template);
|
||||
state_manager
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Published form of one branch's writes for the super-step merge phase.
|
||||
/// Callers assemble these into a deterministically-ordered `Vec` keyed by
|
||||
/// `(node_id, invocation_index)` before passing to
|
||||
/// `StateManager::apply_branch_writes`. `invocation_index` is 0 for normal
|
||||
/// branches and the input-list position for map sub-branches — so multiple
|
||||
/// invocations of the same `branch:` node by a `map` are still totally ordered.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BranchWrites {
|
||||
pub node_id: String,
|
||||
|
||||
+5
-59
@@ -159,13 +159,6 @@ impl StateManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Forks state for a parallel branch: returns a fully-owned `StateManager`
|
||||
/// seeded from the current state's data. The branch mutates its fork
|
||||
/// freely; callers extract its writes via `diff_against` after the branch
|
||||
/// completes, then merge them via `apply_branch_writes`.
|
||||
///
|
||||
/// Distinct from `read_snapshot` (returns a shared `Arc<GraphState>` for
|
||||
/// reads) — `fork_for_branch_state` returns a writable owned clone.
|
||||
pub fn fork_for_branch_state(&self) -> Self {
|
||||
Self {
|
||||
state: self.state.clone(),
|
||||
@@ -173,11 +166,6 @@ impl StateManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the keys whose values differ from `snapshot`. Use this after a
|
||||
/// branch finishes to extract its writes (input to `apply_branch_writes`).
|
||||
/// Keys present in `self` but absent from `snapshot`, or with different
|
||||
/// values, count as writes. Deletions are not represented (no current node
|
||||
/// executor deletes state).
|
||||
pub fn diff_against(&self, snapshot: &GraphState) -> HashMap<String, Value> {
|
||||
let mut diff = HashMap::new();
|
||||
for (k, v) in self.state.data() {
|
||||
@@ -188,30 +176,10 @@ impl StateManager {
|
||||
diff
|
||||
}
|
||||
|
||||
/// Returns an `Arc`-wrapped snapshot of the current graph state. Each
|
||||
/// branch in a parallel super-step uses this snapshot as the baseline for
|
||||
/// its `diff_against` call at branch end. The executor extracts each
|
||||
/// branch's writes (the diff) and merges them via `apply_branch_writes` at
|
||||
/// the super-step boundary.
|
||||
///
|
||||
/// Distinct from the older `snapshot()` method (returns a `HashMap` clone
|
||||
/// of the data only — used by `script_executor` to ship state to child
|
||||
/// processes).
|
||||
#[allow(dead_code)]
|
||||
pub fn read_snapshot(&self) -> Arc<GraphState> {
|
||||
Arc::new(self.state.clone())
|
||||
}
|
||||
|
||||
/// Commits a deterministically-ordered set of per-branch writes back into
|
||||
/// live state, applying declared reducers where they exist.
|
||||
///
|
||||
/// Caller must pre-sort `writes` by `(node_id, invocation_index)` so that
|
||||
/// non-commutative reducers (`Concat`, `Merge`) produce reproducible output.
|
||||
///
|
||||
/// Errors when a key has writers from ≥2 branches but no reducer declared.
|
||||
/// The validator (Phase C) catches this at load time; this runtime check is
|
||||
/// defense-in-depth against a malformed or out-of-date validator missing it.
|
||||
#[allow(dead_code)]
|
||||
pub fn apply_branch_writes(
|
||||
&mut self,
|
||||
writes: Vec<BranchWrites>,
|
||||
@@ -252,22 +220,6 @@ impl StateManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Interpolates a template and returns a typed JSON `Value`.
|
||||
///
|
||||
/// Two paths depending on the template shape:
|
||||
/// - **Pure single reference** (the entire trimmed template is a single
|
||||
/// `{{key}}` expression, e.g. `"{{subjects}}"`, `"{{user.name}}"`,
|
||||
/// `"{{items[0]}}"`) — returns the typed `Value` at that key, preserving
|
||||
/// numbers, bools, arrays, and objects. Errors if the key is missing.
|
||||
/// - **Mixed template** (multiple refs, surrounding text, or no refs) —
|
||||
/// falls back to string interpolation via `interpolate()` and returns
|
||||
/// `Value::String(...)`. Strict on missing keys.
|
||||
///
|
||||
/// Required by:
|
||||
/// - `map.over: "{{subjects}}"` — must resolve to a JSON array, not its string form
|
||||
/// - `state_updates` writes that should preserve the source type (a `cost_usd: "{{api_cost}}"`
|
||||
/// write should land as a Number, not a String)
|
||||
#[allow(dead_code)]
|
||||
pub fn interpolate_raw(&self, template: &str) -> Result<Value> {
|
||||
let trimmed = template.trim();
|
||||
if let Some(key) = single_reference_key(trimmed) {
|
||||
@@ -338,9 +290,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec<usize>)> {
|
||||
}
|
||||
|
||||
// Returns the inner key when `template` is exactly a single `{{key}}` reference
|
||||
// (no surrounding text, no other braces). Mirrors the character set the
|
||||
// TEMPLATE_VAR_RE regex accepts so `interpolate_raw` and `interpolate` stay
|
||||
// consistent about what counts as a valid key.
|
||||
// (no surrounding text, no other braces).
|
||||
fn single_reference_key(template: &str) -> Option<&str> {
|
||||
let inner = template.strip_prefix("{{")?.strip_suffix("}}")?;
|
||||
if inner.contains("{{") || inner.contains("}}") {
|
||||
@@ -353,11 +303,10 @@ fn single_reference_key(template: &str) -> Option<&str> {
|
||||
valid.then_some(inner)
|
||||
}
|
||||
|
||||
// Returns the root state keys referenced by any `{{...}}` expressions in the
|
||||
// given template string. The "root key" is the identifier before the first
|
||||
// `.` or `[` — i.e. for `{{user.name}}` the root is `user`, for `{{items[0]}}`
|
||||
// the root is `items`. Used by the validator to compute the static read-set of
|
||||
// a node's templated fields without depending on a runtime `StateManager`.
|
||||
// Returns the root state keys referenced by any `{{...}}` expressions in the given template string. The "root key" is
|
||||
// the identifier before the first `.` or `[`; e.g., for `{{user.name}}` the root is `user`, for `{{items[0]}}` the
|
||||
// root is `items`. Used by the validator to compute the static read-set of a node's templated fields without
|
||||
// depending on a runtime `StateManager`.
|
||||
pub(super) fn template_root_keys(template: &str) -> Vec<String> {
|
||||
TEMPLATE_VAR_RE
|
||||
.captures_iter(template)
|
||||
@@ -985,9 +934,6 @@ mod tests {
|
||||
#[test]
|
||||
fn interpolate_raw_inner_spaces_treated_as_mixed() {
|
||||
let manager = manager_with(&[("k", json!("v"))]);
|
||||
// `{{ k }}` is not a valid pure reference (spaces inside braces are
|
||||
// outside the allowed character set). Fall back to string interpolation
|
||||
// -- which doesn't match the regex either, so the literal passes through.
|
||||
let result = manager.interpolate_raw("{{ k }}").unwrap();
|
||||
assert_eq!(result, json!("{{ k }}"));
|
||||
}
|
||||
|
||||
+2
-19
@@ -128,13 +128,8 @@ pub struct Node {
|
||||
pub next: Option<NextTargets>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl Node {
|
||||
/// Returns the single next target as a string slice for tests and other
|
||||
/// read-only inspection. Returns `None` when no `next:` is declared at all,
|
||||
/// OR when a real multi-target fan-out is declared (since a fan-out has no
|
||||
/// "single" target). Execution paths use `static_next_targets` in the graph
|
||||
/// executor instead.
|
||||
#[allow(dead_code)]
|
||||
pub fn next_target(&self) -> Option<&str> {
|
||||
match &self.next {
|
||||
None => None,
|
||||
@@ -153,7 +148,6 @@ pub enum NextTargets {
|
||||
}
|
||||
|
||||
impl NextTargets {
|
||||
/// View as a slice of node ids. `One(s)` returns a single-element slice.
|
||||
pub fn as_slice(&self) -> &[String] {
|
||||
match self {
|
||||
NextTargets::One(s) => slice::from_ref(s),
|
||||
@@ -161,7 +155,6 @@ impl NextTargets {
|
||||
}
|
||||
}
|
||||
|
||||
/// True if this declares more than one parallel target (i.e., a real fan-out).
|
||||
pub fn is_fan_out(&self) -> bool {
|
||||
matches!(self, NextTargets::Many(v) if v.len() > 1)
|
||||
}
|
||||
@@ -349,28 +342,18 @@ pub struct EndNode {
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct MapNode {
|
||||
/// Template expression that must resolve (via `interpolate_raw`, added in
|
||||
/// Phase B) to a JSON array. Each item in the array is one branch invocation.
|
||||
pub over: String,
|
||||
|
||||
/// The name to bind each item under, accessible as `{{<as_name>}}` inside
|
||||
/// the branch node's templates. YAML field is `as:`.
|
||||
#[serde(rename = "as")]
|
||||
pub as_name: String,
|
||||
|
||||
/// Node id to invoke once per item in the resolved list.
|
||||
pub branch: String,
|
||||
|
||||
/// State key that the branch node writes; the map collects this key's value
|
||||
/// across invocations. Defaults to "output".
|
||||
#[serde(default = "default_map_output_key")]
|
||||
pub output_key: String,
|
||||
|
||||
/// State key to receive the array of per-branch outputs, in input-list order.
|
||||
pub collect_into: String,
|
||||
|
||||
/// Optional cap on simultaneously-running sub-branches. Falls back to
|
||||
/// `settings.max_concurrency` when unset.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub max_concurrency: Option<usize>,
|
||||
}
|
||||
@@ -707,7 +690,7 @@ on_other: edit_loop
|
||||
initial.insert("k".to_string(), json!("v"));
|
||||
let state = GraphState::new(initial);
|
||||
let serialized = state.to_json().unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
|
||||
let parsed: Value = serde_json::from_str(&serialized).unwrap();
|
||||
assert_eq!(parsed.get("k"), Some(&json!("v")));
|
||||
assert!(state.size_bytes() > 0);
|
||||
}
|
||||
|
||||
+16
-46
@@ -321,17 +321,13 @@ impl GraphValidator {
|
||||
}
|
||||
}
|
||||
|
||||
// Phase C — Parallel-execution validation.
|
||||
// Parallel-execution validation.
|
||||
//
|
||||
// The v1 algorithm uses immediate-successor analysis only: a parallel
|
||||
// group is the set of `next:` targets of a single fan-out node. Map nodes
|
||||
// are checked separately by `validate_map_branches` (the branch is
|
||||
// self-parallel, but enforcement comes from strict-mode rules on the
|
||||
// branch node, not from group membership). Transitive parallel groups
|
||||
// (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather
|
||||
// than under-reports — a false positive forces an unneeded reducer
|
||||
// (mild annoyance); a false negative allows silent data races (catastrophic).
|
||||
|
||||
// The v1 algorithm uses immediate-successor analysis only: a parallel group is the set of `next:` targets of a
|
||||
// single fan-out node. Map nodes are checked separately by `validate_map_branches` (the branch is self-parallel,
|
||||
// but enforcement comes from strict-mode rules on the branch node, not from group membership). Transitive parallel
|
||||
// groups (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather than under-reports. A false positive
|
||||
// forces an unneeded reducer (mild annoyance); a false negative allows silent data races (catastrophic).
|
||||
fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) {
|
||||
if graph.settings.max_concurrency == 0 {
|
||||
result.error(ValidationError::new(
|
||||
@@ -613,15 +609,12 @@ fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
|
||||
out.push((t.clone(), "llm 'fallback'"));
|
||||
}
|
||||
}
|
||||
// `agent`/`input`/`rag` route only via `next` (already collected
|
||||
// above); `end` is terminal. No type-specific routing edges to add.
|
||||
NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {}
|
||||
// A `map` node invokes its `branch:` target once per item from the
|
||||
// resolved `over` list. The branch is statically referenced, so it
|
||||
// is a real declared edge for cycle/reachability purposes.
|
||||
NodeType::Map(m) => {
|
||||
out.push((m.branch.clone(), "map 'branch'"));
|
||||
}
|
||||
// `agent`/`input`/`rag` route only via `next` (already collected
|
||||
// above); `end` is terminal. No type-specific routing edges to add.
|
||||
NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {}
|
||||
}
|
||||
out
|
||||
}
|
||||
@@ -653,13 +646,11 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet<String> {
|
||||
reachable
|
||||
}
|
||||
|
||||
// v1 parallel-group detection: only the immediate `next` targets of a fan-out
|
||||
// node count as a parallel group. Map branches are handled separately by
|
||||
// `validate_map_branches` (the branch's self-parallelism is checked via
|
||||
// strict-mode rules on the branch node itself, not via group membership).
|
||||
// v1 parallel-group detection: only the immediate `next` targets of a fan-out node count as a parallel group. Map
|
||||
// branches are handled separately by `validate_map_branches` (the branch's self-parallelism is checked via strict-mode
|
||||
// rules on the branch node itself, not via group membership).
|
||||
//
|
||||
// Returns one HashSet per fan-out source; deeper transitive parallelism is
|
||||
// intentionally out of scope for v1.
|
||||
// Returns one HashSet per fan-out source; deeper transitive parallelism is intentionally out of scope for v1.
|
||||
fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
|
||||
let mut groups = Vec::new();
|
||||
for node in graph.nodes.values() {
|
||||
@@ -677,19 +668,18 @@ fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
|
||||
// Sources considered:
|
||||
// - `state_updates` keys (every node type that has them)
|
||||
// - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge)
|
||||
//
|
||||
// Returns `None` only for script nodes with no declared `state_updates` — their
|
||||
// emitted JSON is opaque to static analysis. The validator treats `None` as a
|
||||
// load-time error when the script appears in a parallel group (C.6).
|
||||
fn write_set_of(node: &Node) -> Option<HashSet<String>> {
|
||||
if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut writes = HashSet::new();
|
||||
if let Some(keys) = node_state_updates_keys(node) {
|
||||
writes.extend(keys);
|
||||
}
|
||||
|
||||
writes.extend(output_schema_top_level_keys(node));
|
||||
|
||||
Some(writes)
|
||||
}
|
||||
|
||||
@@ -698,26 +688,6 @@ fn write_set_of(node: &Node) -> Option<HashSet<String>> {
|
||||
// "Root key" follows the same definition as `template_root_keys`: for a
|
||||
// reference like `{{user.name}}` or `{{items[0]}}`, the root is the bare
|
||||
// identifier before the first `.` or `[`.
|
||||
//
|
||||
// Templated fields scanned per node type:
|
||||
// - llm: instructions, prompt, state_updates values
|
||||
// - agent: prompt, state_updates values
|
||||
// - rag: query (defaulting to "{{initial_prompt}}"), state_updates values
|
||||
// - approval: question, state_updates values
|
||||
// - input: question, default, state_updates values
|
||||
// - end: output, state_updates values
|
||||
// - map: over (its `{{...}}` IS the dynamic read of the list to fan out over)
|
||||
// - script: state_updates values only (the script body is opaque to static
|
||||
// analysis; its reads via GRAPH_STATE / GRAPH_STATE_FILE can't be
|
||||
// inferred at load time)
|
||||
//
|
||||
// Scoped variables produced by THIS node's own execution are excluded from
|
||||
// state_updates value scanning:
|
||||
// - llm/agent/rag → "output" (the node's body output)
|
||||
// - approval → "choice" (the user's selected option)
|
||||
// - input → "input" (the user's typed text)
|
||||
// These are bindings created inside the node, not reads from prior state, so
|
||||
// they cannot race with a sibling's writes.
|
||||
fn read_set_of(node: &Node) -> HashSet<String> {
|
||||
let mut reads: HashSet<String> = HashSet::new();
|
||||
let scoped: &[&str] = match &node.node_type {
|
||||
|
||||
Reference in New Issue
Block a user