From 60ad83d6d96979f9812b6258a65f813b0554833f Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Tue, 17 Feb 2026 13:42:53 -0700 Subject: [PATCH] feat: Full passive task queue integration for parallelization of subagents --- src/config/mod.rs | 7 + src/function/mod.rs | 8 +- src/function/supervisor.rs | 321 +++++++++++++++++++++++++++++++------ 3 files changed, 282 insertions(+), 54 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 74a7ae6..e99d77b 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -55,6 +55,7 @@ use std::{ use syntect::highlighting::ThemeSet; use terminal_colorsaurus::{ColorScheme, QueryOptions, color_scheme}; use tokio::runtime::Handle; +use crate::supervisor::mailbox::Inbox; pub const TEMP_ROLE_NAME: &str = "temp"; pub const TEMP_RAG_NAME: &str = "temp"; @@ -210,6 +211,10 @@ pub struct Config { pub(crate) tool_call_tracker: Option, #[serde(skip)] pub supervisor: Option>>, + #[serde(skip)] + pub current_depth: usize, + #[serde(skip)] + pub inbox: Option>, } impl Default for Config { @@ -284,6 +289,8 @@ impl Default for Config { agent: None, tool_call_tracker: Some(ToolCallTracker::default()), supervisor: None, + current_depth: 0, + inbox: None, } } } diff --git a/src/function/mod.rs b/src/function/mod.rs index 4fc651c..1c92379 100644 --- a/src/function/mod.rs +++ b/src/function/mod.rs @@ -894,13 +894,13 @@ impl ToolCall { }) } _ if cmd_name.starts_with(SUPERVISOR_FUNCTION_PREFIX) => { - supervisor::handle_supervisor_tool(config, &cmd_name, &json_data).unwrap_or_else( - |e| { + supervisor::handle_supervisor_tool(config, &cmd_name, &json_data) + .await + .unwrap_or_else(|e| { let error_msg = format!("Supervisor tool failed: {e}"); eprintln!("{}", warning_text(&format!("⚠️ {error_msg} ⚠️"))); json!({"tool_call_error": error_msg}) - }, - ) + }) } _ => match run_llm_function(cmd_name, cmd_args, envs, agent_name) { Ok(Some(contents)) => serde_json::from_str(&contents) diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index ca8879b..e133822 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -1,11 +1,19 @@ use super::{FunctionDeclaration, JsonSchema}; -use crate::config::GlobalConfig; -use crate::supervisor::mailbox::{Envelope, EnvelopePayload}; +use crate::client::call_chat_completions; +use crate::config::{Config, GlobalConfig, Input}; +use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; +use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult}; +use crate::utils::{AbortSignal, create_abort_signal}; -use anyhow::{Result, bail, anyhow}; +use anyhow::{Result, anyhow, bail}; use chrono::Utc; use indexmap::IndexMap; +use log::debug; +use parking_lot::RwLock; use serde_json::{Value, json}; +use std::pin::Pin; +use std::sync::Arc; +use uuid::Uuid; pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__"; @@ -217,7 +225,11 @@ pub fn supervisor_function_declarations() -> Vec { ] } -pub fn handle_supervisor_tool( +// --------------------------------------------------------------------------- +// Dispatch +// --------------------------------------------------------------------------- + +pub async fn handle_supervisor_tool( config: &GlobalConfig, cmd_name: &str, args: &Value, @@ -227,9 +239,9 @@ pub fn handle_supervisor_tool( .unwrap_or(cmd_name); match action { - "spawn" => handle_spawn(config, args), - "check" => handle_check(config, args), - "collect" => handle_collect(config, args), + "spawn" => handle_spawn(config, args).await, + "check" => handle_check(config, args).await, + "collect" => handle_collect(config, args).await, "list" => handle_list(config), "cancel" => handle_cancel(config, args), "send_message" => handle_send_message(config, args), @@ -241,43 +253,218 @@ pub fn handle_supervisor_tool( } } -fn handle_spawn(_config: &GlobalConfig, args: &Value) -> Result { - let _agent_name = args +// --------------------------------------------------------------------------- +// Child agent execution loop +// --------------------------------------------------------------------------- + +/// Run a child agent to completion, returning its accumulated text output. +/// +/// This mirrors `start_directive` in main.rs but: +/// - Uses `call_chat_completions(print=false)` so nothing goes to stdout +/// - Returns the output text instead of printing it +/// - Loops on tool calls just like `start_directive`'s recursion +/// +/// Returns a boxed future to break the recursive type cycle: +/// handle_spawn → tokio::spawn(run_child_agent) → call_chat_completions +/// → eval_tool_calls → ToolCall::eval → handle_supervisor_tool → handle_spawn +/// Without boxing, the compiler cannot prove Send for the recursive async type. +fn run_child_agent( + child_config: GlobalConfig, + initial_input: Input, + abort_signal: AbortSignal, +) -> Pin> + Send>> { + Box::pin(async move { + let mut accumulated_output = String::new(); + let mut input = initial_input; + + loop { + let client = input.create_client()?; + child_config.write().before_chat_completion(&input)?; + + let (output, tool_results) = call_chat_completions( + &input, + false, // print=false — silent, no stdout + false, // extract_code=false + client.as_ref(), + abort_signal.clone(), + ) + .await?; + + child_config + .write() + .after_chat_completion(&input, &output, &tool_results)?; + + if !output.is_empty() { + if !accumulated_output.is_empty() { + accumulated_output.push('\n'); + } + accumulated_output.push_str(&output); + } + + if tool_results.is_empty() { + break; + } + + // Feed tool results back for the next round (mirrors start_directive recursion) + input = input.merge_tool_results(output, tool_results); + } + + Ok(accumulated_output) + }) +} + +// --------------------------------------------------------------------------- +// Handlers +// --------------------------------------------------------------------------- + +async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { + let agent_name = args .get("agent") .and_then(Value::as_str) - .ok_or_else(|| anyhow!("'agent' is required"))?; - let _prompt = args + .ok_or_else(|| anyhow!("'agent' is required"))? + .to_string(); + let prompt = args .get("prompt") .and_then(Value::as_str) - .ok_or_else(|| anyhow!("'prompt' is required"))?; + .ok_or_else(|| anyhow!("'prompt' is required"))? + .to_string(); let _task_id = args.get("task_id").and_then(Value::as_str); - // TODO: Step 3 — actual agent spawning via tokio::spawn - // For now, return a placeholder so the tool declarations compile and wire up + // Generate a unique agent ID + let short_uuid = &Uuid::new_v4().to_string()[..8]; + let agent_id = format!("agent_{agent_name}_{short_uuid}"); + + // --- Validate capacity --- + // Read the supervisor to check capacity, then drop locks before doing async work + let (max_depth, current_depth) = { + let cfg = config.read(); + let supervisor = cfg + .supervisor + .as_ref() + .ok_or_else(|| anyhow!("No supervisor active — agent spawning not enabled"))?; + let sup = supervisor.read(); + if sup.active_count() >= sup.max_concurrent() { + return Ok(json!({ + "status": "error", + "message": format!( + "At capacity: {}/{} agents running. Wait for one to finish or cancel one.", + sup.active_count(), + sup.max_concurrent() + ), + })); + } + (sup.max_depth(), cfg.current_depth + 1) + }; + + if current_depth > max_depth { + return Ok(json!({ + "status": "error", + "message": format!("Max agent depth exceeded ({current_depth}/{max_depth})"), + })); + } + + // --- Build an isolated child Config --- + let child_inbox = Arc::new(Inbox::new()); + + let child_config: GlobalConfig = { + let mut child_cfg = config.read().clone(); + + child_cfg.agent = None; + child_cfg.session = None; + child_cfg.rag = None; + child_cfg.supervisor = None; + child_cfg.last_message = None; + child_cfg.tool_call_tracker = None; + + child_cfg.stream = false; + child_cfg.save = false; + child_cfg.current_depth = current_depth; + child_cfg.inbox = Some(Arc::clone(&child_inbox)); + + Arc::new(RwLock::new(child_cfg)) + }; + + // Load the target agent into the child config + let child_abort = create_abort_signal(); + Config::use_agent(&child_config, &agent_name, None, child_abort.clone()).await?; + + // Create the initial input from the prompt + let input = Input::from_str(&child_config, &prompt, None); + + debug!("Spawning child agent '{agent_name}' as '{agent_id}'"); + + // --- Spawn the agent task --- + let spawn_agent_id = agent_id.clone(); + let spawn_agent_name = agent_name.clone(); + let spawn_abort = child_abort.clone(); + + let join_handle = tokio::spawn(async move { + let result = run_child_agent(child_config, input, spawn_abort).await; + + match result { + Ok(output) => Ok(AgentResult { + id: spawn_agent_id, + agent_name: spawn_agent_name, + output, + exit_status: AgentExitStatus::Completed, + }), + Err(e) => Ok(AgentResult { + id: spawn_agent_id, + agent_name: spawn_agent_name, + output: String::new(), + exit_status: AgentExitStatus::Failed(e.to_string()), + }), + } + }); + + // Register the handle with the supervisor + let handle = AgentHandle { + id: agent_id.clone(), + agent_name: agent_name.clone(), + depth: current_depth, + inbox: child_inbox, + abort_signal: child_abort, + join_handle, + }; + + { + let cfg = config.read(); + let supervisor = cfg + .supervisor + .as_ref() + .ok_or_else(|| anyhow!("No supervisor active"))?; + let mut sup = supervisor.write(); + sup.register(handle)?; + } + Ok(json!({ - "status": "error", - "message": "agent__spawn is not yet implemented — coming in Step 3" + "status": "ok", + "id": agent_id, + "agent": agent_name, + "message": format!("Agent '{agent_name}' spawned as '{agent_id}'. Use agent__check or agent__collect to get results."), })) } -fn handle_check(config: &GlobalConfig, args: &Value) -> Result { +async fn handle_check(config: &GlobalConfig, args: &Value) -> Result { let id = args .get("id") .and_then(Value::as_str) .ok_or_else(|| anyhow!("'id' is required"))?; - let cfg = config.read(); - let supervisor = cfg - .supervisor - .as_ref() - .ok_or_else(|| anyhow!("No supervisor active"))?; - let sup = supervisor.read(); + let is_finished = { + let cfg = config.read(); + let supervisor = cfg + .supervisor + .as_ref() + .ok_or_else(|| anyhow!("No supervisor active"))?; + let sup = supervisor.read(); + sup.is_finished(id) + }; - match sup.is_finished(id) { + match is_finished { Some(true) => { - drop(sup); - drop(cfg); - handle_collect(config, args) + // Finished — collect the result + handle_collect(config, args).await } Some(false) => Ok(json!({ "status": "pending", @@ -291,27 +478,29 @@ fn handle_check(config: &GlobalConfig, args: &Value) -> Result { } } -fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { +async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { let id = args .get("id") .and_then(Value::as_str) .ok_or_else(|| anyhow!("'id' is required"))?; - let cfg = config.read(); - let supervisor = cfg - .supervisor - .as_ref() - .ok_or_else(|| anyhow!("No supervisor active"))?; - let mut sup = supervisor.write(); + // Extract the join handle while holding the lock, then drop the lock before awaiting + let handle = { + let cfg = config.read(); + let supervisor = cfg + .supervisor + .as_ref() + .ok_or_else(|| anyhow!("No supervisor active"))?; + let mut sup = supervisor.write(); + sup.take(id) + }; - match sup.take_if_finished(id) { + match handle { Some(handle) => { - drop(sup); - drop(cfg); - - let rt = tokio::runtime::Handle::current(); - let result = rt - .block_on(handle.join_handle) + // Await the join handle OUTSIDE of any lock + let result = handle + .join_handle + .await .map_err(|e| anyhow!("Agent task panicked: {e}"))? .map_err(|e| anyhow!("Agent failed: {e}"))?; @@ -325,7 +514,7 @@ fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { } None => Ok(json!({ "status": "error", - "message": format!("Agent '{id}' not found or still running. Use agent__check first.") + "message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.") })), } } @@ -432,13 +621,32 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result { } } -fn handle_check_inbox(_config: &GlobalConfig) -> Result { - // The parent agent's own inbox — will be wired when we plumb Inbox into Agent - // For now, return empty - Ok(json!({ - "messages": [], - "count": 0, - })) +fn handle_check_inbox(config: &GlobalConfig) -> Result { + let cfg = config.read(); + match &cfg.inbox { + Some(inbox) => { + let messages: Vec = inbox + .drain() + .into_iter() + .map(|e| { + json!({ + "from": e.from, + "payload": e.payload, + "timestamp": e.timestamp.to_rfc3339(), + }) + }) + .collect(); + let count = messages.len(); + Ok(json!({ + "messages": messages, + "count": count, + })) + } + None => Ok(json!({ + "messages": [], + "count": 0, + })), + } } fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result { @@ -531,7 +739,20 @@ fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result { .ok_or_else(|| anyhow!("No supervisor active"))?; let mut sup = supervisor.write(); - let newly_runnable = sup.task_queue_mut().complete(task_id); + let newly_runnable_ids = sup.task_queue_mut().complete(task_id); + + let newly_runnable: Vec = newly_runnable_ids + .iter() + .filter_map(|id| { + sup.task_queue().get(id).map(|t| { + json!({ + "id": t.id, + "subject": t.subject, + "description": t.description, + }) + }) + }) + .collect(); Ok(json!({ "status": "ok",