feat: Added initial support for native Loki agent nodes in the graph-based agent system
This commit is contained in:
@@ -326,7 +326,7 @@ pub async fn handle_supervisor_tool(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_child_agent(
|
pub fn run_child_agent(
|
||||||
mut child_ctx: RequestContext,
|
mut child_ctx: RequestContext,
|
||||||
initial_input: Input,
|
initial_input: Input,
|
||||||
abort_signal: AbortSignal,
|
abort_signal: AbortSignal,
|
||||||
@@ -374,6 +374,98 @@ fn run_child_agent(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn an agent synchronously from a graph node and return its accumulated
|
||||||
|
/// output. This is similar to `handle_spawn` but runs the child agent in the
|
||||||
|
/// current task (no tokio::spawn, no supervisor handle registration) so the
|
||||||
|
/// graph executor can sequence agent nodes directly.
|
||||||
|
pub async fn run_agent_for_graph(
|
||||||
|
parent_ctx: &mut RequestContext,
|
||||||
|
agent_name: &str,
|
||||||
|
prompt: &str,
|
||||||
|
) -> Result<String> {
|
||||||
|
let short_uuid = &Uuid::new_v4().to_string()[..8];
|
||||||
|
let agent_id = format!("graph_agent_{agent_name}_{short_uuid}");
|
||||||
|
let current_depth = parent_ctx.current_depth + 1;
|
||||||
|
|
||||||
|
if let Some(supervisor) = parent_ctx.supervisor.as_ref().cloned() {
|
||||||
|
let max_depth = supervisor.read().max_depth();
|
||||||
|
if current_depth > max_depth {
|
||||||
|
bail!("Max agent depth exceeded ({current_depth}/{max_depth})");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !parent_ctx.app.config.function_calling_support {
|
||||||
|
bail!("Function calling support must be enabled to spawn agents.");
|
||||||
|
}
|
||||||
|
|
||||||
|
let child_inbox = Arc::new(Inbox::new());
|
||||||
|
parent_ctx.ensure_root_escalation_queue();
|
||||||
|
let child_abort = create_abort_signal();
|
||||||
|
|
||||||
|
let app_config = Arc::clone(&parent_ctx.app.config);
|
||||||
|
let current_model = parent_ctx.current_model().clone();
|
||||||
|
let info_flag = parent_ctx.info_flag;
|
||||||
|
let child_app_state = Arc::new(AppState {
|
||||||
|
config: Arc::new(app_config.as_ref().clone()),
|
||||||
|
vault: parent_ctx.app.vault.clone(),
|
||||||
|
mcp_factory: parent_ctx.app.mcp_factory.clone(),
|
||||||
|
rag_cache: parent_ctx.app.rag_cache.clone(),
|
||||||
|
mcp_config: parent_ctx.app.mcp_config.clone(),
|
||||||
|
mcp_log_path: parent_ctx.app.mcp_log_path.clone(),
|
||||||
|
mcp_registry: parent_ctx.app.mcp_registry.clone(),
|
||||||
|
functions: parent_ctx.app.functions.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let agent = Agent::init(
|
||||||
|
app_config.as_ref(),
|
||||||
|
child_app_state.as_ref(),
|
||||||
|
¤t_model,
|
||||||
|
info_flag,
|
||||||
|
agent_name,
|
||||||
|
child_abort.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let agent_mcp_servers = agent.mcp_server_names().to_vec();
|
||||||
|
let session = agent.agent_session().map(|v| v.to_string());
|
||||||
|
let should_init_supervisor = agent.can_spawn_agents();
|
||||||
|
let agent_max_concurrent = agent.max_concurrent_agents();
|
||||||
|
let agent_max_depth = agent.max_agent_depth();
|
||||||
|
|
||||||
|
let mut child_ctx = RequestContext::new_for_child(
|
||||||
|
Arc::clone(&child_app_state),
|
||||||
|
parent_ctx,
|
||||||
|
current_depth,
|
||||||
|
Arc::clone(&child_inbox),
|
||||||
|
agent_id.clone(),
|
||||||
|
);
|
||||||
|
child_ctx.rag = agent.rag();
|
||||||
|
child_ctx.agent = Some(agent);
|
||||||
|
if should_init_supervisor {
|
||||||
|
child_ctx.supervisor = Some(Arc::new(RwLock::new(Supervisor::new(
|
||||||
|
agent_max_concurrent,
|
||||||
|
agent_max_depth,
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(session) = session {
|
||||||
|
child_ctx
|
||||||
|
.use_session(app_config.as_ref(), Some(&session), child_abort.clone())
|
||||||
|
.await?;
|
||||||
|
sync_agent_functions_to_ctx(&mut child_ctx)?;
|
||||||
|
} else {
|
||||||
|
populate_agent_mcp_runtime(&mut child_ctx, &agent_mcp_servers).await?;
|
||||||
|
sync_agent_functions_to_ctx(&mut child_ctx)?;
|
||||||
|
child_ctx.init_agent_shared_variables()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let input = Input::from_str(&child_ctx, prompt, None);
|
||||||
|
|
||||||
|
debug!("Spawning agent '{agent_name}' for graph node as '{agent_id}'");
|
||||||
|
|
||||||
|
run_child_agent(child_ctx, input, child_abort).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn populate_agent_mcp_runtime(ctx: &mut RequestContext, server_ids: &[String]) -> Result<()> {
|
async fn populate_agent_mcp_runtime(ctx: &mut RequestContext, server_ids: &[String]) -> Result<()> {
|
||||||
if !ctx.app.config.mcp_server_support {
|
if !ctx.app.config.mcp_server_support {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|||||||
@@ -0,0 +1,188 @@
|
|||||||
|
//! Execution of `agent`-type graph nodes.
|
||||||
|
//!
|
||||||
|
//! Spawns a child agent via `function::supervisor::run_agent_for_graph`,
|
||||||
|
//! interpolating the prompt against current graph state. After the agent
|
||||||
|
//! finishes, applies the node's `state_updates` (templates can reference
|
||||||
|
//! `{{output}}` for the agent's stdout).
|
||||||
|
|
||||||
|
use super::state::StateManager;
|
||||||
|
use super::types::AgentNode;
|
||||||
|
use crate::config::RequestContext;
|
||||||
|
use crate::function::supervisor::run_agent_for_graph;
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use serde_json::Value;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
const OUTPUT_KEY: &str = "output";
|
||||||
|
const DEFAULT_TIMEOUT_SECS: u64 = 300;
|
||||||
|
|
||||||
|
pub struct AgentNodeExecutor;
|
||||||
|
|
||||||
|
impl AgentNodeExecutor {
|
||||||
|
/// Interpolate the node's prompt, spawn the agent, wait for it to
|
||||||
|
/// finish, then apply `state_updates`. Returns the agent's full output.
|
||||||
|
pub async fn execute(
|
||||||
|
node: &AgentNode,
|
||||||
|
state_manager: &mut StateManager,
|
||||||
|
parent_ctx: &mut RequestContext,
|
||||||
|
) -> Result<String> {
|
||||||
|
let prompt = state_manager
|
||||||
|
.interpolate(&node.prompt)
|
||||||
|
.with_context(|| format!("Failed to interpolate prompt for agent '{}'", node.agent))?;
|
||||||
|
|
||||||
|
let timeout_dur = Duration::from_secs(node.timeout.unwrap_or(DEFAULT_TIMEOUT_SECS));
|
||||||
|
|
||||||
|
let output = timeout(
|
||||||
|
timeout_dur,
|
||||||
|
run_agent_for_graph(parent_ctx, &node.agent, &prompt),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Agent '{}' timed out after {}s",
|
||||||
|
node.agent,
|
||||||
|
timeout_dur.as_secs()
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
.with_context(|| format!("Agent '{}' failed", node.agent))?;
|
||||||
|
|
||||||
|
apply_state_updates(node, state_manager, &output);
|
||||||
|
|
||||||
|
Ok(output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Exposes the agent's output as `{{output}}` for template evaluation, then
|
||||||
|
/// applies every key/template in `state_updates`. The temporary `output`
|
||||||
|
/// state key is removed at the end so it doesn't leak into subsequent
|
||||||
|
/// nodes' templates.
|
||||||
|
fn apply_state_updates(node: &AgentNode, state_manager: &mut StateManager, output: &str) {
|
||||||
|
let Some(updates) = &node.state_updates else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let prev_output = state_manager.state().get(OUTPUT_KEY).cloned();
|
||||||
|
state_manager
|
||||||
|
.state_mut()
|
||||||
|
.set(OUTPUT_KEY.into(), Value::String(output.to_string()));
|
||||||
|
|
||||||
|
for (key, template) in updates {
|
||||||
|
let value = state_manager.interpolate_lenient(template);
|
||||||
|
state_manager
|
||||||
|
.state_mut()
|
||||||
|
.set(key.clone(), Value::String(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
match prev_output {
|
||||||
|
Some(v) => state_manager.state_mut().set(OUTPUT_KEY.into(), v),
|
||||||
|
None => {
|
||||||
|
state_manager
|
||||||
|
.state_mut()
|
||||||
|
.set(OUTPUT_KEY.into(), Value::Null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::super::types::AgentNode;
|
||||||
|
use super::*;
|
||||||
|
use serde_json::json;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
fn manager_with(pairs: &[(&str, Value)]) -> StateManager {
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for (k, v) in pairs {
|
||||||
|
map.insert((*k).into(), v.clone());
|
||||||
|
}
|
||||||
|
StateManager::new(map)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn node_with(prompt: &str, updates: Option<HashMap<String, String>>) -> AgentNode {
|
||||||
|
AgentNode {
|
||||||
|
agent: "test_agent".into(),
|
||||||
|
prompt: prompt.into(),
|
||||||
|
state_updates: updates,
|
||||||
|
timeout: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn state_updates_use_output_placeholder() {
|
||||||
|
let node = {
|
||||||
|
let mut u = HashMap::new();
|
||||||
|
u.insert("findings".into(), "{{output}}".into());
|
||||||
|
node_with("hi", Some(u))
|
||||||
|
};
|
||||||
|
let mut state = manager_with(&[]);
|
||||||
|
apply_state_updates(&node, &mut state, "agent finished its work");
|
||||||
|
assert_eq!(
|
||||||
|
state.state().get("findings"),
|
||||||
|
Some(&json!("agent finished its work"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn state_updates_can_reference_existing_keys_and_output() {
|
||||||
|
let node = {
|
||||||
|
let mut u = HashMap::new();
|
||||||
|
u.insert("summary".into(), "{{topic}}: {{output}}".into());
|
||||||
|
node_with("hi", Some(u))
|
||||||
|
};
|
||||||
|
let mut state = manager_with(&[("topic", json!("auth"))]);
|
||||||
|
apply_state_updates(&node, &mut state, "JWT vs sessions");
|
||||||
|
assert_eq!(
|
||||||
|
state.state().get("summary"),
|
||||||
|
Some(&json!("auth: JWT vs sessions"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn output_key_is_cleaned_up_after_state_updates() {
|
||||||
|
let node = {
|
||||||
|
let mut u = HashMap::new();
|
||||||
|
u.insert("findings".into(), "{{output}}".into());
|
||||||
|
node_with("hi", Some(u))
|
||||||
|
};
|
||||||
|
let mut state = manager_with(&[]);
|
||||||
|
apply_state_updates(&node, &mut state, "anything");
|
||||||
|
assert_eq!(state.state().get("output"), Some(&Value::Null));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pre_existing_output_value_is_preserved() {
|
||||||
|
let node = {
|
||||||
|
let mut u = HashMap::new();
|
||||||
|
u.insert("greeting".into(), "{{output}}".into());
|
||||||
|
node_with("hi", Some(u))
|
||||||
|
};
|
||||||
|
let mut state = manager_with(&[("output", json!("preserved"))]);
|
||||||
|
apply_state_updates(&node, &mut state, "new agent output");
|
||||||
|
assert_eq!(
|
||||||
|
state.state().get("greeting"),
|
||||||
|
Some(&json!("new agent output"))
|
||||||
|
);
|
||||||
|
assert_eq!(state.state().get("output"), Some(&json!("preserved")));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn no_state_updates_is_a_noop() {
|
||||||
|
let node = node_with("hi", None);
|
||||||
|
let mut state = manager_with(&[("k", json!("v"))]);
|
||||||
|
apply_state_updates(&node, &mut state, "ignored");
|
||||||
|
assert_eq!(state.state().get("k"), Some(&json!("v")));
|
||||||
|
assert!(state.state().get("output").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn interpolate_lenient_on_state_updates_handles_missing_keys() {
|
||||||
|
let node = {
|
||||||
|
let mut u = HashMap::new();
|
||||||
|
u.insert("decorated".into(), "[{{missing}}] {{output}}".into());
|
||||||
|
node_with("hi", Some(u))
|
||||||
|
};
|
||||||
|
let mut state = manager_with(&[]);
|
||||||
|
apply_state_updates(&node, &mut state, "DATA");
|
||||||
|
assert_eq!(state.state().get("decorated"), Some(&json!("[] DATA")));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,12 +1,14 @@
|
|||||||
//! Graph-based agent orchestration. Declarative YAML workflows over a shared
|
//! Graph-based agent orchestration. Declarative YAML workflows over a shared
|
||||||
//! JSON state, composed of agent/script/approval/input/end nodes.
|
//! JSON state, composed of agent/script/approval/input/end nodes.
|
||||||
|
|
||||||
|
pub mod agent;
|
||||||
pub mod parser;
|
pub mod parser;
|
||||||
pub mod script;
|
pub mod script;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
pub mod validator;
|
pub mod validator;
|
||||||
|
|
||||||
|
pub use agent::AgentNodeExecutor;
|
||||||
pub use parser::{GraphParser, agent_has_graph, load_agent_graph};
|
pub use parser::{GraphParser, agent_has_graph, load_agent_graph};
|
||||||
pub use script::ScriptExecutor;
|
pub use script::ScriptExecutor;
|
||||||
pub use state::{StateManager, StateRepresentation};
|
pub use state::{StateManager, StateRepresentation};
|
||||||
|
|||||||
Reference in New Issue
Block a user