feat: added branch progress tracker for better visualization of parallel graph super-steps

This commit is contained in:
2026-05-20 15:50:38 -06:00
parent 7ac753d824
commit 3eff135349
7 changed files with 180 additions and 14 deletions
+21
View File
@@ -2,6 +2,7 @@ use super::agent::AgentNodeExecutor;
use super::llm::LlmNodeExecutor;
use super::logging::GraphLogger;
use super::map::MapNodeExecutor;
use super::progress::{BranchProgressHandle, BranchProgressTracker};
use super::rag::RagNodeExecutor;
use super::script::ScriptExecutor;
use super::staging::BranchWrites;
@@ -145,6 +146,11 @@ impl GraphExecutor {
let semaphore = Arc::new(Semaphore::new(max_concurrency));
let frontier_size = frontier.len();
let progress_tracker = if frontier_size > 1 {
Some(BranchProgressTracker::new())
} else {
None
};
let mut branch_tasks = Vec::with_capacity(frontier_size);
for node_id in &frontier {
let node = graph
@@ -163,13 +169,19 @@ impl GraphExecutor {
let current = node_id.clone();
let sem_clone = semaphore.clone();
let abort_clone = abort_signal.clone();
let progress_handle: Option<BranchProgressHandle> =
progress_tracker.as_ref().map(|t| t.add_branch(node_id));
let task = tokio::spawn(async move {
let mut progress_handle = progress_handle;
let _permit = sem_clone
.acquire()
.await
.expect("semaphore should not be closed");
if abort_clone.aborted() {
if let Some(h) = progress_handle.take() {
h.fail("aborted");
}
return (
current.clone(),
branch_state,
@@ -188,12 +200,21 @@ impl GraphExecutor {
};
let result = step(&node, &mut state, &mut ctx, &step_ctx, &current).await;
let elapsed = node_start.elapsed();
if let Some(h) = progress_handle.take() {
match &result {
Ok(_) => h.complete(),
Err(e) => h.fail(&e.to_string()),
}
}
(current, state, result, elapsed)
});
branch_tasks.push(task);
}
let joined = join_all(branch_tasks).await;
if let Some(t) = &progress_tracker {
t.clear();
}
let mut branch_writes: Vec<BranchWrites> = Vec::new();
let mut next_frontier: HashSet<String> = HashSet::new();
+16
View File
@@ -1,6 +1,7 @@
use super::agent::AgentNodeExecutor;
use super::executor::StepContext;
use super::llm::LlmNodeExecutor;
use super::progress::{BranchProgressHandle, BranchProgressTracker};
use super::rag::RagNodeExecutor;
use super::state::StateManager;
use super::types::{MapNode, NodeType};
@@ -59,6 +60,7 @@ impl MapNodeExecutor {
.unwrap_or(step_ctx.max_concurrency)
.max(1);
let semaphore = Arc::new(Semaphore::new(max_conc));
let progress_tracker = BranchProgressTracker::new();
let mut sub_tasks = Vec::with_capacity(items.len());
for (idx, item) in items.iter().enumerate() {
@@ -72,15 +74,21 @@ impl MapNodeExecutor {
let sub_branch_id = node.branch.clone();
let sem = semaphore.clone();
let abort = step_ctx.abort_signal.clone();
let progress_handle: BranchProgressHandle =
progress_tracker.add_branch(&format!("{}[{idx}]", node.branch));
sub_state.state_mut().set(as_name, item);
let task = tokio::spawn(async move {
let mut progress_handle = Some(progress_handle);
let _permit = sem
.acquire()
.await
.expect("map semaphore should not be closed");
if abort.aborted() {
if let Some(h) = progress_handle.take() {
h.fail("aborted");
}
return (
idx,
sub_state,
@@ -119,12 +127,20 @@ impl MapNodeExecutor {
)),
};
if let Some(h) = progress_handle.take() {
match &exec_result {
Ok(_) => h.complete(),
Err(e) => h.fail(&e.to_string()),
}
}
(idx, state, exec_result)
});
sub_tasks.push(task);
}
let joined = join_all(sub_tasks).await;
progress_tracker.clear();
// Collect outputs keyed by input index so order is preserved regardless
// of finish order. This is the user-facing contract from plan E.2.
+11 -10
View File
@@ -5,6 +5,7 @@ pub mod llm;
pub mod logging;
pub mod map;
pub mod parser;
pub mod progress;
pub mod rag;
pub mod reducer;
pub mod script;
@@ -15,10 +16,10 @@ pub mod types;
pub mod user_interaction;
pub mod validator;
use serde_json::Value;
pub use dispatch::{active_agent_graph_name, run_active_agent_graph};
pub use executor::GraphExecutor;
pub use parser::{GraphParser, agent_has_graph};
use serde_json::Value;
pub use types::{Graph, NodeType};
pub const GRAPH_SCHEMA_VERSION: &str = "1.0";
@@ -27,13 +28,13 @@ pub const DEFAULT_MAX_LOOP_ITERATIONS: usize = 100;
pub const MAX_STATE_SIZE_BYTES: usize = 32 * 1024;
pub (in crate::graph) fn type_name(value: &Value) -> &'static str {
match value {
Value::Null => "null",
Value::Bool(_) => "bool",
Value::Number(_) => "number",
Value::String(_) => "string",
Value::Array(_) => "array",
Value::Object(_) => "object",
}
pub(in crate::graph) fn type_name(value: &Value) -> &'static str {
match value {
Value::Null => "null",
Value::Bool(_) => "bool",
Value::Number(_) => "number",
Value::String(_) => "string",
Value::Array(_) => "array",
Value::Object(_) => "object",
}
}
+94
View File
@@ -0,0 +1,94 @@
use crate::utils::IS_STDOUT_TERMINAL;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::sync::LazyLock;
use std::time::{Duration, Instant};
static SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
ProgressStyle::with_template("{spinner} [{prefix}] {msg}")
.expect("valid template")
.tick_strings(&["", "", "", "", "", "", "", "", "", "", ""])
});
// Manages a set of per-branch spinners drawn side-by-side via indicatif's
// `MultiProgress`. Created at the start of a multi-branch graph super-step
// (or map sub-branch fan-out) and torn down at the join.
//
// When stdout isn't a terminal (CI, piped output), the tracker becomes a
// no-op — `add_branch` returns a disabled handle whose methods do nothing.
// This keeps machine-piped graph runs free of spinner garbage in their
// captured output.
pub(super) struct BranchProgressTracker {
multi: Option<MultiProgress>,
}
impl BranchProgressTracker {
pub fn new() -> Self {
if *IS_STDOUT_TERMINAL {
Self {
multi: Some(MultiProgress::new()),
}
} else {
Self { multi: None }
}
}
pub fn add_branch(&self, label: &str) -> BranchProgressHandle {
let Some(multi) = &self.multi else {
return BranchProgressHandle::disabled();
};
let bar = multi.add(ProgressBar::new_spinner());
bar.set_style(SPINNER_STYLE.clone());
bar.set_prefix(label.to_string());
bar.set_message("running…");
bar.enable_steady_tick(Duration::from_millis(80));
BranchProgressHandle {
bar: Some(bar),
started: Instant::now(),
}
}
pub fn clear(&self) {
if let Some(multi) = &self.multi {
let _ = multi.clear();
}
}
}
pub(super) struct BranchProgressHandle {
bar: Option<ProgressBar>,
started: Instant,
}
impl BranchProgressHandle {
fn disabled() -> Self {
Self {
bar: None,
started: Instant::now(),
}
}
pub fn complete(self) {
if let Some(bar) = self.bar {
let elapsed = self.started.elapsed();
bar.finish_with_message(format!("✓ done ({:.1}s)", elapsed.as_secs_f64()));
}
}
pub fn fail(self, err: &str) {
if let Some(bar) = self.bar {
let elapsed = self.started.elapsed();
let truncated = if err.len() > 80 {
let mut s = err[..80].to_string();
s.push('…');
s
} else {
err.to_string()
};
bar.finish_with_message(format!(
"✗ failed ({:.1}s) — {}",
elapsed.as_secs_f64(),
truncated
));
}
}
}
+1 -1
View File
@@ -1,7 +1,7 @@
use super::types::Reducer;
use crate::graph::type_name;
use anyhow::{Result, bail};
use serde_json::{Number, Value};
use crate::graph::type_name;
/// Combines a branch's incoming write with the current state value (if any)
/// via the specified reducer. The result is what gets written back to live