From d46b9fec322b86ad7b26e13544fc9bbe33065efd Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Thu, 21 May 2026 11:27:29 -0600 Subject: [PATCH] fmt: cleaned up graph implementation --- graph.example.yaml | 41 +++++++++++----------- src/cli/mod.rs | 3 ++ src/client/message.rs | 12 ++++--- src/client/stream.rs | 8 ++--- src/config/agent.rs | 4 +++ src/config/mod.rs | 2 ++ src/config/request_context.rs | 33 +++++++----------- src/graph/executor.rs | 16 +++------ src/graph/llm.rs | 12 ++----- src/graph/map.rs | 3 +- src/graph/progress.rs | 8 ----- src/graph/reducer.rs | 22 ++++-------- src/graph/script.rs | 1 + src/graph/staging.rs | 6 ---- src/graph/state.rs | 64 +++-------------------------------- src/graph/types.rs | 21 ++---------- src/graph/validator.rs | 62 +++++++++------------------------ src/main.rs | 1 + src/mcp/sse_transport.rs | 4 +-- src/rag/mod.rs | 15 ++------ 20 files changed, 95 insertions(+), 243 deletions(-) diff --git a/graph.example.yaml b/graph.example.yaml index 931fb4c..ac45858 100644 --- a/graph.example.yaml +++ b/graph.example.yaml @@ -1,8 +1,8 @@ # Graph-based agent definition (full-featured reference) # Location: /agents//graph.yaml # -# A graph agent is defined by THIS FILE ALONE. An agent directory contains -# EITHER a config.yaml (a normal LLM-loop agent) or a graph.yaml (a graph +# A graph agent is defined by this file alone. An agent directory contains +# either a config.yaml (a normal LLM-loop agent) or a graph.yaml (a graph # agent), never both. The presence of graph.yaml is what makes the agent # a graph agent. # @@ -23,7 +23,7 @@ description: | # Free-form prose describing the workflow A reference workflow: triage a research request, retrieve local context, branch on a script decision, run either a sub-agent or an LLM research step, then gate the result behind human approval. -version: "1.0" # Graph SCHEMA version. Only "1.0" is accepted. +version: "1.0" # Graph schema version. Only "1.0" is accepted. # --------------------------------------------------------------------------- # Agent-level config (all optional) @@ -63,7 +63,7 @@ settings: # Reducers (optional, required whenever two parallel branches write the same # state key in the same super-step; otherwise the validator errors at load). # -# A reducer says HOW two values for the same key get merged. Built-ins: +# A reducer says how two values for the same key get merged. Built-ins: # append list += [value] (single value appended to a list) # extend list += value (a list) (list-of-lists flattened by one level) # concat "a\nb" (string join with newline separator) @@ -135,10 +135,11 @@ nodes: required: [topic, needs_deep_dive] state_updates: # {{output}} = this node's result (here, the parsed object) triage_result: "{{output}}" - # --- POLYMORPHIC `next` ----------------------------------------------- + # --- Polymorphic `next` ----------------------------------------------- # A single string runs the next node sequentially (e.g. `next: retrieve`). - # A list runs ALL listed nodes IN PARALLEL as one BSP super-step. Their - # writes are merged via `reducers:` at the join. Branches converge + # A list runs all listed nodes in parallel as one BSP super-step + # (for more info on BSP, see https://en.wikipedia.org/wiki/Bulk_synchronous_parallel). + # Their writes are merged via `reducers:` at the join. Branches converge # implicitly when they all route to the same downstream node (here, # `synthesize`). See the diamond: # @@ -146,12 +147,12 @@ nodes: # / \ # retrieve web_search (run concurrently) # \ / - # synthesize (join — fires once after both finish) + # synthesize (join; fires once after both finish) next: [retrieve, web_search] # --- rag node (parallel branch 1 of the diamond) ------------------------ # Hybrid (vector + keyword) retrieval against a per-node knowledge base. - # The knowledge base is built ONCE, at agent load time, into + # The knowledge base is built once, at agent load time, into # /retrieve.yaml (named after this node's id). retrieve: id: retrieve @@ -162,7 +163,7 @@ nodes: query: "{{topic}}" # Retrieval query (templated). Default: {{initial_prompt}}. top_k: 5 # Chunks to retrieve. Default = the KB's own top_k. timeout: 120 # Retrieval timeout in seconds. Default 120. - # Knowledge-base BUILD config (optional; used only when the KB is first + # Knowledge-base build config (optional; used only when the KB is first # built). When embedding_model + chunk_size + chunk_overlap are all set, # the KB builds with no interactive prompts (works in non-interactive runs). embedding_model: openai:text-embedding-3-small @@ -171,8 +172,8 @@ nodes: reranker_model: null # Optional reranker for hybrid-search results batch_size: 100 # Optional embedding-request batch size state_updates: # {{output}} = { context: , sources: [, ...] } - context: "{{output.context}}" # writes `context` — `reducers.context = concat` - sources: "{{output.sources}}" # writes `sources` — `reducers.sources = append` + context: "{{output.context}}" # writes `context` -> `reducers.context = concat` + sources: "{{output.sources}}" # writes `sources` -> `reducers.sources = append` next: synthesize # Joins with web_search at `synthesize`. # --- llm node (parallel branch 2 of the diamond) ------------------------ @@ -199,7 +200,7 @@ nodes: # `context` and `sources` are produced without needing `state_updates`. next: synthesize # Joins with retrieve at `synthesize`. - # --- script node (the diamond's JOIN; also dispatches) ----------------- + # --- script node (the diamond's join; also dispatches) ----------------- # Runs a .sh / .py / .ts script. The script receives state via the # GRAPH_STATE env var (inline JSON) or GRAPH_STATE_FILE (path to a JSON # file, used when state exceeds 32 KiB). Exactly one is set. It must print @@ -223,12 +224,12 @@ nodes: # This script is expected to emit `_next: deep_dive` (or `_next: subjects_map` # to demonstrate the map node below), or no `_next` (then `next` is used). # Targets reached only via the script's dynamic `_next` get an - # "unreachable" warning from the validator — expected for `_next`-routed + # "unreachable" warning from the validator. This is expected for `_next`-routed # targets. # --- agent node --------------------------------------------------------- # Spawns a full Loki sub-agent and waits for it. The child uses its own - # tool stack. Agent nodes have NO `tools:` field. No schema hint is + # tool stack. Agent nodes have no `tools:` field. No schema hint is # injected even when `output_schema` is set (unlike llm nodes). deep_dive: id: deep_dive @@ -250,7 +251,7 @@ nodes: research: "{{output}}" next: review # Required for agent nodes - # --- map node (Dynamic fan-out — LangGraph's `Send` API) ---------------- + # --- map node (Dynamic fan-out. Think: LangGraph's `Send` API) ---------------- # Spawns one parallel sub-branch per item in `over`. Each sub-branch runs # the node referenced by `branch:` with the item bound to `as:`. Outputs # collect into the array named by `collect_into:`, preserving input order. @@ -262,8 +263,8 @@ nodes: id: subjects_map type: map over: "{{subjects}}" # Required. List expression resolved from state. - # Empty list is allowed — no branches spawn, - # `collect_into` is written as []. + # Empty list is allowed. It means no branches spawn, + # and thus `collect_into` is written as []. as: subject # Required. Per-branch state key holding the # current item. Read with {{subject}} inside # the branch node's prompt. @@ -275,7 +276,7 @@ nodes: # map's `collect_into` channel # - no `output_schema:` (top-level merge # would clash with collect_into) - # Validator (C.5) enforces all three. + # Validator enforces all three. collect_into: subject_findings # Required. State key for the array of # per-branch outputs, in input order # (not spawn-finish order). @@ -297,7 +298,7 @@ nodes: prompt: "Research {{subject}}: pull the key facts and one citation." tools: - web_search_loki - # No `next:`, `state_updates:`, or `output_schema:` here — map branches + # No `next:`, `state_updates:`, or `output_schema:` here. Map branches # have a strict contract (see `subjects_map.branch` comment). # Aggregator that runs after the map joins. Reads the collected list. diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 31f46c7..0557269 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -406,18 +406,21 @@ mod tests { #[test] fn parse_update_flag_no_value() { let cli = parse(&["--update"]); + assert_eq!(cli.update, Some(None)); } #[test] fn parse_update_flag_with_version() { let cli = parse(&["--update", "v0.4.0"]); + assert_eq!(cli.update, Some(Some("v0.4.0".to_string()))); } #[test] fn parse_update_with_force() { let cli = parse(&["--update", "--force"]); + assert_eq!(cli.update, Some(None)); assert!(cli.force); } diff --git a/src/client/message.rs b/src/client/message.rs index 97760e2..f73b1b9 100644 --- a/src/client/message.rs +++ b/src/client/message.rs @@ -94,21 +94,21 @@ impl MessageContent { match self { MessageContent::Text(text) => multiline_text(text), MessageContent::Array(list) => { - let (mut concated_text, mut files) = (String::new(), vec![]); + let (mut concatenated_text, mut files) = (String::new(), vec![]); for item in list { match item { MessageContentPart::Text { text } => { - concated_text = format!("{concated_text} {text}") + concatenated_text = format!("{concatenated_text} {text}") } MessageContentPart::ImageUrl { image_url } => { files.push(resolve_url_fn(&image_url.url)) } } } - if !concated_text.is_empty() { - concated_text = format!(" -- {}", multiline_text(&concated_text)) + if !concatenated_text.is_empty() { + concatenated_text = format!(" -- {}", multiline_text(&concatenated_text)) } - format!(".file {}{}", files.join(" "), concated_text) + format!(".file {}{}", files.join(" "), concatenated_text) } MessageContent::ToolCalls(MessageContentToolCalls { tool_results, text, .. @@ -230,9 +230,11 @@ pub fn extract_system_message(messages: &mut Vec) -> Option { if messages.is_empty() { return None; } + if messages[0].role.is_system() { let system_message = messages.remove(0); return Some(system_message.content.to_text()); } + None } diff --git a/src/client/stream.rs b/src/client/stream.rs index df52238..daf308e 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -33,11 +33,6 @@ impl SseHandler { } } - /// Suppresses stdout streaming of incoming tokens. Tokens are still buffered - /// internally (so the caller's `.take()` still returns the full response) — - /// only the per-token send to the SSE renderer is skipped. Used by parallel - /// graph super-step branches so concurrent LLM calls don't interleave on - /// stdout. pub fn set_silent(&mut self, silent: bool) { self.silent = silent; } @@ -47,10 +42,11 @@ impl SseHandler { return Ok(()); } self.buffer.push_str(text); + if self.silent { return Ok(()); } - + let ret = self .sender .send(SseEvent::Text(text.to_string())) diff --git a/src/config/agent.rs b/src/config/agent.rs index 2940952..a17d733 100644 --- a/src/config/agent.rs +++ b/src/config/agent.rs @@ -802,6 +802,7 @@ fn resolve_document_paths( } else { PathBuf::from(&resolved_path) }; + document_paths.push(format!("{}:{}", protocol, new_path.display())); } else if Path::new(&resolve_home_dir(path)).is_relative() { let new_path = safe_join_path(agent_data_dir, path) @@ -829,6 +830,7 @@ async fn init_graph_rags( if info_flag { return Ok(rags); } + for (node_id, node) in &graph.nodes { let NodeType::Rag(rag_node) = &node.node_type else { continue; @@ -1059,6 +1061,7 @@ variables: output: done "#}; let graph: Graph = serde_yaml::from_str(&yaml).unwrap(); + let config = AgentConfig::from_graph("my-agent-dir", &graph); assert_eq!(config.name, "my-agent-dir"); @@ -1101,6 +1104,7 @@ variables: fn from_graph_keeps_defaults_for_llm_loop_fields() { let yaml = "name: g\nstart: x\nnodes:\n x:\n id: x\n type: end\n output: ok\n"; let graph: Graph = serde_yaml::from_str(yaml).unwrap(); + let config = AgentConfig::from_graph("d", &graph); assert!(!config.auto_continue); diff --git a/src/config/mod.rs b/src/config/mod.rs index fdca1fe..08af88e 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -273,6 +273,7 @@ pub fn install_assets(category: AssetCategory) -> Result<()> { } println!("Reinstalled bundled {label} ({})", target.display()); + Ok(()) } @@ -297,6 +298,7 @@ fn confirm_asset_overwrite(category: AssetCategory, label: &str, target: &Path) }; let prompt = format!("{} {body}\nContinue? [y/N] ", warning_text("WARNING:")); let answer = read_single_key(&['y', 'Y', 'n', 'N'], 'n', &prompt)?; + Ok(matches!(answer, 'y' | 'Y')) } diff --git a/src/config/request_context.rs b/src/config/request_context.rs index 88e9198..5f30f57 100644 --- a/src/config/request_context.rs +++ b/src/config/request_context.rs @@ -33,11 +33,11 @@ use indoc::formatdoc; use inquire::{Confirm, MultiSelect, Text, list_option::ListOption, validator::Validation}; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; -use std::env; use std::fs::{File, OpenOptions, read_dir, read_to_string, remove_dir_all, remove_file}; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::{env, fs}; pub struct AutoContinueConfig { pub enabled: bool, @@ -46,10 +46,6 @@ pub struct AutoContinueConfig { pub continuation_prompt: Option, } -/// Controls how LLM token streams are presented to the user. `Silent` is set -/// on branch contexts during parallel graph super-steps so concurrent LLM -/// calls don't interleave token-by-token on stdout — the full response still -/// lands in graph state via the normal output_schema / state_updates pathway. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum RenderMode { #[default] @@ -166,27 +162,18 @@ impl RequestContext { /// Forks the context for one parallel branch of a graph super-step. /// - /// Each branch gets a fresh, owned clone — mutations (role swap, + /// Each branch gets a fresh, owned clone. Mutations (role swap, /// `before/after_chat_completion`, tool tracker, last_message, etc.) are /// scoped to the branch and discarded when the branch finishes. The /// user-visible state communication happens through the graph's /// `StateManager` (via `fork_for_branch_state` + `diff_against` + - /// `apply_branch_writes` reducers), NOT through `RequestContext`. + /// `apply_branch_writes` reducers), and not through `RequestContext`. /// /// Distinction from `new_for_child`: `new_for_child` builds a fresh context - /// for a SPAWNED SUB-AGENT (different agent identity, different supervisor + /// for a spawned sub-agent (different agent identity, different supervisor /// hierarchy, depth+1, fresh tool tracker). `fork_for_branch` keeps the - /// caller's identity and supervisor hierarchy — it's a sibling clone of the - /// SAME logical agent, running one of N parallel work items. - /// - /// Behavior of per-field cloning: - /// - `Arc`-wrapped fields (`app`, `rag`, `supervisor`, `parent_supervisor`, - /// `inbox`, `escalation_queue`) — shared via Arc::clone - /// - Owned heap fields (`model`, `role`, `session`, `agent`, `tool_scope`, - /// `todo_list`, etc.) — deep `.clone()` so the branch can mutate freely - /// - `auto_continue_count` reset to 0 (each branch starts a fresh - /// continuation budget) - /// - `last_continuation_response` reset to None + /// caller's identity and supervisor hierarchy; it's a sibling clone of the + /// same logical agent, running one of N parallel work items. pub fn fork_for_branch(&self) -> Self { Self { app: Arc::clone(&self.app), @@ -1419,6 +1406,7 @@ impl RequestContext { env!("CARGO_CRATE_NAME"), mcp_path.display(), ); + Ok(()) } @@ -1503,20 +1491,23 @@ impl RequestContext { } else { config_path }; + ensure_parent_exists(&target_path)?; if !target_path.exists() { - std::fs::write( + fs::write( &target_path, "# see https://github.com/Dark-Alex-17/loki/blob/main/config.agent.example.yaml\n", ) .with_context(|| format!("Failed to write to '{}'", target_path.display()))?; } + let editor = app.editor()?; edit_file(&editor, &target_path)?; println!( "NOTE: Remember to reload the agent if there are changes made to '{}'", target_path.display() ); + Ok(()) } @@ -2026,7 +2017,7 @@ impl RequestContext { .collect(); } else if cmd == ".agent" { if args.len() == 2 { - let dir = paths::agent_data_dir(args[0]).join(super::SESSIONS_DIR_NAME); + let dir = paths::agent_data_dir(args[0]).join(SESSIONS_DIR_NAME); values = list_file_names(dir, ".yaml") .into_iter() .map(|v| (v, None)) diff --git a/src/graph/executor.rs b/src/graph/executor.rs index 563506b..28e3792 100644 --- a/src/graph/executor.rs +++ b/src/graph/executor.rs @@ -76,9 +76,6 @@ impl GraphExecutor { let max_iterations = graph.settings.max_loop_iterations; let graph_timeout = graph.settings.timeout.map(Duration::from_secs); let max_concurrency = graph.settings.max_concurrency; - // Wrap in Arc so spawned branch tasks can cheaply share the Graph for - // node lookup (especially the map executor, which needs to resolve its - // `branch:` target from inside a spawned task). let graph = Arc::new(graph); let start = Instant::now(); @@ -297,10 +294,6 @@ fn sorted_frontier(frontier: &HashSet) -> Vec { v } -// Bundles the engine-config refs that every `step()` call needs to thread -// through. Constructed once per spawned branch task (or once at the call site -// for sequential paths) so step() and downstream executors (MapNodeExecutor) -// take one parameter instead of five. pub(super) struct StepContext<'a> { pub graph: &'a Graph, pub script_executor: &'a ScriptExecutor, @@ -391,8 +384,6 @@ async fn step( } } -// Returns all `next:` targets from the node (handles both `One` and `Many`), -// erroring if no `next` is set. fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result> { node.next .as_ref() @@ -400,9 +391,6 @@ fn static_next_targets(node: &Node, current: &str, kind: &str) -> Result` for their -// primary routing argument (LLM, RAG, Input). fn first_next_target(node: &Node) -> Option<&str> { node.next .as_ref() @@ -447,6 +435,7 @@ mod tests { #[test] fn resolve_end_output_interpolates_template_against_state() { let mut state = state_with(&[("name", json!("alice"))]); + let node = end_node("done: {{name}}", None); assert_eq!(resolve_end_output(&node, &mut state), "done: alice"); @@ -457,6 +446,7 @@ mod tests { let mut updates = HashMap::new(); updates.insert("summary".into(), "completed for {{user}}".into()); let node = end_node("RESULT: {{summary}}", Some(updates)); + let mut state = state_with(&[("user", json!("bob"))]); assert_eq!( @@ -472,6 +462,7 @@ mod tests { #[test] fn resolve_end_output_with_empty_template_returns_empty_string() { let mut state = state_with(&[]); + let node = end_node("", None); assert_eq!(resolve_end_output(&node, &mut state), ""); @@ -480,6 +471,7 @@ mod tests { #[test] fn resolve_end_output_lenient_on_missing_keys() { let mut state = state_with(&[]); + let node = end_node("hello {{unknown}}!", None); assert_eq!(resolve_end_output(&node, &mut state), "hello !"); diff --git a/src/graph/llm.rs b/src/graph/llm.rs index 86ea453..b228672 100644 --- a/src/graph/llm.rs +++ b/src/graph/llm.rs @@ -13,12 +13,6 @@ use tokio::time::timeout; const OUTPUT_KEY: &str = "output"; -/// What happened during an LLM node's execution, from the caller's routing -/// perspective. `Continue` means the caller should advance via the node's -/// declared `next:` targets (whether the LLM actually succeeded or failed -/// without a fallback — either way, the executor uses node.next). `FellBack` -/// means the LLM failed after retries and the node had a `fallback:` declared, -/// so routing should go to that fallback target only. #[derive(Debug, PartialEq, Eq)] pub(super) enum LlmExecutionOutcome { Continue, @@ -235,6 +229,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec, Vec) { let Some(entries) = entries else { return (regular, mcp); }; + for e in entries { if let Some(server) = e.strip_prefix("mcp:") { mcp.push(server.to_string()); @@ -242,6 +237,7 @@ fn categorize_tools(entries: Option<&[String]>) -> (Vec, Vec) { regular.push(e.clone()); } } + (regular, mcp) } @@ -465,10 +461,6 @@ mod tests { #[test] fn outcome_from_failure_without_fallback_is_continue() { - // Failed but no fallback: caller routes via node.next as if successful. - // The error has already been recorded to state via the OUTPUT_KEY by - // execute(); the caller's `static_next_targets` will error if node.next - // is also missing. assert_eq!(outcome_from(true, None), LlmExecutionOutcome::Continue); } diff --git a/src/graph/map.rs b/src/graph/map.rs index 627a07c..5045abe 100644 --- a/src/graph/map.rs +++ b/src/graph/map.rs @@ -125,8 +125,7 @@ impl MapNodeExecutor { let joined = join_all(sub_tasks).await; drop(progress_tracker); - // Collect outputs keyed by input index so order is preserved regardless - // of finish order. This is the user-facing contract from plan E.2. + // Collect outputs keyed by input index so order is preserved regardless of finish order. let mut outputs: HashMap = HashMap::new(); for join_result in joined { let (idx, sub_state, exec_result) = diff --git a/src/graph/progress.rs b/src/graph/progress.rs index 2aa5330..60ca073 100644 --- a/src/graph/progress.rs +++ b/src/graph/progress.rs @@ -9,14 +9,6 @@ static SPINNER_STYLE: LazyLock = LazyLock::new(|| { .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏", ""]) }); -// Manages a set of per-branch spinners drawn side-by-side via indicatif's -// `MultiProgress`. Created at the start of a multi-branch graph super-step -// (or map sub-branch fan-out) and torn down at the join. -// -// When stdout isn't a terminal (CI, piped output), the tracker becomes a -// no-op — `add_branch` returns a disabled handle whose methods do nothing. -// This keeps machine-piped graph runs free of spinner garbage in their -// captured output. pub(super) struct BranchProgressTracker { multi: Option, } diff --git a/src/graph/reducer.rs b/src/graph/reducer.rs index b634b97..10eeec6 100644 --- a/src/graph/reducer.rs +++ b/src/graph/reducer.rs @@ -3,17 +3,6 @@ use crate::graph::type_name; use anyhow::{Result, bail}; use serde_json::{Number, Value}; -/// Combines a branch's incoming write with the current state value (if any) -/// via the specified reducer. The result is what gets written back to live -/// state during the super-step merge phase. -/// -/// `current = None` means the key has no prior value in this super-step or in -/// live state. Most reducers treat absent as their identity (empty array, -/// empty string, no prior value). `Overwrite` ignores `current` entirely. -/// -/// Errors clearly when types are incompatible with the reducer (e.g. -/// `Sum` on a string), naming the reducer and which side (`current` / `incoming`) -/// has the wrong type. pub fn apply(reducer: Reducer, current: Option<&Value>, incoming: Value) -> Result { match reducer { Reducer::Append => apply_append(current, incoming), @@ -37,6 +26,7 @@ fn apply_append(current: Option<&Value>, incoming: Value) -> Result { ), }; arr.push(incoming); + Ok(Value::Array(arr)) } @@ -56,6 +46,7 @@ fn apply_extend(current: Option<&Value>, incoming: Value) -> Result { type_name(&other) ), } + Ok(Value::Array(arr)) } @@ -81,6 +72,7 @@ fn apply_concat(current: Option<&Value>, incoming: Value) -> Result { type_name(other) ), }; + Ok(Value::String(result)) } @@ -90,6 +82,7 @@ fn apply_sum(current: Option<&Value>, incoming: Value) -> Result { None => 0.0, Some(value) => number_or_error(value, "sum", "current")?, }; + Ok(json_number(c + i)) } @@ -135,6 +128,7 @@ fn apply_merge(current: Option<&Value>, incoming: Value) -> Result { type_name(&other) ), } + Ok(Value::Object(map)) } @@ -148,10 +142,8 @@ fn number_or_error(value: &Value, reducer_name: &str, position: &str) -> Result< } } -// Numeric reducers compute in f64 for simplicity. We preserve integer typing -// when the result is losslessly representable as i64 so `count: sum` stays an -// integer rather than degrading to a float. Non-finite values (NaN, Inf) can't -// arise from finite inputs to +/max/min, so the fallback never fires in practice. +// Numeric reducers compute in f64 for simplicity. Integer typing is preserved when the result is losslessly +// representable as i64. fn json_number(n: f64) -> Value { if n.fract() == 0.0 && n.is_finite() && n.abs() <= (i64::MAX as f64) { Value::Number(Number::from(n as i64)) diff --git a/src/graph/script.rs b/src/graph/script.rs index f4a01bb..cd53a53 100644 --- a/src/graph/script.rs +++ b/src/graph/script.rs @@ -102,6 +102,7 @@ fn apply_state_updates(node: &ScriptNode, state_manager: &mut StateManager) { let Some(updates) = &node.state_updates else { return; }; + for (key, template) in updates { let value = state_manager.interpolate_lenient(template); state_manager diff --git a/src/graph/staging.rs b/src/graph/staging.rs index 097c7f2..31522e4 100644 --- a/src/graph/staging.rs +++ b/src/graph/staging.rs @@ -1,12 +1,6 @@ use serde_json::Value; use std::collections::HashMap; -/// Published form of one branch's writes for the super-step merge phase. -/// Callers assemble these into a deterministically-ordered `Vec` keyed by -/// `(node_id, invocation_index)` before passing to -/// `StateManager::apply_branch_writes`. `invocation_index` is 0 for normal -/// branches and the input-list position for map sub-branches — so multiple -/// invocations of the same `branch:` node by a `map` are still totally ordered. #[derive(Debug, Clone)] pub struct BranchWrites { pub node_id: String, diff --git a/src/graph/state.rs b/src/graph/state.rs index 5f8fb72..e16f16f 100644 --- a/src/graph/state.rs +++ b/src/graph/state.rs @@ -159,13 +159,6 @@ impl StateManager { } } - /// Forks state for a parallel branch: returns a fully-owned `StateManager` - /// seeded from the current state's data. The branch mutates its fork - /// freely; callers extract its writes via `diff_against` after the branch - /// completes, then merge them via `apply_branch_writes`. - /// - /// Distinct from `read_snapshot` (returns a shared `Arc` for - /// reads) — `fork_for_branch_state` returns a writable owned clone. pub fn fork_for_branch_state(&self) -> Self { Self { state: self.state.clone(), @@ -173,11 +166,6 @@ impl StateManager { } } - /// Returns the keys whose values differ from `snapshot`. Use this after a - /// branch finishes to extract its writes (input to `apply_branch_writes`). - /// Keys present in `self` but absent from `snapshot`, or with different - /// values, count as writes. Deletions are not represented (no current node - /// executor deletes state). pub fn diff_against(&self, snapshot: &GraphState) -> HashMap { let mut diff = HashMap::new(); for (k, v) in self.state.data() { @@ -188,30 +176,10 @@ impl StateManager { diff } - /// Returns an `Arc`-wrapped snapshot of the current graph state. Each - /// branch in a parallel super-step uses this snapshot as the baseline for - /// its `diff_against` call at branch end. The executor extracts each - /// branch's writes (the diff) and merges them via `apply_branch_writes` at - /// the super-step boundary. - /// - /// Distinct from the older `snapshot()` method (returns a `HashMap` clone - /// of the data only — used by `script_executor` to ship state to child - /// processes). - #[allow(dead_code)] pub fn read_snapshot(&self) -> Arc { Arc::new(self.state.clone()) } - /// Commits a deterministically-ordered set of per-branch writes back into - /// live state, applying declared reducers where they exist. - /// - /// Caller must pre-sort `writes` by `(node_id, invocation_index)` so that - /// non-commutative reducers (`Concat`, `Merge`) produce reproducible output. - /// - /// Errors when a key has writers from ≥2 branches but no reducer declared. - /// The validator (Phase C) catches this at load time; this runtime check is - /// defense-in-depth against a malformed or out-of-date validator missing it. - #[allow(dead_code)] pub fn apply_branch_writes( &mut self, writes: Vec, @@ -252,22 +220,6 @@ impl StateManager { Ok(()) } - /// Interpolates a template and returns a typed JSON `Value`. - /// - /// Two paths depending on the template shape: - /// - **Pure single reference** (the entire trimmed template is a single - /// `{{key}}` expression, e.g. `"{{subjects}}"`, `"{{user.name}}"`, - /// `"{{items[0]}}"`) — returns the typed `Value` at that key, preserving - /// numbers, bools, arrays, and objects. Errors if the key is missing. - /// - **Mixed template** (multiple refs, surrounding text, or no refs) — - /// falls back to string interpolation via `interpolate()` and returns - /// `Value::String(...)`. Strict on missing keys. - /// - /// Required by: - /// - `map.over: "{{subjects}}"` — must resolve to a JSON array, not its string form - /// - `state_updates` writes that should preserve the source type (a `cost_usd: "{{api_cost}}"` - /// write should land as a Number, not a String) - #[allow(dead_code)] pub fn interpolate_raw(&self, template: &str) -> Result { let trimmed = template.trim(); if let Some(key) = single_reference_key(trimmed) { @@ -338,9 +290,7 @@ fn split_indices(segment: &str) -> Option<(&str, Vec)> { } // Returns the inner key when `template` is exactly a single `{{key}}` reference -// (no surrounding text, no other braces). Mirrors the character set the -// TEMPLATE_VAR_RE regex accepts so `interpolate_raw` and `interpolate` stay -// consistent about what counts as a valid key. +// (no surrounding text, no other braces). fn single_reference_key(template: &str) -> Option<&str> { let inner = template.strip_prefix("{{")?.strip_suffix("}}")?; if inner.contains("{{") || inner.contains("}}") { @@ -353,11 +303,10 @@ fn single_reference_key(template: &str) -> Option<&str> { valid.then_some(inner) } -// Returns the root state keys referenced by any `{{...}}` expressions in the -// given template string. The "root key" is the identifier before the first -// `.` or `[` — i.e. for `{{user.name}}` the root is `user`, for `{{items[0]}}` -// the root is `items`. Used by the validator to compute the static read-set of -// a node's templated fields without depending on a runtime `StateManager`. +// Returns the root state keys referenced by any `{{...}}` expressions in the given template string. The "root key" is +// the identifier before the first `.` or `[`; e.g., for `{{user.name}}` the root is `user`, for `{{items[0]}}` the +// root is `items`. Used by the validator to compute the static read-set of a node's templated fields without +// depending on a runtime `StateManager`. pub(super) fn template_root_keys(template: &str) -> Vec { TEMPLATE_VAR_RE .captures_iter(template) @@ -985,9 +934,6 @@ mod tests { #[test] fn interpolate_raw_inner_spaces_treated_as_mixed() { let manager = manager_with(&[("k", json!("v"))]); - // `{{ k }}` is not a valid pure reference (spaces inside braces are - // outside the allowed character set). Fall back to string interpolation - // -- which doesn't match the regex either, so the literal passes through. let result = manager.interpolate_raw("{{ k }}").unwrap(); assert_eq!(result, json!("{{ k }}")); } diff --git a/src/graph/types.rs b/src/graph/types.rs index 6a668e1..0341d82 100644 --- a/src/graph/types.rs +++ b/src/graph/types.rs @@ -128,13 +128,8 @@ pub struct Node { pub next: Option, } +#[cfg(test)] impl Node { - /// Returns the single next target as a string slice for tests and other - /// read-only inspection. Returns `None` when no `next:` is declared at all, - /// OR when a real multi-target fan-out is declared (since a fan-out has no - /// "single" target). Execution paths use `static_next_targets` in the graph - /// executor instead. - #[allow(dead_code)] pub fn next_target(&self) -> Option<&str> { match &self.next { None => None, @@ -153,7 +148,6 @@ pub enum NextTargets { } impl NextTargets { - /// View as a slice of node ids. `One(s)` returns a single-element slice. pub fn as_slice(&self) -> &[String] { match self { NextTargets::One(s) => slice::from_ref(s), @@ -161,7 +155,6 @@ impl NextTargets { } } - /// True if this declares more than one parallel target (i.e., a real fan-out). pub fn is_fan_out(&self) -> bool { matches!(self, NextTargets::Many(v) if v.len() > 1) } @@ -349,28 +342,18 @@ pub struct EndNode { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct MapNode { - /// Template expression that must resolve (via `interpolate_raw`, added in - /// Phase B) to a JSON array. Each item in the array is one branch invocation. pub over: String, - /// The name to bind each item under, accessible as `{{}}` inside - /// the branch node's templates. YAML field is `as:`. #[serde(rename = "as")] pub as_name: String, - /// Node id to invoke once per item in the resolved list. pub branch: String, - /// State key that the branch node writes; the map collects this key's value - /// across invocations. Defaults to "output". #[serde(default = "default_map_output_key")] pub output_key: String, - /// State key to receive the array of per-branch outputs, in input-list order. pub collect_into: String, - /// Optional cap on simultaneously-running sub-branches. Falls back to - /// `settings.max_concurrency` when unset. #[serde(default, skip_serializing_if = "Option::is_none")] pub max_concurrency: Option, } @@ -707,7 +690,7 @@ on_other: edit_loop initial.insert("k".to_string(), json!("v")); let state = GraphState::new(initial); let serialized = state.to_json().unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + let parsed: Value = serde_json::from_str(&serialized).unwrap(); assert_eq!(parsed.get("k"), Some(&json!("v"))); assert!(state.size_bytes() > 0); } diff --git a/src/graph/validator.rs b/src/graph/validator.rs index 01cd166..09b7b37 100644 --- a/src/graph/validator.rs +++ b/src/graph/validator.rs @@ -321,17 +321,13 @@ impl GraphValidator { } } - // Phase C — Parallel-execution validation. + // Parallel-execution validation. // - // The v1 algorithm uses immediate-successor analysis only: a parallel - // group is the set of `next:` targets of a single fan-out node. Map nodes - // are checked separately by `validate_map_branches` (the branch is - // self-parallel, but enforcement comes from strict-mode rules on the - // branch node, not from group membership). Transitive parallel groups - // (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather - // than under-reports — a false positive forces an unneeded reducer - // (mild annoyance); a false negative allows silent data races (catastrophic). - + // The v1 algorithm uses immediate-successor analysis only: a parallel group is the set of `next:` targets of a + // single fan-out node. Map nodes are checked separately by `validate_map_branches` (the branch is self-parallel, + // but enforcement comes from strict-mode rules on the branch node, not from group membership). Transitive parallel + // groups (deeper fan-out chains) are a v2 enhancement; v1 over-reports rather than under-reports. A false positive + // forces an unneeded reducer (mild annoyance); a false negative allows silent data races (catastrophic). fn validate_max_concurrency(&self, graph: &Graph, result: &mut ValidationResult) { if graph.settings.max_concurrency == 0 { result.error(ValidationError::new( @@ -613,15 +609,12 @@ fn declared_targets(node: &Node) -> Vec<(String, &'static str)> { out.push((t.clone(), "llm 'fallback'")); } } - // `agent`/`input`/`rag` route only via `next` (already collected - // above); `end` is terminal. No type-specific routing edges to add. - NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {} - // A `map` node invokes its `branch:` target once per item from the - // resolved `over` list. The branch is statically referenced, so it - // is a real declared edge for cycle/reachability purposes. NodeType::Map(m) => { out.push((m.branch.clone(), "map 'branch'")); } + // `agent`/`input`/`rag` route only via `next` (already collected + // above); `end` is terminal. No type-specific routing edges to add. + NodeType::Agent(_) | NodeType::Input(_) | NodeType::Rag(_) | NodeType::End(_) => {} } out } @@ -653,13 +646,11 @@ fn find_reachable_nodes(graph: &Graph) -> HashSet { reachable } -// v1 parallel-group detection: only the immediate `next` targets of a fan-out -// node count as a parallel group. Map branches are handled separately by -// `validate_map_branches` (the branch's self-parallelism is checked via -// strict-mode rules on the branch node itself, not via group membership). +// v1 parallel-group detection: only the immediate `next` targets of a fan-out node count as a parallel group. Map +// branches are handled separately by `validate_map_branches` (the branch's self-parallelism is checked via strict-mode +// rules on the branch node itself, not via group membership). // -// Returns one HashSet per fan-out source; deeper transitive parallelism is -// intentionally out of scope for v1. +// Returns one HashSet per fan-out source; deeper transitive parallelism is intentionally out of scope for v1. fn compute_parallel_groups(graph: &Graph) -> Vec> { let mut groups = Vec::new(); for node in graph.nodes.values() { @@ -677,19 +668,18 @@ fn compute_parallel_groups(graph: &Graph) -> Vec> { // Sources considered: // - `state_updates` keys (every node type that has them) // - `output_schema` top-level `properties` for `llm` and `agent` (auto-merge) -// -// Returns `None` only for script nodes with no declared `state_updates` — their -// emitted JSON is opaque to static analysis. The validator treats `None` as a -// load-time error when the script appears in a parallel group (C.6). fn write_set_of(node: &Node) -> Option> { if matches!(node.node_type, NodeType::Script(_)) && node_state_updates_keys(node).is_none() { return None; } + let mut writes = HashSet::new(); if let Some(keys) = node_state_updates_keys(node) { writes.extend(keys); } + writes.extend(output_schema_top_level_keys(node)); + Some(writes) } @@ -698,26 +688,6 @@ fn write_set_of(node: &Node) -> Option> { // "Root key" follows the same definition as `template_root_keys`: for a // reference like `{{user.name}}` or `{{items[0]}}`, the root is the bare // identifier before the first `.` or `[`. -// -// Templated fields scanned per node type: -// - llm: instructions, prompt, state_updates values -// - agent: prompt, state_updates values -// - rag: query (defaulting to "{{initial_prompt}}"), state_updates values -// - approval: question, state_updates values -// - input: question, default, state_updates values -// - end: output, state_updates values -// - map: over (its `{{...}}` IS the dynamic read of the list to fan out over) -// - script: state_updates values only (the script body is opaque to static -// analysis; its reads via GRAPH_STATE / GRAPH_STATE_FILE can't be -// inferred at load time) -// -// Scoped variables produced by THIS node's own execution are excluded from -// state_updates value scanning: -// - llm/agent/rag → "output" (the node's body output) -// - approval → "choice" (the user's selected option) -// - input → "input" (the user's typed text) -// These are bindings created inside the node, not reads from prior state, so -// they cannot race with a sibling's writes. fn read_set_of(node: &Node) -> HashSet { let mut reads: HashSet = HashSet::new(); let scoped: &[&str] = match &node.node_type { diff --git a/src/main.rs b/src/main.rs index 1206371..d776e18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -155,6 +155,7 @@ async fn run( if let Some(category) = cli.install { return config::install_assets(category); } + if cli.sync_models { let url = ctx.app.config.sync_models_url(); return sync_models(&url, abort_signal.clone()).await; diff --git a/src/mcp/sse_transport.rs b/src/mcp/sse_transport.rs index c5c7af5..7dad1c2 100644 --- a/src/mcp/sse_transport.rs +++ b/src/mcp/sse_transport.rs @@ -5,8 +5,8 @@ use futures_util::StreamExt; use futures_util::stream::BoxStream; use mpsc::error::SendError; use mpsc::{OwnedPermit, Receiver, Sender, channel}; -use reqwest::Client; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, header}; use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage}; use std::collections::HashMap; use std::error::Error; @@ -52,7 +52,7 @@ impl LegacySseTransport { let response = client .get(sse_url) - .header(reqwest::header::ACCEPT, "text/event-stream") + .header(header::ACCEPT, "text/event-stream") .send() .await .context("Failed to open SSE connection")? diff --git a/src/rag/mod.rs b/src/rag/mod.rs index 6ed70d4..19cc6f2 100644 --- a/src/rag/mod.rs +++ b/src/rag/mod.rs @@ -82,9 +82,6 @@ impl Clone for Rag { } } -/// Caller-supplied overrides for building a RAG knowledge base. Each field -/// takes precedence over the app-level `rag_*` config; a field left `None` -/// falls back to app config and then, if still unset, an interactive prompt. #[derive(Debug, Clone, Default)] pub struct RagInitConfig { pub embedding_model: Option, @@ -100,12 +97,6 @@ impl Rag { init_client(&self.app_config, model) } - /// Build a RAG knowledge base using caller-supplied config overrides. - /// Unlike [`Rag::init`], this does not bail outright in non-interactive - /// mode: it only requires a terminal when a needed value is missing - /// from both `config` and app config. When `config` fully specifies - /// `embedding_model`, `chunk_size`, and `chunk_overlap`, the build runs - /// with no prompts. pub async fn init_with_config( app: &AppConfig, name: &str, @@ -1389,13 +1380,13 @@ mod tests { #[test] fn get_separators_returns_language_specific() { - let rs_seps = splitter::get_separators("rs"); + let rs_seps = get_separators("rs"); assert!(rs_seps.iter().any(|s| s.contains("fn "))); - let py_seps = splitter::get_separators("py"); + let py_seps = get_separators("py"); assert!(py_seps.iter().any(|s| s.contains("def "))); - let md_seps = splitter::get_separators("md"); + let md_seps = get_separators("md"); assert!(md_seps.iter().any(|s| s.contains("# "))); }