feat: Full passive task queue integration for parallelization of subagents
This commit is contained in:
+271
-50
@@ -1,11 +1,19 @@
|
||||
use super::{FunctionDeclaration, JsonSchema};
|
||||
use crate::config::GlobalConfig;
|
||||
use crate::supervisor::mailbox::{Envelope, EnvelopePayload};
|
||||
use crate::client::call_chat_completions;
|
||||
use crate::config::{Config, GlobalConfig, Input};
|
||||
use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox};
|
||||
use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult};
|
||||
use crate::utils::{AbortSignal, create_abort_signal};
|
||||
|
||||
use anyhow::{Result, bail, anyhow};
|
||||
use anyhow::{Result, anyhow, bail};
|
||||
use chrono::Utc;
|
||||
use indexmap::IndexMap;
|
||||
use log::debug;
|
||||
use parking_lot::RwLock;
|
||||
use serde_json::{Value, json};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__";
|
||||
|
||||
@@ -217,7 +225,11 @@ pub fn supervisor_function_declarations() -> Vec<FunctionDeclaration> {
|
||||
]
|
||||
}
|
||||
|
||||
pub fn handle_supervisor_tool(
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dispatch
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn handle_supervisor_tool(
|
||||
config: &GlobalConfig,
|
||||
cmd_name: &str,
|
||||
args: &Value,
|
||||
@@ -227,9 +239,9 @@ pub fn handle_supervisor_tool(
|
||||
.unwrap_or(cmd_name);
|
||||
|
||||
match action {
|
||||
"spawn" => handle_spawn(config, args),
|
||||
"check" => handle_check(config, args),
|
||||
"collect" => handle_collect(config, args),
|
||||
"spawn" => handle_spawn(config, args).await,
|
||||
"check" => handle_check(config, args).await,
|
||||
"collect" => handle_collect(config, args).await,
|
||||
"list" => handle_list(config),
|
||||
"cancel" => handle_cancel(config, args),
|
||||
"send_message" => handle_send_message(config, args),
|
||||
@@ -241,43 +253,218 @@ pub fn handle_supervisor_tool(
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_spawn(_config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
let _agent_name = args
|
||||
// ---------------------------------------------------------------------------
|
||||
// Child agent execution loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Run a child agent to completion, returning its accumulated text output.
|
||||
///
|
||||
/// This mirrors `start_directive` in main.rs but:
|
||||
/// - Uses `call_chat_completions(print=false)` so nothing goes to stdout
|
||||
/// - Returns the output text instead of printing it
|
||||
/// - Loops on tool calls just like `start_directive`'s recursion
|
||||
///
|
||||
/// Returns a boxed future to break the recursive type cycle:
|
||||
/// handle_spawn → tokio::spawn(run_child_agent) → call_chat_completions
|
||||
/// → eval_tool_calls → ToolCall::eval → handle_supervisor_tool → handle_spawn
|
||||
/// Without boxing, the compiler cannot prove Send for the recursive async type.
|
||||
fn run_child_agent(
|
||||
child_config: GlobalConfig,
|
||||
initial_input: Input,
|
||||
abort_signal: AbortSignal,
|
||||
) -> Pin<Box<dyn Future<Output = Result<String>> + Send>> {
|
||||
Box::pin(async move {
|
||||
let mut accumulated_output = String::new();
|
||||
let mut input = initial_input;
|
||||
|
||||
loop {
|
||||
let client = input.create_client()?;
|
||||
child_config.write().before_chat_completion(&input)?;
|
||||
|
||||
let (output, tool_results) = call_chat_completions(
|
||||
&input,
|
||||
false, // print=false — silent, no stdout
|
||||
false, // extract_code=false
|
||||
client.as_ref(),
|
||||
abort_signal.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
child_config
|
||||
.write()
|
||||
.after_chat_completion(&input, &output, &tool_results)?;
|
||||
|
||||
if !output.is_empty() {
|
||||
if !accumulated_output.is_empty() {
|
||||
accumulated_output.push('\n');
|
||||
}
|
||||
accumulated_output.push_str(&output);
|
||||
}
|
||||
|
||||
if tool_results.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Feed tool results back for the next round (mirrors start_directive recursion)
|
||||
input = input.merge_tool_results(output, tool_results);
|
||||
}
|
||||
|
||||
Ok(accumulated_output)
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handlers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
let agent_name = args
|
||||
.get("agent")
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("'agent' is required"))?;
|
||||
let _prompt = args
|
||||
.ok_or_else(|| anyhow!("'agent' is required"))?
|
||||
.to_string();
|
||||
let prompt = args
|
||||
.get("prompt")
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("'prompt' is required"))?;
|
||||
.ok_or_else(|| anyhow!("'prompt' is required"))?
|
||||
.to_string();
|
||||
let _task_id = args.get("task_id").and_then(Value::as_str);
|
||||
|
||||
// TODO: Step 3 — actual agent spawning via tokio::spawn
|
||||
// For now, return a placeholder so the tool declarations compile and wire up
|
||||
// Generate a unique agent ID
|
||||
let short_uuid = &Uuid::new_v4().to_string()[..8];
|
||||
let agent_id = format!("agent_{agent_name}_{short_uuid}");
|
||||
|
||||
// --- Validate capacity ---
|
||||
// Read the supervisor to check capacity, then drop locks before doing async work
|
||||
let (max_depth, current_depth) = {
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active — agent spawning not enabled"))?;
|
||||
let sup = supervisor.read();
|
||||
if sup.active_count() >= sup.max_concurrent() {
|
||||
return Ok(json!({
|
||||
"status": "error",
|
||||
"message": format!(
|
||||
"At capacity: {}/{} agents running. Wait for one to finish or cancel one.",
|
||||
sup.active_count(),
|
||||
sup.max_concurrent()
|
||||
),
|
||||
}));
|
||||
}
|
||||
(sup.max_depth(), cfg.current_depth + 1)
|
||||
};
|
||||
|
||||
if current_depth > max_depth {
|
||||
return Ok(json!({
|
||||
"status": "error",
|
||||
"message": format!("Max agent depth exceeded ({current_depth}/{max_depth})"),
|
||||
}));
|
||||
}
|
||||
|
||||
// --- Build an isolated child Config ---
|
||||
let child_inbox = Arc::new(Inbox::new());
|
||||
|
||||
let child_config: GlobalConfig = {
|
||||
let mut child_cfg = config.read().clone();
|
||||
|
||||
child_cfg.agent = None;
|
||||
child_cfg.session = None;
|
||||
child_cfg.rag = None;
|
||||
child_cfg.supervisor = None;
|
||||
child_cfg.last_message = None;
|
||||
child_cfg.tool_call_tracker = None;
|
||||
|
||||
child_cfg.stream = false;
|
||||
child_cfg.save = false;
|
||||
child_cfg.current_depth = current_depth;
|
||||
child_cfg.inbox = Some(Arc::clone(&child_inbox));
|
||||
|
||||
Arc::new(RwLock::new(child_cfg))
|
||||
};
|
||||
|
||||
// Load the target agent into the child config
|
||||
let child_abort = create_abort_signal();
|
||||
Config::use_agent(&child_config, &agent_name, None, child_abort.clone()).await?;
|
||||
|
||||
// Create the initial input from the prompt
|
||||
let input = Input::from_str(&child_config, &prompt, None);
|
||||
|
||||
debug!("Spawning child agent '{agent_name}' as '{agent_id}'");
|
||||
|
||||
// --- Spawn the agent task ---
|
||||
let spawn_agent_id = agent_id.clone();
|
||||
let spawn_agent_name = agent_name.clone();
|
||||
let spawn_abort = child_abort.clone();
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
let result = run_child_agent(child_config, input, spawn_abort).await;
|
||||
|
||||
match result {
|
||||
Ok(output) => Ok(AgentResult {
|
||||
id: spawn_agent_id,
|
||||
agent_name: spawn_agent_name,
|
||||
output,
|
||||
exit_status: AgentExitStatus::Completed,
|
||||
}),
|
||||
Err(e) => Ok(AgentResult {
|
||||
id: spawn_agent_id,
|
||||
agent_name: spawn_agent_name,
|
||||
output: String::new(),
|
||||
exit_status: AgentExitStatus::Failed(e.to_string()),
|
||||
}),
|
||||
}
|
||||
});
|
||||
|
||||
// Register the handle with the supervisor
|
||||
let handle = AgentHandle {
|
||||
id: agent_id.clone(),
|
||||
agent_name: agent_name.clone(),
|
||||
depth: current_depth,
|
||||
inbox: child_inbox,
|
||||
abort_signal: child_abort,
|
||||
join_handle,
|
||||
};
|
||||
|
||||
{
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let mut sup = supervisor.write();
|
||||
sup.register(handle)?;
|
||||
}
|
||||
|
||||
Ok(json!({
|
||||
"status": "error",
|
||||
"message": "agent__spawn is not yet implemented — coming in Step 3"
|
||||
"status": "ok",
|
||||
"id": agent_id,
|
||||
"agent": agent_name,
|
||||
"message": format!("Agent '{agent_name}' spawned as '{agent_id}'. Use agent__check or agent__collect to get results."),
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_check(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
async fn handle_check(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
let id = args
|
||||
.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("'id' is required"))?;
|
||||
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let sup = supervisor.read();
|
||||
let is_finished = {
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let sup = supervisor.read();
|
||||
sup.is_finished(id)
|
||||
};
|
||||
|
||||
match sup.is_finished(id) {
|
||||
match is_finished {
|
||||
Some(true) => {
|
||||
drop(sup);
|
||||
drop(cfg);
|
||||
handle_collect(config, args)
|
||||
// Finished — collect the result
|
||||
handle_collect(config, args).await
|
||||
}
|
||||
Some(false) => Ok(json!({
|
||||
"status": "pending",
|
||||
@@ -291,27 +478,29 @@ fn handle_check(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
let id = args
|
||||
.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("'id' is required"))?;
|
||||
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let mut sup = supervisor.write();
|
||||
// Extract the join handle while holding the lock, then drop the lock before awaiting
|
||||
let handle = {
|
||||
let cfg = config.read();
|
||||
let supervisor = cfg
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let mut sup = supervisor.write();
|
||||
sup.take(id)
|
||||
};
|
||||
|
||||
match sup.take_if_finished(id) {
|
||||
match handle {
|
||||
Some(handle) => {
|
||||
drop(sup);
|
||||
drop(cfg);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
let result = rt
|
||||
.block_on(handle.join_handle)
|
||||
// Await the join handle OUTSIDE of any lock
|
||||
let result = handle
|
||||
.join_handle
|
||||
.await
|
||||
.map_err(|e| anyhow!("Agent task panicked: {e}"))?
|
||||
.map_err(|e| anyhow!("Agent failed: {e}"))?;
|
||||
|
||||
@@ -325,7 +514,7 @@ fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
}
|
||||
None => Ok(json!({
|
||||
"status": "error",
|
||||
"message": format!("Agent '{id}' not found or still running. Use agent__check first.")
|
||||
"message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.")
|
||||
})),
|
||||
}
|
||||
}
|
||||
@@ -432,13 +621,32 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_check_inbox(_config: &GlobalConfig) -> Result<Value> {
|
||||
// The parent agent's own inbox — will be wired when we plumb Inbox into Agent
|
||||
// For now, return empty
|
||||
Ok(json!({
|
||||
"messages": [],
|
||||
"count": 0,
|
||||
}))
|
||||
fn handle_check_inbox(config: &GlobalConfig) -> Result<Value> {
|
||||
let cfg = config.read();
|
||||
match &cfg.inbox {
|
||||
Some(inbox) => {
|
||||
let messages: Vec<Value> = inbox
|
||||
.drain()
|
||||
.into_iter()
|
||||
.map(|e| {
|
||||
json!({
|
||||
"from": e.from,
|
||||
"payload": e.payload,
|
||||
"timestamp": e.timestamp.to_rfc3339(),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
let count = messages.len();
|
||||
Ok(json!({
|
||||
"messages": messages,
|
||||
"count": count,
|
||||
}))
|
||||
}
|
||||
None => Ok(json!({
|
||||
"messages": [],
|
||||
"count": 0,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
@@ -531,7 +739,20 @@ fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||
.ok_or_else(|| anyhow!("No supervisor active"))?;
|
||||
let mut sup = supervisor.write();
|
||||
|
||||
let newly_runnable = sup.task_queue_mut().complete(task_id);
|
||||
let newly_runnable_ids = sup.task_queue_mut().complete(task_id);
|
||||
|
||||
let newly_runnable: Vec<Value> = newly_runnable_ids
|
||||
.iter()
|
||||
.filter_map(|id| {
|
||||
sup.task_queue().get(id).map(|t| {
|
||||
json!({
|
||||
"id": t.id,
|
||||
"subject": t.subject,
|
||||
"description": t.description,
|
||||
})
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(json!({
|
||||
"status": "ok",
|
||||
|
||||
Reference in New Issue
Block a user