feat: Removed indicatif spinners. The UX just won't stop clobbering for parallel graph nodes
This commit is contained in:
+56
-29
@@ -1,8 +1,7 @@
|
||||
use super::agent::AgentNodeExecutor;
|
||||
use super::llm::{LlmExecutionOutcome, LlmNodeExecutor};
|
||||
use super::logging::{GraphLogger, node_type_label};
|
||||
use super::logging::{GraphLogger, narrate_node_complete, narrate_node_failed};
|
||||
use super::map::MapNodeExecutor;
|
||||
use super::progress::{BranchProgressHandle, BranchProgressTracker};
|
||||
use super::rag::RagNodeExecutor;
|
||||
use super::script::ScriptExecutor;
|
||||
use super::staging::BranchWrites;
|
||||
@@ -152,14 +151,15 @@ impl GraphExecutor {
|
||||
let semaphore = Arc::new(Semaphore::new(max_concurrency));
|
||||
|
||||
let frontier_size = frontier.len();
|
||||
let is_nested = ctx.current_depth > 0;
|
||||
let has_progress_nodes = frontier.iter().any(|nid| {
|
||||
graph.get_node(nid).is_some_and(|n| {
|
||||
!matches!(n.node_type, NodeType::Approval(_) | NodeType::Input(_))
|
||||
})
|
||||
});
|
||||
let progress_tracker =
|
||||
(has_progress_nodes && !is_nested).then(BranchProgressTracker::new);
|
||||
let in_super_step = frontier_size > 1;
|
||||
let silent = logger.silent();
|
||||
|
||||
if in_super_step {
|
||||
let mut branches = sorted_frontier(&frontier);
|
||||
branches.sort();
|
||||
logger.super_step_start(&branches);
|
||||
}
|
||||
|
||||
let mut branch_tasks = Vec::with_capacity(frontier_size);
|
||||
for node_id in &frontier {
|
||||
let node = graph
|
||||
@@ -168,34 +168,31 @@ impl GraphExecutor {
|
||||
anyhow!("Node '{}' not found in graph '{}'", node_id, graph.name)
|
||||
})?
|
||||
.clone();
|
||||
logger.node_start(&node, in_super_step);
|
||||
let branch_state = state.fork_for_branch_state();
|
||||
let mut branch_ctx = ctx.fork_for_branch();
|
||||
branch_ctx.render_mode = RenderMode::Silent;
|
||||
if in_super_step {
|
||||
branch_ctx.render_mode = RenderMode::Silent;
|
||||
}
|
||||
let script_exec_clone = script_executor.clone();
|
||||
let graph_clone = Arc::clone(&graph);
|
||||
let current = node_id.clone();
|
||||
let sem_clone = semaphore.clone();
|
||||
let abort_clone = abort_signal.clone();
|
||||
let progress_handle = match (
|
||||
matches!(node.node_type, NodeType::Approval(_) | NodeType::Input(_)),
|
||||
&progress_tracker,
|
||||
) {
|
||||
(false, Some(tracker)) => {
|
||||
tracker.add_branch(&format!("{} ({})", node_id, node_type_label(&node)))
|
||||
}
|
||||
_ => BranchProgressHandle::disabled(),
|
||||
};
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let mut progress_handle = Some(progress_handle);
|
||||
let _permit = sem_clone
|
||||
.acquire()
|
||||
.await
|
||||
.expect("semaphore should not be closed");
|
||||
if abort_clone.aborted() {
|
||||
if let Some(h) = progress_handle.take() {
|
||||
h.fail("aborted");
|
||||
}
|
||||
narrate_node_failed(
|
||||
silent,
|
||||
&node,
|
||||
Duration::default(),
|
||||
"aborted",
|
||||
in_super_step,
|
||||
);
|
||||
return (
|
||||
current.clone(),
|
||||
branch_state,
|
||||
@@ -214,10 +211,38 @@ impl GraphExecutor {
|
||||
};
|
||||
let result = step(&node, &mut state, &mut ctx, &step_ctx, ¤t).await;
|
||||
let elapsed = node_start.elapsed();
|
||||
if let Some(h) = progress_handle.take() {
|
||||
match &result {
|
||||
Ok(_) => h.complete(),
|
||||
Err(e) => h.fail(&e.to_string()),
|
||||
match &result {
|
||||
Ok(StepResult::Continue(targets)) => {
|
||||
let route = if targets.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(targets.join(", "))
|
||||
};
|
||||
narrate_node_complete(
|
||||
silent,
|
||||
&node,
|
||||
elapsed,
|
||||
route.as_deref(),
|
||||
in_super_step,
|
||||
);
|
||||
}
|
||||
Ok(StepResult::End(_)) => {
|
||||
narrate_node_complete(
|
||||
silent,
|
||||
&node,
|
||||
elapsed,
|
||||
Some("END"),
|
||||
in_super_step,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
narrate_node_failed(
|
||||
silent,
|
||||
&node,
|
||||
elapsed,
|
||||
&e.to_string(),
|
||||
in_super_step,
|
||||
);
|
||||
}
|
||||
}
|
||||
(current, state, result, elapsed)
|
||||
@@ -226,7 +251,6 @@ impl GraphExecutor {
|
||||
}
|
||||
|
||||
let joined = join_all(branch_tasks).await;
|
||||
drop(progress_tracker);
|
||||
|
||||
let mut branch_writes: Vec<BranchWrites> = Vec::new();
|
||||
let mut next_frontier: HashSet<String> = HashSet::new();
|
||||
@@ -294,6 +318,9 @@ impl GraphExecutor {
|
||||
return Ok(output);
|
||||
}
|
||||
|
||||
if in_super_step {
|
||||
logger.super_step_end(&sorted_frontier(&next_frontier));
|
||||
}
|
||||
frontier = next_frontier;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,24 @@
|
||||
use super::state::StateManager;
|
||||
use super::types::{Node, NodeType};
|
||||
use crate::utils::dimmed_text;
|
||||
use chrono::Local;
|
||||
use indexmap::IndexMap;
|
||||
use std::cmp::Reverse;
|
||||
use std::time::Duration;
|
||||
|
||||
fn ts() -> String {
|
||||
Local::now().format("%H:%M:%S").to_string()
|
||||
}
|
||||
|
||||
fn fmt_secs(elapsed: Duration) -> String {
|
||||
let secs = elapsed.as_secs_f64();
|
||||
if secs < 1.0 {
|
||||
format!("{}ms", elapsed.as_millis())
|
||||
} else {
|
||||
format!("{secs:.2}s")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct NodeTiming {
|
||||
count: usize,
|
||||
@@ -80,6 +94,43 @@ impl GraphLogger {
|
||||
);
|
||||
}
|
||||
|
||||
pub fn silent(&self) -> bool {
|
||||
self.silent
|
||||
}
|
||||
|
||||
pub fn node_start(&self, node: &Node, in_super_step: bool) {
|
||||
narrate_node_start(self.silent, node, in_super_step);
|
||||
}
|
||||
|
||||
pub fn super_step_start(&self, branches: &[String]) {
|
||||
if self.silent {
|
||||
return;
|
||||
}
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!(
|
||||
"▸ {} super-step start: {}",
|
||||
ts(),
|
||||
branches.join(", ")
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
pub fn super_step_end(&self, targets: &[String]) {
|
||||
if self.silent {
|
||||
return;
|
||||
}
|
||||
let route = if targets.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
format!(" -> {}", targets.join(", "))
|
||||
};
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!("▸ {} super-step end{route}", ts()))
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_timing(&mut self, node_id: &str, elapsed: Duration) {
|
||||
self.timings
|
||||
.entry(node_id.to_string())
|
||||
@@ -144,6 +195,66 @@ impl GraphLogger {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn narrate_node_start(silent: bool, node: &Node, in_super_step: bool) {
|
||||
if silent {
|
||||
return;
|
||||
}
|
||||
let indent = if in_super_step { " " } else { "" };
|
||||
let label = node_type_label(node);
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!("▸ {} {indent}{} ({label}) start", ts(), node.id))
|
||||
);
|
||||
}
|
||||
|
||||
pub fn narrate_node_complete(
|
||||
silent: bool,
|
||||
node: &Node,
|
||||
elapsed: Duration,
|
||||
next_target: Option<&str>,
|
||||
in_super_step: bool,
|
||||
) {
|
||||
if silent {
|
||||
return;
|
||||
}
|
||||
let indent = if in_super_step { " " } else { "" };
|
||||
let label = node_type_label(node);
|
||||
let dur = fmt_secs(elapsed);
|
||||
let route = next_target.map(|t| format!(" -> {t}")).unwrap_or_default();
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!(
|
||||
"▸ {} {indent}{} ({label}) done in {dur}{route}",
|
||||
ts(),
|
||||
node.id
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
pub fn narrate_node_failed(
|
||||
silent: bool,
|
||||
node: &Node,
|
||||
elapsed: Duration,
|
||||
err: &str,
|
||||
in_super_step: bool,
|
||||
) {
|
||||
if silent {
|
||||
return;
|
||||
}
|
||||
let indent = if in_super_step { " " } else { "" };
|
||||
let label = node_type_label(node);
|
||||
let dur = fmt_secs(elapsed);
|
||||
let excerpt: String = err.chars().take(120).collect();
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!(
|
||||
"▸ {} {indent}{} ({label}) FAILED in {dur} -- {excerpt}",
|
||||
ts(),
|
||||
node.id
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
pub(super) fn node_type_label(node: &Node) -> &'static str {
|
||||
match &node.node_type {
|
||||
NodeType::Agent(_) => "agent",
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use super::agent::AgentNodeExecutor;
|
||||
use super::executor::StepContext;
|
||||
use super::llm::LlmNodeExecutor;
|
||||
use super::progress::{BranchProgressHandle, BranchProgressTracker};
|
||||
use super::rag::RagNodeExecutor;
|
||||
use super::state::StateManager;
|
||||
use super::types::{MapNode, NodeType};
|
||||
@@ -54,7 +53,6 @@ impl MapNodeExecutor {
|
||||
.unwrap_or(step_ctx.max_concurrency)
|
||||
.max(1);
|
||||
let semaphore = Arc::new(Semaphore::new(max_conc));
|
||||
let progress_tracker = BranchProgressTracker::new();
|
||||
let mut sub_tasks = Vec::with_capacity(items.len());
|
||||
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
@@ -68,21 +66,15 @@ impl MapNodeExecutor {
|
||||
let sub_branch_id = node.branch.clone();
|
||||
let sem = semaphore.clone();
|
||||
let abort = step_ctx.abort_signal.clone();
|
||||
let progress_handle: BranchProgressHandle =
|
||||
progress_tracker.add_branch(&format!("{}[{idx}]", node.branch));
|
||||
|
||||
sub_state.state_mut().set(as_name, item);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let mut progress_handle = Some(progress_handle);
|
||||
let _permit = sem
|
||||
.acquire()
|
||||
.await
|
||||
.expect("map semaphore should not be closed");
|
||||
if abort.aborted() {
|
||||
if let Some(h) = progress_handle.take() {
|
||||
h.fail("aborted");
|
||||
}
|
||||
return (
|
||||
idx,
|
||||
sub_state,
|
||||
@@ -110,20 +102,12 @@ impl MapNodeExecutor {
|
||||
)),
|
||||
};
|
||||
|
||||
if let Some(h) = progress_handle.take() {
|
||||
match &exec_result {
|
||||
Ok(_) => h.complete(),
|
||||
Err(e) => h.fail(&e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
(idx, state, exec_result)
|
||||
});
|
||||
sub_tasks.push(task);
|
||||
}
|
||||
|
||||
let joined = join_all(sub_tasks).await;
|
||||
drop(progress_tracker);
|
||||
|
||||
// Collect outputs keyed by input index so order is preserved regardless of finish order.
|
||||
let mut outputs: HashMap<usize, Value> = HashMap::new();
|
||||
|
||||
@@ -5,7 +5,6 @@ pub mod llm;
|
||||
pub mod logging;
|
||||
pub mod map;
|
||||
pub mod parser;
|
||||
pub mod progress;
|
||||
pub mod rag;
|
||||
pub mod reducer;
|
||||
pub mod script;
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
use crate::utils::IS_STDOUT_TERMINAL;
|
||||
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
|
||||
use std::sync::LazyLock;
|
||||
use std::time::Duration;
|
||||
|
||||
const GREEN: &str = "\x1b[32m";
|
||||
const RED: &str = "\x1b[31m";
|
||||
const RESET: &str = "\x1b[0m";
|
||||
|
||||
static SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
|
||||
ProgressStyle::with_template("{spinner} [{prefix}] {msg} ({elapsed})")
|
||||
.expect("valid template")
|
||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏", ""])
|
||||
});
|
||||
|
||||
pub(super) struct BranchProgressTracker {
|
||||
multi: Option<MultiProgress>,
|
||||
}
|
||||
|
||||
impl BranchProgressTracker {
|
||||
pub fn new() -> Self {
|
||||
if *IS_STDOUT_TERMINAL {
|
||||
Self {
|
||||
multi: Some(MultiProgress::new()),
|
||||
}
|
||||
} else {
|
||||
Self { multi: None }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_branch(&self, label: &str) -> BranchProgressHandle {
|
||||
let Some(multi) = &self.multi else {
|
||||
return BranchProgressHandle::disabled();
|
||||
};
|
||||
let bar = multi.add(ProgressBar::new_spinner());
|
||||
bar.set_style(SPINNER_STYLE.clone());
|
||||
bar.set_prefix(label.to_string());
|
||||
bar.set_message("running…");
|
||||
bar.enable_steady_tick(Duration::from_millis(80));
|
||||
BranchProgressHandle { bar: Some(bar) }
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct BranchProgressHandle {
|
||||
bar: Option<ProgressBar>,
|
||||
}
|
||||
|
||||
impl BranchProgressHandle {
|
||||
pub fn disabled() -> Self {
|
||||
Self { bar: None }
|
||||
}
|
||||
|
||||
pub fn complete(self) {
|
||||
if let Some(bar) = self.bar {
|
||||
bar.finish_with_message(format!("{GREEN}✓ done{RESET}"));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail(self, err: &str) {
|
||||
if let Some(bar) = self.bar {
|
||||
let truncated = if err.len() > 80 {
|
||||
let mut s = err[..80].to_string();
|
||||
s.push('…');
|
||||
s
|
||||
} else {
|
||||
err.to_string()
|
||||
};
|
||||
bar.finish_with_message(format!("{RED}✗ failed {RESET} — {truncated}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user