feat: implemented structured logging for graph execution
This commit is contained in:
+30
-49
@@ -8,6 +8,7 @@
|
||||
|
||||
use super::agent::AgentNodeExecutor;
|
||||
use super::llm::LlmNodeExecutor;
|
||||
use super::logging::GraphLogger;
|
||||
use super::parser::GraphParser;
|
||||
use super::script::ScriptExecutor;
|
||||
use super::state::StateManager;
|
||||
@@ -15,7 +16,7 @@ use super::types::{EndNode, Graph, Node, NodeType};
|
||||
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
|
||||
use super::validator::GraphValidator;
|
||||
use crate::config::RequestContext;
|
||||
use crate::utils::{AbortSignal, dimmed_text};
|
||||
use crate::utils::AbortSignal;
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
@@ -45,11 +46,27 @@ impl GraphExecutor {
|
||||
}
|
||||
|
||||
/// Run the graph to completion. Returns the resolved `output` template
|
||||
/// of the terminal `End` node.
|
||||
/// of the terminal `End` node. Any failure is logged via the
|
||||
/// `GraphLogger` before being propagated.
|
||||
pub async fn execute(
|
||||
self,
|
||||
ctx: &mut RequestContext,
|
||||
abort_signal: AbortSignal,
|
||||
) -> Result<String> {
|
||||
let mut logger =
|
||||
GraphLogger::new(&self.graph.name, self.graph.settings.log_state_snapshots);
|
||||
let result = self.run(&mut logger, ctx, abort_signal).await;
|
||||
if let Err(e) = &result {
|
||||
logger.graph_error(e);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self,
|
||||
logger: &mut GraphLogger,
|
||||
ctx: &mut RequestContext,
|
||||
abort_signal: AbortSignal,
|
||||
) -> Result<String> {
|
||||
let GraphExecutor { graph, base_dir } = self;
|
||||
|
||||
@@ -57,12 +74,7 @@ impl GraphExecutor {
|
||||
let validator = GraphValidator::new(&base_dir);
|
||||
let result = validator.validate(&graph);
|
||||
for w in &result.warnings {
|
||||
let where_ = w
|
||||
.node_id
|
||||
.as_deref()
|
||||
.map(|id| format!("[{id}] "))
|
||||
.unwrap_or_default();
|
||||
warn!("[graph:{}] {}{}", graph.name, where_, w.message);
|
||||
logger.validation_warning(w.node_id.as_deref(), &w.message);
|
||||
}
|
||||
result.into_result()?;
|
||||
}
|
||||
@@ -74,11 +86,7 @@ impl GraphExecutor {
|
||||
let start = Instant::now();
|
||||
|
||||
let mut current = graph.start.clone();
|
||||
info!("[graph:{}] start at '{}'", graph.name, current);
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!("▸ graph: {} (start: {})", graph.name, current))
|
||||
);
|
||||
logger.graph_start(¤t, graph.nodes.len());
|
||||
|
||||
let output = loop {
|
||||
if abort_signal.aborted() {
|
||||
@@ -111,16 +119,11 @@ impl GraphExecutor {
|
||||
.get_node(¤t)
|
||||
.ok_or_else(|| anyhow!("Node '{}' not found in graph '{}'", current, graph.name))?;
|
||||
|
||||
debug!(
|
||||
"[graph:{}] entering '{}' (visit {})",
|
||||
graph.name, current, visits
|
||||
);
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!("▸ {} ({})", current, node_type_label(node)))
|
||||
);
|
||||
logger.node_entry(node, visits);
|
||||
logger.state_snapshot(¤t, &state);
|
||||
|
||||
let next = step(
|
||||
let node_start = Instant::now();
|
||||
let step_result = step(
|
||||
node,
|
||||
&mut state,
|
||||
ctx,
|
||||
@@ -128,28 +131,17 @@ impl GraphExecutor {
|
||||
&graph.name,
|
||||
¤t,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("at node '{current}'"))?;
|
||||
.await;
|
||||
logger.record_timing(¤t, node_start.elapsed());
|
||||
let next = step_result.with_context(|| format!("at node '{current}'"))?;
|
||||
|
||||
match next {
|
||||
StepResult::Continue(next_id) => {
|
||||
debug!("[graph:{}] {} -> {}", graph.name, current, next_id);
|
||||
logger.routing(¤t, &next_id);
|
||||
current = next_id;
|
||||
}
|
||||
StepResult::End(out) => {
|
||||
info!(
|
||||
"[graph:{}] end '{}' (elapsed {:?})",
|
||||
graph.name,
|
||||
current,
|
||||
start.elapsed()
|
||||
);
|
||||
eprintln!(
|
||||
"{}",
|
||||
dimmed_text(&format!(
|
||||
"▸ graph done in {:.2}s",
|
||||
start.elapsed().as_secs_f64()
|
||||
))
|
||||
);
|
||||
logger.graph_complete(¤t, start.elapsed());
|
||||
break out;
|
||||
}
|
||||
}
|
||||
@@ -164,17 +156,6 @@ enum StepResult {
|
||||
End(String),
|
||||
}
|
||||
|
||||
fn node_type_label(node: &Node) -> &'static str {
|
||||
match &node.node_type {
|
||||
NodeType::Agent(_) => "agent",
|
||||
NodeType::Script(_) => "script",
|
||||
NodeType::Approval(_) => "approval",
|
||||
NodeType::Input(_) => "input",
|
||||
NodeType::Llm(_) => "llm",
|
||||
NodeType::End(_) => "end",
|
||||
}
|
||||
}
|
||||
|
||||
async fn step(
|
||||
node: &Node,
|
||||
state: &mut StateManager,
|
||||
|
||||
Reference in New Issue
Block a user