feat: implemented structured logging for graph execution
This commit is contained in:
+30
-49
@@ -8,6 +8,7 @@
|
|||||||
|
|
||||||
use super::agent::AgentNodeExecutor;
|
use super::agent::AgentNodeExecutor;
|
||||||
use super::llm::LlmNodeExecutor;
|
use super::llm::LlmNodeExecutor;
|
||||||
|
use super::logging::GraphLogger;
|
||||||
use super::parser::GraphParser;
|
use super::parser::GraphParser;
|
||||||
use super::script::ScriptExecutor;
|
use super::script::ScriptExecutor;
|
||||||
use super::state::StateManager;
|
use super::state::StateManager;
|
||||||
@@ -15,7 +16,7 @@ use super::types::{EndNode, Graph, Node, NodeType};
|
|||||||
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
|
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
|
||||||
use super::validator::GraphValidator;
|
use super::validator::GraphValidator;
|
||||||
use crate::config::RequestContext;
|
use crate::config::RequestContext;
|
||||||
use crate::utils::{AbortSignal, dimmed_text};
|
use crate::utils::AbortSignal;
|
||||||
use anyhow::{Context, Result, anyhow, bail};
|
use anyhow::{Context, Result, anyhow, bail};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -45,11 +46,27 @@ impl GraphExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Run the graph to completion. Returns the resolved `output` template
|
/// Run the graph to completion. Returns the resolved `output` template
|
||||||
/// of the terminal `End` node.
|
/// of the terminal `End` node. Any failure is logged via the
|
||||||
|
/// `GraphLogger` before being propagated.
|
||||||
pub async fn execute(
|
pub async fn execute(
|
||||||
self,
|
self,
|
||||||
ctx: &mut RequestContext,
|
ctx: &mut RequestContext,
|
||||||
abort_signal: AbortSignal,
|
abort_signal: AbortSignal,
|
||||||
|
) -> Result<String> {
|
||||||
|
let mut logger =
|
||||||
|
GraphLogger::new(&self.graph.name, self.graph.settings.log_state_snapshots);
|
||||||
|
let result = self.run(&mut logger, ctx, abort_signal).await;
|
||||||
|
if let Err(e) = &result {
|
||||||
|
logger.graph_error(e);
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(
|
||||||
|
self,
|
||||||
|
logger: &mut GraphLogger,
|
||||||
|
ctx: &mut RequestContext,
|
||||||
|
abort_signal: AbortSignal,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
let GraphExecutor { graph, base_dir } = self;
|
let GraphExecutor { graph, base_dir } = self;
|
||||||
|
|
||||||
@@ -57,12 +74,7 @@ impl GraphExecutor {
|
|||||||
let validator = GraphValidator::new(&base_dir);
|
let validator = GraphValidator::new(&base_dir);
|
||||||
let result = validator.validate(&graph);
|
let result = validator.validate(&graph);
|
||||||
for w in &result.warnings {
|
for w in &result.warnings {
|
||||||
let where_ = w
|
logger.validation_warning(w.node_id.as_deref(), &w.message);
|
||||||
.node_id
|
|
||||||
.as_deref()
|
|
||||||
.map(|id| format!("[{id}] "))
|
|
||||||
.unwrap_or_default();
|
|
||||||
warn!("[graph:{}] {}{}", graph.name, where_, w.message);
|
|
||||||
}
|
}
|
||||||
result.into_result()?;
|
result.into_result()?;
|
||||||
}
|
}
|
||||||
@@ -74,11 +86,7 @@ impl GraphExecutor {
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let mut current = graph.start.clone();
|
let mut current = graph.start.clone();
|
||||||
info!("[graph:{}] start at '{}'", graph.name, current);
|
logger.graph_start(¤t, graph.nodes.len());
|
||||||
eprintln!(
|
|
||||||
"{}",
|
|
||||||
dimmed_text(&format!("▸ graph: {} (start: {})", graph.name, current))
|
|
||||||
);
|
|
||||||
|
|
||||||
let output = loop {
|
let output = loop {
|
||||||
if abort_signal.aborted() {
|
if abort_signal.aborted() {
|
||||||
@@ -111,16 +119,11 @@ impl GraphExecutor {
|
|||||||
.get_node(¤t)
|
.get_node(¤t)
|
||||||
.ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?;
|
.ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?;
|
||||||
|
|
||||||
debug!(
|
logger.node_entry(node, visits);
|
||||||
"[graph:{}] entering '{}' (visit {})",
|
logger.state_snapshot(¤t, &state);
|
||||||
graph.name, current, visits
|
|
||||||
);
|
|
||||||
eprintln!(
|
|
||||||
"{}",
|
|
||||||
dimmed_text(&format!("▸ {} ({})", current, node_type_label(node)))
|
|
||||||
);
|
|
||||||
|
|
||||||
let next = step(
|
let node_start = Instant::now();
|
||||||
|
let step_result = step(
|
||||||
node,
|
node,
|
||||||
&mut state,
|
&mut state,
|
||||||
ctx,
|
ctx,
|
||||||
@@ -128,28 +131,17 @@ impl GraphExecutor {
|
|||||||
&graph.name,
|
&graph.name,
|
||||||
¤t,
|
¤t,
|
||||||
)
|
)
|
||||||
.await
|
.await;
|
||||||
.with_context(|| format!("at node '{current}'"))?;
|
logger.record_timing(¤t, node_start.elapsed());
|
||||||
|
let next = step_result.with_context(|| format!("at node '{current}'"))?;
|
||||||
|
|
||||||
match next {
|
match next {
|
||||||
StepResult::Continue(next_id) => {
|
StepResult::Continue(next_id) => {
|
||||||
debug!("[graph:{}] {} -> {}", graph.name, current, next_id);
|
logger.routing(¤t, &next_id);
|
||||||
current = next_id;
|
current = next_id;
|
||||||
}
|
}
|
||||||
StepResult::End(out) => {
|
StepResult::End(out) => {
|
||||||
info!(
|
logger.graph_complete(¤t, start.elapsed());
|
||||||
"[graph:{}] end '{}' (elapsed {:?})",
|
|
||||||
graph.name,
|
|
||||||
current,
|
|
||||||
start.elapsed()
|
|
||||||
);
|
|
||||||
eprintln!(
|
|
||||||
"{}",
|
|
||||||
dimmed_text(&format!(
|
|
||||||
"▸ graph done in {:.2}s",
|
|
||||||
start.elapsed().as_secs_f64()
|
|
||||||
))
|
|
||||||
);
|
|
||||||
break out;
|
break out;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -164,17 +156,6 @@ enum StepResult {
|
|||||||
End(String),
|
End(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn node_type_label(node: &Node) -> &'static str {
|
|
||||||
match &node.node_type {
|
|
||||||
NodeType::Agent(_) => "agent",
|
|
||||||
NodeType::Script(_) => "script",
|
|
||||||
NodeType::Approval(_) => "approval",
|
|
||||||
NodeType::Input(_) => "input",
|
|
||||||
NodeType::Llm(_) => "llm",
|
|
||||||
NodeType::End(_) => "end",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn step(
|
async fn step(
|
||||||
node: &Node,
|
node: &Node,
|
||||||
state: &mut StateManager,
|
state: &mut StateManager,
|
||||||
|
|||||||
@@ -0,0 +1,206 @@
|
|||||||
|
//! Structured logging and per-node timing for graph execution.
|
||||||
|
//!
|
||||||
|
//! Two output channels, both owned by [`GraphLogger`]:
|
||||||
|
//! - **`tracing`** (`info!`/`debug!`/`warn!`/`error!`) — respects
|
||||||
|
//! `RUST_LOG`; this is the developer-facing channel.
|
||||||
|
//! - **stderr narration** — the dimmed `▸` lines the user follows along
|
||||||
|
//! with during execution.
|
||||||
|
//!
|
||||||
|
//! The logger also accumulates per-node wall-clock timings and emits a
|
||||||
|
//! performance summary (slowest-first) when the graph completes.
|
||||||
|
|
||||||
|
use std::cmp::Reverse;
|
||||||
|
use super::state::StateManager;
|
||||||
|
use super::types::{Node, NodeType};
|
||||||
|
use crate::utils::dimmed_text;
|
||||||
|
use indexmap::IndexMap;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct NodeTiming {
|
||||||
|
count: usize,
|
||||||
|
total: Duration,
|
||||||
|
max: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeTiming {
|
||||||
|
fn record(&mut self, elapsed: Duration) {
|
||||||
|
self.count += 1;
|
||||||
|
self.total += elapsed;
|
||||||
|
if elapsed > self.max {
|
||||||
|
self.max = elapsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct GraphLogger {
|
||||||
|
graph_name: String,
|
||||||
|
log_state_snapshots: bool,
|
||||||
|
timings: IndexMap<String, NodeTiming>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GraphLogger {
|
||||||
|
pub fn new(graph_name: &str, log_state_snapshots: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
graph_name: graph_name.to_string(),
|
||||||
|
log_state_snapshots,
|
||||||
|
timings: IndexMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn graph_start(&self, start_node: &str, node_count: usize) {
|
||||||
|
info!(
|
||||||
|
"[graph:{}] start at '{}' ({} nodes)",
|
||||||
|
self.graph_name, start_node, node_count
|
||||||
|
);
|
||||||
|
eprintln!(
|
||||||
|
"{}",
|
||||||
|
dimmed_text(&format!(
|
||||||
|
"▸ graph: {} (start: {start_node})",
|
||||||
|
self.graph_name
|
||||||
|
))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn graph_complete(&self, end_node: &str, elapsed: Duration) {
|
||||||
|
info!(
|
||||||
|
"[graph:{}] end '{}' (elapsed {:?})",
|
||||||
|
self.graph_name, end_node, elapsed
|
||||||
|
);
|
||||||
|
eprintln!(
|
||||||
|
"{}",
|
||||||
|
dimmed_text(&format!("▸ graph done in {:.2}s", elapsed.as_secs_f64()))
|
||||||
|
);
|
||||||
|
self.log_performance_summary();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn graph_error(&self, error: &anyhow::Error) {
|
||||||
|
error!("[graph:{}] execution failed: {error:#}", self.graph_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn node_entry(&self, node: &Node, visit: usize) {
|
||||||
|
debug!(
|
||||||
|
"[graph:{}] entering '{}' (visit {visit})",
|
||||||
|
self.graph_name, node.id
|
||||||
|
);
|
||||||
|
eprintln!(
|
||||||
|
"{}",
|
||||||
|
dimmed_text(&format!("▸ {} ({})", node.id, node_type_label(node)))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn record_timing(&mut self, node_id: &str, elapsed: Duration) {
|
||||||
|
self.timings
|
||||||
|
.entry(node_id.to_string())
|
||||||
|
.or_default()
|
||||||
|
.record(elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn routing(&self, from: &str, to: &str) {
|
||||||
|
debug!("[graph:{}] {from} -> {to}", self.graph_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validation_warning(&self, node_id: Option<&str>, message: &str) {
|
||||||
|
match node_id {
|
||||||
|
Some(id) => warn!("[graph:{}] [{id}] {message}", self.graph_name),
|
||||||
|
None => warn!("[graph:{}] {message}", self.graph_name),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Log a state snapshot before a node runs. No-op unless the graph's
|
||||||
|
/// `log_state_snapshots` setting is enabled. Keys + byte size go to
|
||||||
|
/// `debug`; the full state goes to `trace` (it may contain secrets,
|
||||||
|
/// so it is never logged at a more visible level).
|
||||||
|
pub fn state_snapshot(&self, node_id: &str, state: &StateManager) {
|
||||||
|
if !self.log_state_snapshots {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let snapshot = state.snapshot();
|
||||||
|
let mut keys: Vec<&str> = snapshot.keys().map(String::as_str).collect();
|
||||||
|
keys.sort_unstable();
|
||||||
|
debug!(
|
||||||
|
"[graph:{}] [{node_id}] state: {} bytes, keys={:?}",
|
||||||
|
self.graph_name,
|
||||||
|
state.size_bytes(),
|
||||||
|
keys
|
||||||
|
);
|
||||||
|
trace!(
|
||||||
|
"[graph:{}] [{node_id}] full state: {:?}",
|
||||||
|
self.graph_name, snapshot
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_performance_summary(&self) {
|
||||||
|
if self.timings.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut rows: Vec<(&String, &NodeTiming)> = self.timings.iter().collect();
|
||||||
|
rows.sort_by_key(|b| Reverse(b.1.total));
|
||||||
|
info!(
|
||||||
|
"[graph:{}] performance summary (slowest first):",
|
||||||
|
self.graph_name
|
||||||
|
);
|
||||||
|
for (node_id, t) in rows {
|
||||||
|
let avg = t.total / t.count.max(1) as u32;
|
||||||
|
info!(
|
||||||
|
"[graph:{}] {node_id}: {} visit(s), total {}ms, avg {}ms, max {}ms",
|
||||||
|
self.graph_name,
|
||||||
|
t.count,
|
||||||
|
t.total.as_millis(),
|
||||||
|
avg.as_millis(),
|
||||||
|
t.max.as_millis(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn node_type_label(node: &Node) -> &'static str {
|
||||||
|
match &node.node_type {
|
||||||
|
NodeType::Agent(_) => "agent",
|
||||||
|
NodeType::Script(_) => "script",
|
||||||
|
NodeType::Approval(_) => "approval",
|
||||||
|
NodeType::Input(_) => "input",
|
||||||
|
NodeType::Llm(_) => "llm",
|
||||||
|
NodeType::End(_) => "end",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn records_and_aggregates_node_timings() {
|
||||||
|
let mut logger = GraphLogger::new("g", false);
|
||||||
|
logger.record_timing("a", Duration::from_millis(100));
|
||||||
|
logger.record_timing("a", Duration::from_millis(300));
|
||||||
|
logger.record_timing("b", Duration::from_millis(50));
|
||||||
|
|
||||||
|
let a = logger.timings.get("a").unwrap();
|
||||||
|
assert_eq!(a.count, 2);
|
||||||
|
assert_eq!(a.total, Duration::from_millis(400));
|
||||||
|
assert_eq!(a.max, Duration::from_millis(300));
|
||||||
|
|
||||||
|
let b = logger.timings.get("b").unwrap();
|
||||||
|
assert_eq!(b.count, 1);
|
||||||
|
assert_eq!(b.total, Duration::from_millis(50));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn node_timing_max_tracks_largest() {
|
||||||
|
let mut t = NodeTiming::default();
|
||||||
|
t.record(Duration::from_millis(10));
|
||||||
|
t.record(Duration::from_millis(80));
|
||||||
|
t.record(Duration::from_millis(40));
|
||||||
|
assert_eq!(t.max, Duration::from_millis(80));
|
||||||
|
assert_eq!(t.count, 3);
|
||||||
|
assert_eq!(t.total, Duration::from_millis(130));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_logger_has_no_timings() {
|
||||||
|
let logger = GraphLogger::new("g", true);
|
||||||
|
assert!(logger.timings.is_empty());
|
||||||
|
assert!(logger.log_state_snapshots);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ pub mod agent;
|
|||||||
pub mod dispatch;
|
pub mod dispatch;
|
||||||
pub mod executor;
|
pub mod executor;
|
||||||
pub mod llm;
|
pub mod llm;
|
||||||
|
pub mod logging;
|
||||||
pub mod parser;
|
pub mod parser;
|
||||||
pub mod script;
|
pub mod script;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
@@ -17,6 +18,7 @@ pub use agent::AgentNodeExecutor;
|
|||||||
pub use dispatch::{active_agent_graph_name, run_active_agent_graph};
|
pub use dispatch::{active_agent_graph_name, run_active_agent_graph};
|
||||||
pub use executor::GraphExecutor;
|
pub use executor::GraphExecutor;
|
||||||
pub use llm::LlmNodeExecutor;
|
pub use llm::LlmNodeExecutor;
|
||||||
|
pub use logging::GraphLogger;
|
||||||
pub use parser::{GraphParser, agent_has_graph};
|
pub use parser::{GraphParser, agent_has_graph};
|
||||||
pub use script::ScriptExecutor;
|
pub use script::ScriptExecutor;
|
||||||
pub use state::{StateManager, StateRepresentation};
|
pub use state::{StateManager, StateRepresentation};
|
||||||
|
|||||||
Reference in New Issue
Block a user