From edd3c08247d21da68e8ade4914187abe02ac0773 Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Fri, 15 May 2026 13:17:42 -0600 Subject: [PATCH] feat: implemented structured logging for graph execution --- src/graph/executor.rs | 79 ++++++---------- src/graph/logging.rs | 206 ++++++++++++++++++++++++++++++++++++++++++ src/graph/mod.rs | 2 + 3 files changed, 238 insertions(+), 49 deletions(-) create mode 100644 src/graph/logging.rs diff --git a/src/graph/executor.rs b/src/graph/executor.rs index a72364f..c9e411a 100644 --- a/src/graph/executor.rs +++ b/src/graph/executor.rs @@ -8,6 +8,7 @@ use super::agent::AgentNodeExecutor; use super::llm::LlmNodeExecutor; +use super::logging::GraphLogger; use super::parser::GraphParser; use super::script::ScriptExecutor; use super::state::StateManager; @@ -15,7 +16,7 @@ use super::types::{EndNode, Graph, Node, NodeType}; use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor}; use super::validator::GraphValidator; use crate::config::RequestContext; -use crate::utils::{AbortSignal, dimmed_text}; +use crate::utils::AbortSignal; use anyhow::{Context, Result, anyhow, bail}; use serde_json::Value; use std::collections::HashMap; @@ -45,11 +46,27 @@ impl GraphExecutor { } /// Run the graph to completion. Returns the resolved `output` template - /// of the terminal `End` node. + /// of the terminal `End` node. Any failure is logged via the + /// `GraphLogger` before being propagated. pub async fn execute( self, ctx: &mut RequestContext, abort_signal: AbortSignal, + ) -> Result { + let mut logger = + GraphLogger::new(&self.graph.name, self.graph.settings.log_state_snapshots); + let result = self.run(&mut logger, ctx, abort_signal).await; + if let Err(e) = &result { + logger.graph_error(e); + } + result + } + + async fn run( + self, + logger: &mut GraphLogger, + ctx: &mut RequestContext, + abort_signal: AbortSignal, ) -> Result { let GraphExecutor { graph, base_dir } = self; @@ -57,12 +74,7 @@ impl GraphExecutor { let validator = GraphValidator::new(&base_dir); let result = validator.validate(&graph); for w in &result.warnings { - let where_ = w - .node_id - .as_deref() - .map(|id| format!("[{id}] ")) - .unwrap_or_default(); - warn!("[graph:{}] {}{}", graph.name, where_, w.message); + logger.validation_warning(w.node_id.as_deref(), &w.message); } result.into_result()?; } @@ -74,11 +86,7 @@ impl GraphExecutor { let start = Instant::now(); let mut current = graph.start.clone(); - info!("[graph:{}] start at '{}'", graph.name, current); - eprintln!( - "{}", - dimmed_text(&format!("▸ graph: {} (start: {})", graph.name, current)) - ); + logger.graph_start(¤t, graph.nodes.len()); let output = loop { if abort_signal.aborted() { @@ -111,16 +119,11 @@ impl GraphExecutor { .get_node(¤t) .ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?; - debug!( - "[graph:{}] entering '{}' (visit {})", - graph.name, current, visits - ); - eprintln!( - "{}", - dimmed_text(&format!("▸ {} ({})", current, node_type_label(node))) - ); + logger.node_entry(node, visits); + logger.state_snapshot(¤t, &state); - let next = step( + let node_start = Instant::now(); + let step_result = step( node, &mut state, ctx, @@ -128,28 +131,17 @@ impl GraphExecutor { &graph.name, ¤t, ) - .await - .with_context(|| format!("at node '{current}'"))?; + .await; + logger.record_timing(¤t, node_start.elapsed()); + let next = step_result.with_context(|| format!("at node '{current}'"))?; match next { StepResult::Continue(next_id) => { - debug!("[graph:{}] {} -> {}", graph.name, current, next_id); + logger.routing(¤t, &next_id); current = next_id; } StepResult::End(out) => { - info!( - "[graph:{}] end '{}' (elapsed {:?})", - graph.name, - current, - start.elapsed() - ); - eprintln!( - "{}", - dimmed_text(&format!( - "▸ graph done in {:.2}s", - start.elapsed().as_secs_f64() - )) - ); + logger.graph_complete(¤t, start.elapsed()); break out; } } @@ -164,17 +156,6 @@ enum StepResult { End(String), } -fn node_type_label(node: &Node) -> &'static str { - match &node.node_type { - NodeType::Agent(_) => "agent", - NodeType::Script(_) => "script", - NodeType::Approval(_) => "approval", - NodeType::Input(_) => "input", - NodeType::Llm(_) => "llm", - NodeType::End(_) => "end", - } -} - async fn step( node: &Node, state: &mut StateManager, diff --git a/src/graph/logging.rs b/src/graph/logging.rs new file mode 100644 index 0000000..e1cfd0f --- /dev/null +++ b/src/graph/logging.rs @@ -0,0 +1,206 @@ +//! Structured logging and per-node timing for graph execution. +//! +//! Two output channels, both owned by [`GraphLogger`]: +//! - **`tracing`** (`info!`/`debug!`/`warn!`/`error!`) — respects +//! `RUST_LOG`; this is the developer-facing channel. +//! - **stderr narration** — the dimmed `▸` lines the user follows along +//! with during execution. +//! +//! The logger also accumulates per-node wall-clock timings and emits a +//! performance summary (slowest-first) when the graph completes. + +use std::cmp::Reverse; +use super::state::StateManager; +use super::types::{Node, NodeType}; +use crate::utils::dimmed_text; +use indexmap::IndexMap; +use std::time::Duration; + +#[derive(Debug, Clone, Default)] +struct NodeTiming { + count: usize, + total: Duration, + max: Duration, +} + +impl NodeTiming { + fn record(&mut self, elapsed: Duration) { + self.count += 1; + self.total += elapsed; + if elapsed > self.max { + self.max = elapsed; + } + } +} + +pub struct GraphLogger { + graph_name: String, + log_state_snapshots: bool, + timings: IndexMap, +} + +impl GraphLogger { + pub fn new(graph_name: &str, log_state_snapshots: bool) -> Self { + Self { + graph_name: graph_name.to_string(), + log_state_snapshots, + timings: IndexMap::new(), + } + } + + pub fn graph_start(&self, start_node: &str, node_count: usize) { + info!( + "[graph:{}] start at '{}' ({} nodes)", + self.graph_name, start_node, node_count + ); + eprintln!( + "{}", + dimmed_text(&format!( + "▸ graph: {} (start: {start_node})", + self.graph_name + )) + ); + } + + pub fn graph_complete(&self, end_node: &str, elapsed: Duration) { + info!( + "[graph:{}] end '{}' (elapsed {:?})", + self.graph_name, end_node, elapsed + ); + eprintln!( + "{}", + dimmed_text(&format!("▸ graph done in {:.2}s", elapsed.as_secs_f64())) + ); + self.log_performance_summary(); + } + + pub fn graph_error(&self, error: &anyhow::Error) { + error!("[graph:{}] execution failed: {error:#}", self.graph_name); + } + + pub fn node_entry(&self, node: &Node, visit: usize) { + debug!( + "[graph:{}] entering '{}' (visit {visit})", + self.graph_name, node.id + ); + eprintln!( + "{}", + dimmed_text(&format!("▸ {} ({})", node.id, node_type_label(node))) + ); + } + + pub fn record_timing(&mut self, node_id: &str, elapsed: Duration) { + self.timings + .entry(node_id.to_string()) + .or_default() + .record(elapsed); + } + + pub fn routing(&self, from: &str, to: &str) { + debug!("[graph:{}] {from} -> {to}", self.graph_name); + } + + pub fn validation_warning(&self, node_id: Option<&str>, message: &str) { + match node_id { + Some(id) => warn!("[graph:{}] [{id}] {message}", self.graph_name), + None => warn!("[graph:{}] {message}", self.graph_name), + } + } + + /// Log a state snapshot before a node runs. No-op unless the graph's + /// `log_state_snapshots` setting is enabled. Keys + byte size go to + /// `debug`; the full state goes to `trace` (it may contain secrets, + /// so it is never logged at a more visible level). + pub fn state_snapshot(&self, node_id: &str, state: &StateManager) { + if !self.log_state_snapshots { + return; + } + let snapshot = state.snapshot(); + let mut keys: Vec<&str> = snapshot.keys().map(String::as_str).collect(); + keys.sort_unstable(); + debug!( + "[graph:{}] [{node_id}] state: {} bytes, keys={:?}", + self.graph_name, + state.size_bytes(), + keys + ); + trace!( + "[graph:{}] [{node_id}] full state: {:?}", + self.graph_name, snapshot + ); + } + + fn log_performance_summary(&self) { + if self.timings.is_empty() { + return; + } + let mut rows: Vec<(&String, &NodeTiming)> = self.timings.iter().collect(); + rows.sort_by_key(|b| Reverse(b.1.total)); + info!( + "[graph:{}] performance summary (slowest first):", + self.graph_name + ); + for (node_id, t) in rows { + let avg = t.total / t.count.max(1) as u32; + info!( + "[graph:{}] {node_id}: {} visit(s), total {}ms, avg {}ms, max {}ms", + self.graph_name, + t.count, + t.total.as_millis(), + avg.as_millis(), + t.max.as_millis(), + ); + } + } +} + +fn node_type_label(node: &Node) -> &'static str { + match &node.node_type { + NodeType::Agent(_) => "agent", + NodeType::Script(_) => "script", + NodeType::Approval(_) => "approval", + NodeType::Input(_) => "input", + NodeType::Llm(_) => "llm", + NodeType::End(_) => "end", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn records_and_aggregates_node_timings() { + let mut logger = GraphLogger::new("g", false); + logger.record_timing("a", Duration::from_millis(100)); + logger.record_timing("a", Duration::from_millis(300)); + logger.record_timing("b", Duration::from_millis(50)); + + let a = logger.timings.get("a").unwrap(); + assert_eq!(a.count, 2); + assert_eq!(a.total, Duration::from_millis(400)); + assert_eq!(a.max, Duration::from_millis(300)); + + let b = logger.timings.get("b").unwrap(); + assert_eq!(b.count, 1); + assert_eq!(b.total, Duration::from_millis(50)); + } + + #[test] + fn node_timing_max_tracks_largest() { + let mut t = NodeTiming::default(); + t.record(Duration::from_millis(10)); + t.record(Duration::from_millis(80)); + t.record(Duration::from_millis(40)); + assert_eq!(t.max, Duration::from_millis(80)); + assert_eq!(t.count, 3); + assert_eq!(t.total, Duration::from_millis(130)); + } + + #[test] + fn new_logger_has_no_timings() { + let logger = GraphLogger::new("g", true); + assert!(logger.timings.is_empty()); + assert!(logger.log_state_snapshots); + } +} diff --git a/src/graph/mod.rs b/src/graph/mod.rs index 354e61a..d4e3fed 100644 --- a/src/graph/mod.rs +++ b/src/graph/mod.rs @@ -5,6 +5,7 @@ pub mod agent; pub mod dispatch; pub mod executor; pub mod llm; +pub mod logging; pub mod parser; pub mod script; pub mod state; @@ -17,6 +18,7 @@ pub use agent::AgentNodeExecutor; pub use dispatch::{active_agent_graph_name, run_active_agent_graph}; pub use executor::GraphExecutor; pub use llm::LlmNodeExecutor; +pub use logging::GraphLogger; pub use parser::{GraphParser, agent_has_graph}; pub use script::ScriptExecutor; pub use state::{StateManager, StateRepresentation};