fmt: cleaned up graph implementation

This commit is contained in:
2026-05-21 11:27:29 -06:00
parent 81c037515e
commit 597f823bdf
20 changed files with 95 additions and 243 deletions
+21 -20
View File
@@ -1,8 +1,8 @@
# Graph-based agent definition (full-featured reference)
# Location: <loki-config-dir>/agents/<agent-name>/graph.yaml
#
# A graph agent is defined by THIS FILE ALONE. An agent directory contains
# EITHER a config.yaml (a normal LLM-loop agent) or a graph.yaml (a graph
# A graph agent is defined by this file alone. An agent directory contains
# either a config.yaml (a normal LLM-loop agent) or a graph.yaml (a graph
# agent), never both. The presence of graph.yaml is what makes the agent
# a graph agent.
#
@@ -23,7 +23,7 @@ description: | # Free-form prose describing the workflow
A reference workflow: triage a research request, retrieve local
context, branch on a script decision, run either a sub-agent or an
LLM research step, then gate the result behind human approval.
version: "1.0" # Graph SCHEMA version. Only "1.0" is accepted.
version: "1.0" # Graph schema version. Only "1.0" is accepted.
# ---------------------------------------------------------------------------
# Agent-level config (all optional)
@@ -63,7 +63,7 @@ settings:
# Reducers (optional, required whenever two parallel branches write the same
# state key in the same super-step; otherwise the validator errors at load).
#
# A reducer says HOW two values for the same key get merged. Built-ins:
# A reducer says how two values for the same key get merged. Built-ins:
# append list += [value] (single value appended to a list)
# extend list += value (a list) (list-of-lists flattened by one level)
# concat "a\nb" (string join with newline separator)
@@ -135,10 +135,11 @@ nodes:
required: [topic, needs_deep_dive]
state_updates: # {{output}} = this node's result (here, the parsed object)
triage_result: "{{output}}"
# --- POLYMORPHIC `next` -----------------------------------------------
# --- Polymorphic `next` -----------------------------------------------
# A single string runs the next node sequentially (e.g. `next: retrieve`).
# A list runs ALL listed nodes IN PARALLEL as one BSP super-step. Their
# writes are merged via `reducers:` at the join. Branches converge
# A list runs all listed nodes in parallel as one BSP super-step
# (for more info on BSP, see https://en.wikipedia.org/wiki/Bulk_synchronous_parallel).
# Their writes are merged via `reducers:` at the join. Branches converge
# implicitly when they all route to the same downstream node (here,
# `synthesize`). See the diamond:
#
@@ -146,12 +147,12 @@ nodes:
# / \
# retrieve web_search (run concurrently)
# \ /
# synthesize (join fires once after both finish)
# synthesize (join; fires once after both finish)
next: [retrieve, web_search]
# --- rag node (parallel branch 1 of the diamond) ------------------------
# Hybrid (vector + keyword) retrieval against a per-node knowledge base.
# The knowledge base is built ONCE, at agent load time, into
# The knowledge base is built once, at agent load time, into
# <agent-dir>/retrieve.yaml (named after this node's id).
retrieve:
id: retrieve
@@ -162,7 +163,7 @@ nodes:
query: "{{topic}}" # Retrieval query (templated). Default: {{initial_prompt}}.
top_k: 5 # Chunks to retrieve. Default = the KB's own top_k.
timeout: 120 # Retrieval timeout in seconds. Default 120.
# Knowledge-base BUILD config (optional; used only when the KB is first
# Knowledge-base build config (optional; used only when the KB is first
# built). When embedding_model + chunk_size + chunk_overlap are all set,
# the KB builds with no interactive prompts (works in non-interactive runs).
embedding_model: openai:text-embedding-3-small
@@ -171,8 +172,8 @@ nodes:
reranker_model: null # Optional reranker for hybrid-search results
batch_size: 100 # Optional embedding-request batch size
state_updates: # {{output}} = { context: <str>, sources: [<path>, ...] }
context: "{{output.context}}" # writes `context` → `reducers.context = concat`
sources: "{{output.sources}}" # writes `sources` → `reducers.sources = append`
context: "{{output.context}}" # writes `context` -> `reducers.context = concat`
sources: "{{output.sources}}" # writes `sources` -> `reducers.sources = append`
next: synthesize # Joins with web_search at `synthesize`.
# --- llm node (parallel branch 2 of the diamond) ------------------------
@@ -199,7 +200,7 @@ nodes:
# `context` and `sources` are produced without needing `state_updates`.
next: synthesize # Joins with retrieve at `synthesize`.
# --- script node (the diamond's JOIN; also dispatches) -----------------
# --- script node (the diamond's join; also dispatches) -----------------
# Runs a .sh / .py / .ts script. The script receives state via the
# GRAPH_STATE env var (inline JSON) or GRAPH_STATE_FILE (path to a JSON
# file, used when state exceeds 32 KiB). Exactly one is set. It must print
@@ -223,12 +224,12 @@ nodes:
# This script is expected to emit `_next: deep_dive` (or `_next: subjects_map`
# to demonstrate the map node below), or no `_next` (then `next` is used).
# Targets reached only via the script's dynamic `_next` get an
# "unreachable" warning from the validator — expected for `_next`-routed
# "unreachable" warning from the validator. This is expected for `_next`-routed
# targets.
# --- agent node ---------------------------------------------------------
# Spawns a full Loki sub-agent and waits for it. The child uses its own
# tool stack. Agent nodes have NO `tools:` field. No schema hint is
# tool stack. Agent nodes have no `tools:` field. No schema hint is
# injected even when `output_schema` is set (unlike llm nodes).
deep_dive:
id: deep_dive
@@ -250,7 +251,7 @@ nodes:
research: "{{output}}"
next: review # Required for agent nodes
# --- map node (Dynamic fan-out ≈ LangGraph's `Send` API) ----------------
# --- map node (Dynamic fan-out. Think: LangGraph's `Send` API) ----------------
# Spawns one parallel sub-branch per item in `over`. Each sub-branch runs
# the node referenced by `branch:` with the item bound to `as:`. Outputs
# collect into the array named by `collect_into:`, preserving input order.
@@ -262,8 +263,8 @@ nodes:
id: subjects_map
type: map
over: "{{subjects}}" # Required. List expression resolved from state.
# Empty list is allowed — no branches spawn,
# `collect_into` is written as [].
# Empty list is allowed. It means no branches spawn,
# and thus `collect_into` is written as [].
as: subject # Required. Per-branch state key holding the
# current item. Read with {{subject}} inside
# the branch node's prompt.
@@ -275,7 +276,7 @@ nodes:
# map's `collect_into` channel
# - no `output_schema:` (top-level merge
# would clash with collect_into)
# Validator (C.5) enforces all three.
# Validator enforces all three.
collect_into: subject_findings # Required. State key for the array of
# per-branch outputs, in input order
# (not spawn-finish order).
@@ -297,7 +298,7 @@ nodes:
prompt: "Research {{subject}}: pull the key facts and one citation."
tools:
- web_search_loki
# No `next:`, `state_updates:`, or `output_schema:` here — map branches
# No `next:`, `state_updates:`, or `output_schema:` here. Map branches
# have a strict contract (see `subjects_map.branch` comment).
# Aggregator that runs after the map joins. Reads the collected list.
+3
View File
@@ -406,18 +406,21 @@ mod tests {
#[test]
fn parse_update_flag_no_value() {
    // A bare `--update` turns the flag on without carrying a version value.
    let parsed = parse(&["--update"]);
    assert_eq!(parsed.update, Some(None));
}
#[test]
fn parse_update_flag_with_version() {
    // The token following `--update` is captured as the requested version.
    let parsed = parse(&["--update", "v0.4.0"]);
    let expected_version = String::from("v0.4.0");
    assert_eq!(parsed.update, Some(Some(expected_version)));
}
#[test]
fn parse_update_with_force() {
    // `--force` right after `--update` must be parsed as its own flag rather
    // than being consumed as the update's version value.
    let parsed = parse(&["--update", "--force"]);
    assert_eq!(parsed.update, Some(None));
    assert!(parsed.force);
}
+7 -5
View File
@@ -94,21 +94,21 @@ impl MessageContent {
match self {
MessageContent::Text(text) => multiline_text(text),
MessageContent::Array(list) => {
let (mut concated_text, mut files) = (String::new(), vec![]);
let (mut concatenated_text, mut files) = (String::new(), vec![]);
for item in list {
match item {
MessageContentPart::Text { text } => {
concated_text = format!("{concated_text} {text}")
concatenated_text = format!("{concatenated_text} {text}")
}
MessageContentPart::ImageUrl { image_url } => {
files.push(resolve_url_fn(&image_url.url))
}
}
}
if !concated_text.is_empty() {
concated_text = format!(" -- {}", multiline_text(&concated_text))
if !concatenated_text.is_empty() {
concatenated_text = format!(" -- {}", multiline_text(&concatenated_text))
}
format!(".file {}{}", files.join(" "), concated_text)
format!(".file {}{}", files.join(" "), concatenated_text)
}
MessageContent::ToolCalls(MessageContentToolCalls {
tool_results, text, ..
@@ -230,9 +230,11 @@ pub fn extract_system_message(messages: &mut Vec<Message>) -> Option<String> {
if messages.is_empty() {
return None;
}
if messages[0].role.is_system() {
let system_message = messages.remove(0);
return Some(system_message.content.to_text());
}
None
}
+2 -6
View File
@@ -33,11 +33,6 @@ impl SseHandler {
}
}
/// Enables or disables silent mode for this handler.
///
/// While silent, stdout streaming of incoming tokens is suppressed. Tokens are
/// still buffered internally (so the caller's `.take()` still returns the full
/// response); only the per-token send to the SSE renderer is skipped. Used by
/// parallel graph super-step branches so concurrent LLM calls don't interleave
/// on stdout.
pub fn set_silent(&mut self, silent: bool) {
self.silent = silent;
}
@@ -47,10 +42,11 @@ impl SseHandler {
return Ok(());
}
self.buffer.push_str(text);
if self.silent {
return Ok(());
}
let ret = self
.sender
.send(SseEvent::Text(text.to_string()))
+4
View File
@@ -802,6 +802,7 @@ fn resolve_document_paths(
} else {
PathBuf::from(&resolved_path)
};
document_paths.push(format!("{}:{}", protocol, new_path.display()));
} else if Path::new(&resolve_home_dir(path)).is_relative() {
let new_path = safe_join_path(agent_data_dir, path)
@@ -829,6 +830,7 @@ async fn init_graph_rags(
if info_flag {
return Ok(rags);
}
for (node_id, node) in &graph.nodes {
let NodeType::Rag(rag_node) = &node.node_type else {
continue;
@@ -1059,6 +1061,7 @@ variables:
output: done
"#};
let graph: Graph = serde_yaml::from_str(&yaml).unwrap();
let config = AgentConfig::from_graph("my-agent-dir", &graph);
assert_eq!(config.name, "my-agent-dir");
@@ -1101,6 +1104,7 @@ variables:
fn from_graph_keeps_defaults_for_llm_loop_fields() {
let yaml = "name: g\nstart: x\nnodes:\n x:\n id: x\n type: end\n output: ok\n";
let graph: Graph = serde_yaml::from_str(yaml).unwrap();
let config = AgentConfig::from_graph("d", &graph);
assert!(!config.auto_continue);
+2
View File
@@ -273,6 +273,7 @@ pub fn install_assets(category: AssetCategory) -> Result<()> {
}
println!("Reinstalled bundled {label} ({})", target.display());
Ok(())
}
@@ -297,6 +298,7 @@ fn confirm_asset_overwrite(category: AssetCategory, label: &str, target: &Path)
};
let prompt = format!("{} {body}\nContinue? [y/N] ", warning_text("WARNING:"));
let answer = read_single_key(&['y', 'Y', 'n', 'N'], 'n', &prompt)?;
Ok(matches!(answer, 'y' | 'Y'))
}
+12 -21
View File
@@ -33,11 +33,11 @@ use indoc::formatdoc;
use inquire::{Confirm, MultiSelect, Text, list_option::ListOption, validator::Validation};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::{File, OpenOptions, read_dir, read_to_string, remove_dir_all, remove_file};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{env, fs};
pub struct AutoContinueConfig {
pub enabled: bool,
@@ -46,10 +46,6 @@ pub struct AutoContinueConfig {
pub continuation_prompt: Option<String>,
}
/// Controls how LLM token streams are presented to the user. `Silent` is set
/// on branch contexts during parallel graph super-steps so concurrent LLM
/// calls don't interleave token-by-token on stdout — the full response still
/// lands in graph state via the normal output_schema / state_updates pathway.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenderMode {
#[default]
@@ -166,27 +162,18 @@ impl RequestContext {
/// Forks the context for one parallel branch of a graph super-step.
///
/// Each branch gets a fresh, owned clone — mutations (role swap,
/// Each branch gets a fresh, owned clone. Mutations (role swap,
/// `before/after_chat_completion`, tool tracker, last_message, etc.) are
/// scoped to the branch and discarded when the branch finishes. The
/// user-visible state communication happens through the graph's
/// `StateManager` (via `fork_for_branch_state` + `diff_against` +
/// `apply_branch_writes` reducers), NOT through `RequestContext`.
/// `apply_branch_writes` reducers), and not through `RequestContext`.
///
/// Distinction from `new_for_child`: `new_for_child` builds a fresh context
/// for a SPAWNED SUB-AGENT (different agent identity, different supervisor
/// for a spawned sub-agent (different agent identity, different supervisor
/// hierarchy, depth+1, fresh tool tracker). `fork_for_branch` keeps the
/// caller's identity and supervisor hierarchy — it's a sibling clone of the
/// SAME logical agent, running one of N parallel work items.
///
/// Behavior of per-field cloning:
/// - `Arc`-wrapped fields (`app`, `rag`, `supervisor`, `parent_supervisor`,
/// `inbox`, `escalation_queue`) — shared via Arc::clone
/// - Owned heap fields (`model`, `role`, `session`, `agent`, `tool_scope`,
/// `todo_list`, etc.) — deep `.clone()` so the branch can mutate freely
/// - `auto_continue_count` reset to 0 (each branch starts a fresh
/// continuation budget)
/// - `last_continuation_response` reset to None
/// caller's identity and supervisor hierarchy; it's a sibling clone of the
/// same logical agent, running one of N parallel work items.
pub fn fork_for_branch(&self) -> Self {
Self {
app: Arc::clone(&self.app),
@@ -1419,6 +1406,7 @@ impl RequestContext {
env!("CARGO_CRATE_NAME"),
mcp_path.display(),
);
Ok(())
}
@@ -1503,20 +1491,23 @@ impl RequestContext {
} else {
config_path
};
ensure_parent_exists(&target_path)?;
if !target_path.exists() {
std::fs::write(
fs::write(
&target_path,
"# see https://github.com/Dark-Alex-17/loki/blob/main/config.agent.example.yaml\n",
)
.with_context(|| format!("Failed to write to '{}'", target_path.display()))?;
}
let editor = app.editor()?;
edit_file(&editor, &target_path)?;
println!(
"NOTE: Remember to reload the agent if there are changes made to '{}'",
target_path.display()
);
Ok(())
}
@@ -2026,7 +2017,7 @@ impl RequestContext {
.collect();
} else if cmd == ".agent" {
if args.len() == 2 {
let dir = paths::agent_data_dir(args[0]).join(super::SESSIONS_DIR_NAME);
let dir = paths::agent_data_dir(args[0]).join(SESSIONS_DIR_NAME);
values = list_file_names(dir, ".yaml")
.into_iter()
.map(|v| (v, None))
+4 -12
View File
@@ -76,9 +76,6 @@ impl GraphExecutor {
let max_iterations = graph.settings.max_loop_iterations;
let graph_timeout = graph.settings.timeout.map(Duration::from_secs);
let max_concurrency = graph.settings.max_concurrency;
// Wrap in Arc so spawned branch tasks can cheaply share the Graph for
// node lookup (especially the map executor, which needs to resolve its
// `branch:` target from inside a spawned task).
let graph = Arc::new(graph);
let start = Instant::now();
@@ -297,10 +294,6 @@ fn sorted_frontier(frontier: &HashSet<String>) -> Vec<String> {
v
}
// Bundles the engine-config refs that every `step()` call needs to thread
// through. Constructed once per spawned branch task (or once at the call site
// for sequential paths) so step() and downstream executors (MapNodeExecutor)
// take one parameter instead of five.
pub(super) struct StepContext<'a> {
pub graph: &'a Graph,
pub script_executor: &'a ScriptExecutor,
@@ -391,8 +384,6 @@ async fn step(
}
}
// Returns all `next:` targets from the node (handles both `One` and `Many`),
// erroring if no `next` is set.
fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result<Vec<String>> {
node.next
.as_ref()
@@ -400,9 +391,6 @@ fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result<Vec<Str
.ok_or_else(|| anyhow!("{kind} node '{current}' has no `next` and is not an end node"))
}
// Returns the first declared `next:` target as a borrowed `&str`, or `None` if
// no `next` is set. Used by node executors that take `Option<&str>` for their
// primary routing argument (LLM, RAG, Input).
fn first_next_target(node: &Node) -> Option<&str> {
node.next
.as_ref()
@@ -447,6 +435,7 @@ mod tests {
#[test]
fn resolve_end_output_interpolates_template_against_state() {
let mut state = state_with(&[("name", json!("alice"))]);
let node = end_node("done: {{name}}", None);
assert_eq!(resolve_end_output(&node, &mut state), "done: alice");
@@ -457,6 +446,7 @@ mod tests {
let mut updates = HashMap::new();
updates.insert("summary".into(), "completed for {{user}}".into());
let node = end_node("RESULT: {{summary}}", Some(updates));
let mut state = state_with(&[("user", json!("bob"))]);
assert_eq!(
@@ -472,6 +462,7 @@ mod tests {
#[test]
fn resolve_end_output_with_empty_template_returns_empty_string() {
let mut state = state_with(&[]);
let node = end_node("", None);
assert_eq!(resolve_end_output(&node, &mut state), "");
@@ -480,6 +471,7 @@ mod tests {
#[test]
fn resolve_end_output_lenient_on_missing_keys() {
let mut state = state_with(&[]);
let node = end_node("hello {{unknown}}!", None);
assert_eq!(resolve_end_output(&node, &mut state), "hello !");
+2 -10
View File
@@ -13,12 +13,6 @@ use tokio::time::timeout;
const OUTPUT_KEY: &str = "output";
/// What happened during an LLM node's execution, from the caller's routing
/// perspective. `Continue` means the caller should advance via the node's
/// declared `next:` targets (whether the LLM actually succeeded or failed
/// without a fallback — either way, the executor uses node.next). `FellBack`
/// means the LLM failed after retries and the node had a `fallback:` declared,
/// so routing should go to that fallback target only.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum LlmExecutionOutcome {
Continue,
@@ -235,6 +229,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec<String>, Vec<String>) {
let Some(entries) = entries else {
return (regular, mcp);
};
for e in entries {
if let Some(server) = e.strip_prefix("mcp:") {
mcp.push(server.to_string());
@@ -242,6 +237,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec<String>, Vec<String>) {
regular.push(e.clone());
}
}
(regular, mcp)
}
@@ -465,10 +461,6 @@ mod tests {
#[test]
fn outcome_from_failure_without_fallback_is_continue() {
    // A failure with no fallback declared is routed exactly like a success:
    // the caller follows node.next. execute() has already recorded the error
    // to state via the OUTPUT_KEY, and the caller's `static_next_targets`
    // will error if node.next is missing as well.
    let outcome = outcome_from(true, None);
    assert_eq!(outcome, LlmExecutionOutcome::Continue);
}
+1 -2
View File
@@ -125,8 +125,7 @@ impl MapNodeExecutor {
let joined = join_all(sub_tasks).await;
drop(progress_tracker);
// Collect outputs keyed by input index so order is preserved regardless
// of finish order. This is the user-facing contract from plan E.2.
// Collect outputs keyed by input index so order is preserved regardless of finish order.
let mut outputs: HashMap<usize, Value> = HashMap::new();
for join_result in joined {
let (idx, sub_state, exec_result) =
-8
View File
@@ -9,14 +9,6 @@ static SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
.tick_strings(&["", "", "", "", "", "", "", "", "", "", ""])
});
// Manages a set of per-branch spinners drawn side-by-side via indicatif's
// `MultiProgress`. Created at the start of a multi-branch graph super-step
// (or map sub-branch fan-out) and torn down at the join.
//
// When stdout isn't a terminal (CI, piped output), the tracker becomes a
// no-op: `add_branch` returns a disabled handle whose methods do nothing.
// This keeps machine-piped graph runs free of spinner garbage in their
// captured output.
pub(super) struct BranchProgressTracker {
// NOTE(review): presumably `None` in the non-terminal (no-op) case described
// above; confirm against the constructor.
multi: Option<MultiProgress>,
}
+7 -15
View File
@@ -3,17 +3,6 @@ use crate::graph::type_name;
use anyhow::{Result, bail};
use serde_json::{Number, Value};
/// Combines a branch's incoming write with the current state value (if any)
/// via the specified reducer. The result is what gets written back to live
/// state during the super-step merge phase.
///
/// `current = None` means the key has no prior value in this super-step or in
/// live state. Most reducers treat absent as their identity (empty array,
/// empty string, no prior value). `Overwrite` ignores `current` entirely.
///
/// Errors clearly when types are incompatible with the reducer (e.g.
/// `Sum` on a string), naming the reducer and which side (`current` / `incoming`)
/// has the wrong type.
pub fn apply(reducer: Reducer, current: Option<&Value>, incoming: Value) -> Result<Value> {
match reducer {
Reducer::Append => apply_append(current, incoming),
@@ -37,6 +26,7 @@ fn apply_append(current: Option<&Value>, incoming: Value) -> Result<Value> {
),
};
arr.push(incoming);
Ok(Value::Array(arr))
}
@@ -56,6 +46,7 @@ fn apply_extend(current: Option<&Value>, incoming: Value) -> Result<Value> {
type_name(&other)
),
}
Ok(Value::Array(arr))
}
@@ -81,6 +72,7 @@ fn apply_concat(current: Option<&Value>, incoming: Value) -> Result<Value> {
type_name(other)
),
};
Ok(Value::String(result))
}
@@ -90,6 +82,7 @@ fn apply_sum(current: Option<&Value>, incoming: Value) -> Result<Value> {
None => 0.0,
Some(value) => number_or_error(value, "sum", "current")?,
};
Ok(json_number(c + i))
}
@@ -135,6 +128,7 @@ fn apply_merge(current: Option<&Value>, incoming: Value) -> Result<Value> {
type_name(&other)
),
}
Ok(Value::Object(map))
}
@@ -148,10 +142,8 @@ fn number_or_error(value: &Value, reducer_name: &str, position: &str) -> Result<
}
}
// Numeric reducers compute in f64 for simplicity. We preserve integer typing
// when the result is losslessly representable as i64 so `count: sum` stays an
// integer rather than degrading to a float. Non-finite values (NaN, Inf) can't
// arise from finite inputs to +/max/min, so the fallback never fires in practice.
// Numeric reducers compute in f64 for simplicity. Integer typing is preserved when the result is losslessly
// representable as i64.
fn json_number(n: f64) -> Value {
if n.fract() == 0.0 && n.is_finite() && n.abs() <= (i64::MAX as f64) {
Value::Number(Number::from(n as i64))
+1
View File
@@ -102,6 +102,7 @@ fn apply_state_updates(node: &ScriptNode, state_manager: &mut StateManager) {
let Some(updates) = &node.state_updates else {
return;
};
for (key, template) in updates {
let value = state_manager.interpolate_lenient(template);
state_manager
-6
View File
@@ -1,12 +1,6 @@
use serde_json::Value;
use std::collections::HashMap;
/// Published form of one branch's writes for the super-step merge phase.
/// Callers assemble these into a deterministically-ordered `Vec` keyed by
/// `(node_id, invocation_index)` before passing to
/// `StateManager::apply_branch_writes`. `invocation_index` is 0 for normal
/// branches and the input-list position for map sub-branches — so multiple
/// invocations of the same `branch:` node by a `map` are still totally ordered.
#[derive(Debug, Clone)]
pub struct BranchWrites {
pub node_id: String,
+5 -59
View File
@@ -159,13 +159,6 @@ impl StateManager {
}
}
/// Forks state for a parallel branch: returns a fully-owned `StateManager`
/// seeded from the current state's data. The branch mutates its fork
/// freely; callers extract its writes via `diff_against` after the branch
/// completes, then merge them via `apply_branch_writes`.
///
/// Distinct from `read_snapshot` (returns a shared `Arc<GraphState>` for
/// reads) — `fork_for_branch_state` returns a writable owned clone.
pub fn fork_for_branch_state(&self) -> Self {
Self {
state: self.state.clone(),
@@ -173,11 +166,6 @@ impl StateManager {
}
}
/// Returns the keys whose values differ from `snapshot`. Use this after a
/// branch finishes to extract its writes (input to `apply_branch_writes`).
/// Keys present in `self` but absent from `snapshot`, or with different
/// values, count as writes. Deletions are not represented (no current node
/// executor deletes state).
pub fn diff_against(&self, snapshot: &GraphState) -> HashMap<String, Value> {
let mut diff = HashMap::new();
for (k, v) in self.state.data() {
@@ -188,30 +176,10 @@ impl StateManager {
diff
}
/// Captures the current graph state as an `Arc`-wrapped snapshot.
///
/// In a parallel super-step, every branch treats this snapshot as the
/// baseline passed to `diff_against` once the branch ends. The executor then
/// takes each branch's writes (that diff) and merges them with
/// `apply_branch_writes` at the super-step boundary.
///
/// Not to be confused with the older `snapshot()` method, which returns a
/// `HashMap` clone of the data only and is used by `script_executor` to ship
/// state to child processes.
#[allow(dead_code)]
pub fn read_snapshot(&self) -> Arc<GraphState> {
    let frozen = self.state.clone();
    Arc::new(frozen)
}
/// Commits a deterministically-ordered set of per-branch writes back into
/// live state, applying declared reducers where they exist.
///
/// Caller must pre-sort `writes` by `(node_id, invocation_index)` so that
/// non-commutative reducers (`Concat`, `Merge`) produce reproducible output.
///
/// Errors when a key has writers from ≥2 branches but no reducer declared.
/// The validator (Phase C) catches this at load time; this runtime check is
/// defense-in-depth against a malformed or out-of-date validator missing it.
#[allow(dead_code)]
pub fn apply_branch_writes(
&mut self,
writes: Vec<BranchWrites>,
@@ -252,22 +220,6 @@ impl StateManager {
Ok(())
}
/// Interpolates a template and returns a typed JSON `Value`.
///
/// Two paths depending on the template shape:
/// - **Pure single reference** (the entire trimmed template is a single
/// `{{key}}` expression, e.g. `"{{subjects}}"`, `"{{user.name}}"`,
/// `"{{items[0]}}"`) — returns the typed `Value` at that key, preserving
/// numbers, bools, arrays, and objects. Errors if the key is missing.
/// - **Mixed template** (multiple refs, surrounding text, or no refs) —
/// falls back to string interpolation via `interpolate()` and returns
/// `Value::String(...)`. Strict on missing keys.
///
/// Required by:
/// - `map.over: "{{subjects}}"` — must resolve to a JSON array, not its string form
/// - `state_updates` writes that should preserve the source type (a `cost_usd: "{{api_cost}}"`
/// write should land as a Number, not a String)
#[allow(dead_code)]
pub fn interpolate_raw(&self, template: &str) -> Result<Value> {
let trimmed = template.trim();
if let Some(key) = single_reference_key(trimmed) {
@@ -338,9 +290,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec<usize>)> {
}
// Returns the inner key when `template` is exactly a single `{{key}}` reference
// (no surrounding text, no other braces). Mirrors the character set the
// TEMPLATE_VAR_RE regex accepts so `interpolate_raw` and `interpolate` stay
// consistent about what counts as a valid key.
// (no surrounding text, no other braces).
fn single_reference_key(template: &str) -> Option<&str> {
let inner = template.strip_prefix("{{")?.strip_suffix("}}")?;
if inner.contains("{{") || inner.contains("}}") {
@@ -353,11 +303,10 @@ fn single_reference_key(template: &str) -> Option<&str> {
valid.then_some(inner)
}
// Returns the root state keys referenced by any `{{...}}` expressions in the
// given template string. The "root key" is the identifier before the first
// `.` or `[` — i.e. for `{{user.name}}` the root is `user`, for `{{items[0]}}`
// the root is `items`. Used by the validator to compute the static read-set of
// a node's templated fields without depending on a runtime `StateManager`.
// Returns the root state keys referenced by any `{{...}}` expressions in the given template string. The "root key" is
// the identifier before the first `.` or `[`; e.g., for `{{user.name}}` the root is `user`, for `{{items[0]}}` the
// root is `items`. Used by the validator to compute the static read-set of a node's templated fields without
// depending on a runtime `StateManager`.
pub(super) fn template_root_keys(template: &str) -> Vec<String> {
TEMPLATE_VAR_RE
.captures_iter(template)
@@ -985,9 +934,6 @@ mod tests {
#[test]
fn interpolate_raw_inner_spaces_treated_as_mixed() {
    // `{{ k }}` does not qualify as a pure reference: spaces inside the braces
    // fall outside the allowed character set. The string-interpolation
    // fallback's regex does not match it either, so the literal text comes
    // back unchanged.
    let template = "{{ k }}";
    let manager = manager_with(&[("k", json!("v"))]);
    let resolved = manager.interpolate_raw(template).unwrap();
    assert_eq!(resolved, json!("{{ k }}"));
}
+2 -19
View File
@@ -128,13 +128,8 @@ pub struct Node {
pub next: Option<NextTargets>,
}
#[cfg(test)]
impl Node {
/// Returns the single next target as a string slice for tests and other
/// read-only inspection. Returns `None` when no `next:` is declared at all,
/// OR when a real multi-target fan-out is declared (since a fan-out has no
/// "single" target). Execution paths use `static_next_targets` in the graph
/// executor instead.
#[allow(dead_code)]
pub fn next_target(&self) -> Option<&str> {
match &self.next {
None => None,
@@ -153,7 +148,6 @@ pub enum NextTargets {
}
impl NextTargets {
/// View as a slice of node ids. `One(s)` returns a single-element slice.
pub fn as_slice(&self) -> &[String] {
match self {
NextTargets::One(s) => slice::from_ref(s),
@@ -161,7 +155,6 @@ impl NextTargets {
}
}
/// Reports whether this is a real fan-out, i.e. more than one parallel
/// target is declared.
pub fn is_fan_out(&self) -> bool {
    if let NextTargets::Many(targets) = self {
        targets.len() > 1
    } else {
        false
    }
}
@@ -349,28 +342,18 @@ pub struct EndNode {
/// Configuration of a `map` node: one sub-branch is invoked per item of the
/// list that `over` resolves to, each running the `branch` node with the item
/// bound under `as`, and the per-branch outputs are gathered into
/// `collect_into` in input-list order.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MapNode {
/// Template expression that must resolve (via `interpolate_raw`) to a JSON
/// array. Each item in the array is one branch invocation.
pub over: String,
/// The name to bind each item under, accessible as `{{<as_name>}}` inside
/// the branch node's templates. YAML field is `as:`.
#[serde(rename = "as")]
pub as_name: String,
/// Node id to invoke once per item in the resolved list.
pub branch: String,
/// State key that the branch node writes; the map collects this key's value
/// across invocations. When omitted, `default_map_output_key` supplies the
/// default ("output").
#[serde(default = "default_map_output_key")]
pub output_key: String,
/// State key to receive the array of per-branch outputs, in input-list order.
pub collect_into: String,
/// Optional cap on simultaneously-running sub-branches. Falls back to
/// `settings.max_concurrency` when unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_concurrency: Option<usize>,
}
@@ -707,7 +690,7 @@ on_other: edit_loop
initial.insert("k".to_string(), json!("v"));
let state = GraphState::new(initial);
let serialized = state.to_json().unwrap();
let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
let parsed: Value = serde_json::from_str(&serialized).unwrap();
assert_eq!(parsed.get("k"), Some(&json!("v")));
assert!(state.size_bytes() > 0);
}
+16 -46
View File
@@ -321,17 +321,13 @@ impl GraphValidator {
}
}
// Phase C — Parallel-execution validation.
// Parallel-execution validation.
//
// The v1 algorithm uses immediate-successor analysis only: a parallel
// group is the set of `next:` targets of a single fan-out node. Map nodes
// are checked separately by `validate_map_branches` (the branch is
// self-parallel, but enforcement comes from strict-mode rules on the
// branch node, not from group membership). Transitive parallel groups
// (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather
// than under-reports — a false positive forces an unneeded reducer
// (mild annoyance); a false negative allows silent data races (catastrophic).
// The v1 algorithm uses immediate-successor analysis only: a parallel group is the set of `next:` targets of a
// single fan-out node. Map nodes are checked separately by `validate_map_branches` (the branch is self-parallel,
// but enforcement comes from strict-mode rules on the branch node, not from group membership). Transitive parallel
// groups (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather than under-reports. A false positive
// forces an unneeded reducer (mild annoyance); a false negative allows silent data races (catastrophic).
fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) {
if graph.settings.max_concurrency == 0 {
result.error(ValidationError::new(
@@ -613,15 +609,12 @@ fn declared_targets(node: &Node) -> Vec<(String, &'static str)> {
out.push((t.clone(), "llm 'fallback'"));
}
}
// `agent`/`input`/`rag` route only via `next` (already collected
// above); `end` is terminal. No type-specific routing edges to add.
NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {}
// A `map` node invokes its `branch:` target once per item from the
// resolved `over` list. The branch is statically referenced, so it
// is a real declared edge for cycle/reachability purposes.
NodeType::Map(m) => {
out.push((m.branch.clone(), "map 'branch'"));
}
// `agent`/`input`/`rag` route only via `next` (already collected
// above); `end` is terminal. No type-specific routing edges to add.
NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {}
}
out
}
@@ -653,13 +646,11 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet<String> {
reachable
}
// v1 parallel-group detection: only the immediate `next` targets of a fan-out
// node count as a parallel group. Map branches are handled separately by
// `validate_map_branches` (the branch's self-parallelism is checked via
// strict-mode rules on the branch node itself, not via group membership).
// v1 parallel-group detection: only the immediate `next` targets of a fan-out node count as a parallel group. Map
// branches are handled separately by `validate_map_branches` (the branch's self-parallelism is checked via strict-mode
// rules on the branch node itself, not via group membership).
//
// Returns one HashSet per fan-out source; deeper transitive parallelism is
// intentionally out of scope for v1.
// Returns one HashSet per fan-out source; deeper transitive parallelism is intentionally out of scope for v1.
fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
let mut groups = Vec::new();
for node in graph.nodes.values() {
@@ -677,19 +668,18 @@ fn compute_parallel_groups(graph: &Graph) -> Vec<HashSet<String>> {
// Sources considered:
// - `state_updates` keys (every node type that has them)
// - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge)
//
// Returns `None` only for script nodes with no declared `state_updates` — their
// emitted JSON is opaque to static analysis. The validator treats `None` as a
// load-time error when the script appears in a parallel group (C.6).
fn write_set_of(node: &Node) -> Option<HashSet<String>> {
    // Keys named under `state_updates`, when the node declares any.
    let declared = node_state_updates_keys(node);

    // A script node without declared `state_updates` yields `None` instead of
    // a (possibly empty) set, so callers can tell the two cases apart.
    if declared.is_none() && matches!(node.node_type, NodeType::Script(_)) {
        return None;
    }

    // Every other node: union of the declared `state_updates` keys and the
    // top-level keys reported by `output_schema_top_level_keys`.
    let keys: HashSet<String> = declared
        .into_iter()
        .flatten()
        .chain(output_schema_top_level_keys(node))
        .collect();
    Some(keys)
}
@@ -698,26 +688,6 @@ fn write_set_of(node: &Node) -> Option<HashSet<String>> {
// "Root key" follows the same definition as `template_root_keys`: for a
// reference like `{{user.name}}` or `{{items[0]}}`, the root is the bare
// identifier before the first `.` or `[`.
//
// Templated fields scanned per node type:
// - llm: instructions, prompt, state_updates values
// - agent: prompt, state_updates values
// - rag: query (defaulting to "{{initial_prompt}}"), state_updates values
// - approval: question, state_updates values
// - input: question, default, state_updates values
// - end: output, state_updates values
// - map: over (its `{{...}}` IS the dynamic read of the list to fan out over)
// - script: state_updates values only (the script body is opaque to static
// analysis; its reads via GRAPH_STATE / GRAPH_STATE_FILE can't be
// inferred at load time)
//
// Scoped variables produced by THIS node's own execution are excluded from
// state_updates value scanning:
// - llm/agent/rag → "output" (the node's body output)
// - approval → "choice" (the user's selected option)
// - input → "input" (the user's typed text)
// These are bindings created inside the node, not reads from prior state, so
// they cannot race with a sibling's writes.
fn read_set_of(node: &Node) -> HashSet<String> {
let mut reads: HashSet<String> = HashSet::new();
let scoped: &[&str] = match &node.node_type {
+1
View File
@@ -155,6 +155,7 @@ async fn run(
if let Some(category) = cli.install {
return config::install_assets(category);
}
if cli.sync_models {
let url = ctx.app.config.sync_models_url();
return sync_models(&url, abort_signal.clone()).await;
+2 -2
View File
@@ -5,8 +5,8 @@ use futures_util::StreamExt;
use futures_util::stream::BoxStream;
use mpsc::error::SendError;
use mpsc::{OwnedPermit, Receiver, Sender, channel};
use reqwest::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::{Client, header};
use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage};
use std::collections::HashMap;
use std::error::Error;
@@ -52,7 +52,7 @@ impl LegacySseTransport {
let response = client
.get(sse_url)
.header(reqwest::header::ACCEPT, "text/event-stream")
.header(header::ACCEPT, "text/event-stream")
.send()
.await
.context("Failed to open SSE connection")?
+3 -12
View File
@@ -82,9 +82,6 @@ impl Clone for Rag {
}
}
/// Caller-supplied overrides for building a RAG knowledge base. Each field
/// takes precedence over the app-level `rag_*` config; a field left `None`
/// falls back to app config and then, if still unset, an interactive prompt.
#[derive(Debug, Clone, Default)]
pub struct RagInitConfig {
pub embedding_model: Option<String>,
@@ -100,12 +97,6 @@ impl Rag {
init_client(&self.app_config, model)
}
/// Build a RAG knowledge base using caller-supplied config overrides.
/// Unlike [`Rag::init`], this does not bail outright in non-interactive
/// mode: it only requires a terminal when a needed value is missing
/// from both `config` and app config. When `config` fully specifies
/// `embedding_model`, `chunk_size`, and `chunk_overlap`, the build runs
/// with no prompts.
pub async fn init_with_config(
app: &AppConfig,
name: &str,
@@ -1389,13 +1380,13 @@ mod tests {
// Checks that `get_separators` returns separators specific to the given file
// extension: for each of "rs", "py" and "md", at least one returned separator
// must contain a marker associated with that format.
#[test]
fn get_separators_returns_language_specific() {
// "rs": expect a separator containing `fn `.
let rs_seps = splitter::get_separators("rs");
let rs_seps = get_separators("rs");
assert!(rs_seps.iter().any(|s| s.contains("fn ")));
// "py": expect a separator containing `def `.
let py_seps = splitter::get_separators("py");
let py_seps = get_separators("py");
assert!(py_seps.iter().any(|s| s.contains("def ")));
// "md": expect a separator containing `# `.
let md_seps = splitter::get_separators("md");
let md_seps = get_separators("md");
assert!(md_seps.iter().any(|s| s.contains("# ")));
}