diff --git a/src/config/agent.rs b/src/config/agent.rs index 845276f..d87b128 100644 --- a/src/config/agent.rs +++ b/src/config/agent.rs @@ -429,6 +429,14 @@ impl Agent { self.config.max_agent_depth } + pub fn summarization_model(&self) -> Option<&str> { + self.config.summarization_model.as_deref() + } + + pub fn summarization_threshold(&self) -> usize { + self.config.summarization_threshold + } + pub fn continuation_count(&self) -> usize { self.continuation_count } @@ -639,6 +647,10 @@ pub struct AgentConfig { pub conversation_starters: Vec, #[serde(default)] pub documents: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub summarization_model: Option, + #[serde(default = "default_summarization_threshold")] + pub summarization_threshold: usize, } fn default_max_auto_continues() -> usize { @@ -657,6 +669,10 @@ fn default_true() -> bool { true } +fn default_summarization_threshold() -> usize { + 4000 +} + impl AgentConfig { pub fn load(path: &Path) -> Result { let contents = read_to_string(path) diff --git a/src/config/mod.rs b/src/config/mod.rs index aadf9b1..f5e44af 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -212,6 +212,10 @@ pub struct Config { #[serde(skip)] pub supervisor: Option>>, #[serde(skip)] + pub parent_supervisor: Option>>, + #[serde(skip)] + pub self_agent_id: Option, + #[serde(skip)] pub current_depth: usize, #[serde(skip)] pub inbox: Option>, @@ -289,6 +293,8 @@ impl Default for Config { agent: None, tool_call_tracker: Some(ToolCallTracker::default()), supervisor: None, + parent_supervisor: None, + self_agent_id: None, current_depth: 0, inbox: None, } diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index 851349d..7ca0f70 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -1,6 +1,6 @@ use super::{FunctionDeclaration, JsonSchema}; -use crate::client::call_chat_completions; -use crate::config::{Config, GlobalConfig, Input}; +use crate::client::{Model, ModelType, call_chat_completions}; +use crate::config::{Config, GlobalConfig, Input, Role, RoleLike}; use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult}; use crate::utils::{AbortSignal, create_abort_signal}; @@ -189,6 +189,22 @@ pub fn supervisor_function_declarations() -> Vec { ..Default::default() }, ), + ( + "agent".to_string(), + JsonSchema { + type_value: Some("string".to_string()), + description: Some("Agent to auto-spawn when this task becomes runnable (e.g. 'explore', 'coder'). If set, an agent will be spawned automatically when all dependencies complete.".into()), + ..Default::default() + }, + ), + ( + "prompt".to_string(), + JsonSchema { + type_value: Some("string".to_string()), + description: Some("Prompt to send to the auto-spawned agent. Required if agent is set.".into()), + ..Default::default() + }, + ), ])), required: Some(vec!["subject".to_string()]), ..Default::default() @@ -244,7 +260,7 @@ pub async fn handle_supervisor_tool( "check_inbox" => handle_check_inbox(config), "task_create" => handle_task_create(config, args), "task_list" => handle_task_list(config), - "task_complete" => handle_task_complete(config, args), + "task_complete" => handle_task_complete(config, args).await, _ => bail!("Unknown supervisor action: {action}"), } } @@ -262,14 +278,9 @@ fn run_child_agent( let client = input.create_client()?; child_config.write().before_chat_completion(&input)?; - let (output, tool_results) = call_chat_completions( - &input, - false, - false, - client.as_ref(), - abort_signal.clone(), - ) - .await?; + let (output, tool_results) = + call_chat_completions(&input, false, false, client.as_ref(), abort_signal.clone()) + .await?; child_config .write() @@ -341,6 +352,7 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { let child_config: GlobalConfig = { let mut child_cfg = config.read().clone(); + child_cfg.parent_supervisor = child_cfg.supervisor.clone(); child_cfg.agent = None; child_cfg.session = None; child_cfg.rag = None; @@ -352,6 +364,7 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { child_cfg.save = false; child_cfg.current_depth = current_depth; child_cfg.inbox = Some(Arc::clone(&child_inbox)); + child_cfg.self_agent_id = Some(agent_id.clone()); Arc::new(RwLock::new(child_cfg)) }; @@ -430,9 +443,7 @@ async fn handle_check(config: &GlobalConfig, args: &Value) -> Result { }; match is_finished { - Some(true) => { - handle_collect(config, args).await - } + Some(true) => handle_collect(config, args).await, Some(false) => Ok(json!({ "status": "pending", "id": id, @@ -469,12 +480,14 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { .map_err(|e| anyhow!("Agent task panicked: {e}"))? .map_err(|e| anyhow!("Agent failed: {e}"))?; + let output = summarize_output(config, &result.agent_name, &result.output).await?; + Ok(json!({ "status": "completed", "id": result.id, "agent": result.agent_name, "exit_status": format!("{:?}", result.exit_status), - "output": result.output, + "output": output, })) } None => Ok(json!({ @@ -551,22 +564,31 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result { .ok_or_else(|| anyhow!("'message' is required"))?; let cfg = config.read(); - let supervisor = cfg + + // Determine sender identity: self_agent_id (child), agent name (parent), or "parent" + let sender = cfg + .self_agent_id + .clone() + .or_else(|| cfg.agent.as_ref().map(|a| a.name().to_string())) + .unwrap_or_else(|| "parent".to_string()); + + // Try local supervisor first (parent → child routing) + let inbox = cfg .supervisor .as_ref() - .ok_or_else(|| anyhow!("No supervisor active"))?; - let sup = supervisor.read(); + .and_then(|sup| sup.read().inbox(id).cloned()); - match sup.inbox(id) { + // Fall back to parent_supervisor (sibling → sibling routing) + let inbox = inbox.or_else(|| { + cfg.parent_supervisor + .as_ref() + .and_then(|sup| sup.read().inbox(id).cloned()) + }); + + match inbox { Some(inbox) => { - let parent_name = cfg - .agent - .as_ref() - .map(|a| a.name().to_string()) - .unwrap_or_else(|| "parent".to_string()); - inbox.deliver(Envelope { - from: parent_name, + from: sender, to: id.to_string(), payload: EnvelopePayload::Text { content: message.to_string(), @@ -581,7 +603,7 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result { } None => Ok(json!({ "status": "error", - "message": format!("No agent found with id '{id}'"), + "message": format!("No agent found with id '{id}'. Agent may not exist or may have already completed."), })), } } @@ -633,6 +655,12 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result { .collect() }) .unwrap_or_default(); + let dispatch_agent = args.get("agent").and_then(Value::as_str).map(String::from); + let task_prompt = args.get("prompt").and_then(Value::as_str).map(String::from); + + if dispatch_agent.is_some() && task_prompt.is_none() { + bail!("'prompt' is required when 'agent' is set"); + } let cfg = config.read(); let supervisor = cfg @@ -641,9 +669,12 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result { .ok_or_else(|| anyhow!("No supervisor active"))?; let mut sup = supervisor.write(); - let task_id = sup - .task_queue_mut() - .create(subject.to_string(), description.to_string()); + let task_id = sup.task_queue_mut().create( + subject.to_string(), + description.to_string(), + dispatch_agent.clone(), + task_prompt, + ); let mut dep_errors = vec![]; for dep_id in &blocked_by { @@ -657,6 +688,10 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result { "task_id": task_id, }); + if dispatch_agent.is_some() { + result["auto_dispatch"] = json!(true); + } + if !dep_errors.is_empty() { result["warnings"] = json!(dep_errors); } @@ -684,6 +719,8 @@ fn handle_task_list(config: &GlobalConfig) -> Result { "owner": t.owner, "blocked_by": t.blocked_by.iter().collect::>(), "blocks": t.blocks.iter().collect::>(), + "agent": t.dispatch_agent, + "prompt": t.prompt, }) }) .collect(); @@ -691,37 +728,151 @@ fn handle_task_list(config: &GlobalConfig) -> Result { Ok(json!({ "tasks": tasks })) } -fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result { +async fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result { let task_id = args .get("task_id") .and_then(Value::as_str) .ok_or_else(|| anyhow!("'task_id' is required"))?; - let cfg = config.read(); - let supervisor = cfg - .supervisor - .as_ref() - .ok_or_else(|| anyhow!("No supervisor active"))?; - let mut sup = supervisor.write(); + let (newly_runnable, dispatchable) = { + let cfg = config.read(); + let supervisor = cfg + .supervisor + .as_ref() + .ok_or_else(|| anyhow!("No supervisor active"))?; + let mut sup = supervisor.write(); - let newly_runnable_ids = sup.task_queue_mut().complete(task_id); + let newly_runnable_ids = sup.task_queue_mut().complete(task_id); - let newly_runnable: Vec = newly_runnable_ids - .iter() - .filter_map(|id| { - sup.task_queue().get(id).map(|t| { - json!({ + let mut newly_runnable = Vec::new(); + let mut to_dispatch: Vec<(String, String, String)> = Vec::new(); + + for id in &newly_runnable_ids { + if let Some(t) = sup.task_queue().get(id) { + newly_runnable.push(json!({ "id": t.id, "subject": t.subject, "description": t.description, - }) - }) - }) - .collect(); + "agent": t.dispatch_agent, + })); - Ok(json!({ + if let (Some(agent), Some(prompt)) = (&t.dispatch_agent, &t.prompt) { + to_dispatch.push((id.clone(), agent.clone(), prompt.clone())); + } + } + } + + let mut dispatchable = Vec::new(); + for (tid, agent, prompt) in to_dispatch { + if sup.task_queue_mut().claim(&tid, &format!("auto:{agent}")) { + dispatchable.push((agent, prompt)); + } + } + + (newly_runnable, dispatchable) + }; + + let mut spawned = Vec::new(); + for (agent, prompt) in &dispatchable { + let spawn_args = json!({ + "agent": agent, + "prompt": prompt, + }); + match handle_spawn(config, &spawn_args).await { + Ok(result) => { + let agent_id = result + .get("id") + .and_then(Value::as_str) + .unwrap_or("unknown"); + debug!("Auto-dispatched agent '{}' for task queue", agent_id); + spawned.push(result); + } + Err(e) => { + spawned.push(json!({ + "status": "error", + "agent": agent, + "message": format!("Auto-dispatch failed: {e}"), + })); + } + } + } + + let mut result = json!({ "status": "ok", "task_id": task_id, "newly_runnable": newly_runnable, - })) + }); + + if !spawned.is_empty() { + result["auto_dispatched"] = json!(spawned); + } + + Ok(result) +} + +const SUMMARIZATION_PROMPT: &str = r#"You are a precise summarization assistant. Your job is to condense a sub-agent's output into a compact summary that preserves all actionable information. + +Rules: +- Preserve ALL code snippets, file paths, error messages, and concrete recommendations +- Remove conversational filler, thinking-out-loud, and redundant explanations +- Keep the summary under 30% of the original length +- Use bullet points for multiple findings +- If the output contains a final answer or conclusion, lead with it"#; + +async fn summarize_output(config: &GlobalConfig, agent_name: &str, output: &str) -> Result { + let (threshold, summarization_model_id) = { + let cfg = config.read(); + match cfg.agent.as_ref() { + Some(agent) => ( + agent.summarization_threshold(), + agent.summarization_model().map(|s| s.to_string()), + ), + None => return Ok(output.to_string()), + } + }; + + if output.len() < threshold { + debug!( + "Output from '{}' is {} chars (threshold {}), skipping summarization", + agent_name, + output.len(), + threshold + ); + return Ok(output.to_string()); + } + + debug!( + "Output from '{}' is {} chars (threshold {}), summarizing...", + agent_name, + output.len(), + threshold + ); + + let model = { + let cfg = config.read(); + match summarization_model_id { + Some(ref model_id) => Model::retrieve_model(&cfg, model_id, ModelType::Chat)?, + None => cfg.current_model().clone(), + } + }; + + let mut role = Role::new("summarizer", SUMMARIZATION_PROMPT); + role.set_model(model); + + let user_message = format!( + "Summarize the following sub-agent output from '{}':\n\n{}", + agent_name, output + ); + let input = Input::from_str(config, &user_message, Some(role)); + + let summary = input.fetch_chat_text().await?; + + debug!( + "Summarized output from '{}': {} chars -> {} chars", + agent_name, + output.len(), + summary.len() + ); + + Ok(summary) } diff --git a/src/supervisor/taskqueue.rs b/src/supervisor/taskqueue.rs index 649b93d..8c234f3 100644 --- a/src/supervisor/taskqueue.rs +++ b/src/supervisor/taskqueue.rs @@ -20,10 +20,18 @@ pub struct TaskNode { pub owner: Option, pub blocked_by: HashSet, pub blocks: HashSet, + pub dispatch_agent: Option, + pub prompt: Option, } impl TaskNode { - pub fn new(id: String, subject: String, description: String) -> Self { + pub fn new( + id: String, + subject: String, + description: String, + dispatch_agent: Option, + prompt: Option, + ) -> Self { Self { id, subject, @@ -32,6 +40,8 @@ impl TaskNode { owner: None, blocked_by: HashSet::new(), blocks: HashSet::new(), + dispatch_agent, + prompt, } } @@ -54,10 +64,16 @@ impl TaskQueue { } } - pub fn create(&mut self, subject: String, description: String) -> String { + pub fn create( + &mut self, + subject: String, + description: String, + dispatch_agent: Option, + prompt: Option, + ) -> String { let id = self.next_id.to_string(); self.next_id += 1; - let task = TaskNode::new(id.clone(), subject, description); + let task = TaskNode::new(id.clone(), subject, description, dispatch_agent, prompt); self.tasks.insert(id.clone(), task); id } @@ -122,8 +138,10 @@ impl TaskQueue { } pub fn claim(&mut self, task_id: &str, owner: &str) -> bool { - if let Some(task) = self.tasks.get_mut(task_id) && - task.is_runnable() && task.owner.is_none() { + if let Some(task) = self.tasks.get_mut(task_id) + && task.is_runnable() + && task.owner.is_none() + { task.owner = Some(owner.to_string()); task.status = TaskStatus::InProgress; return true; @@ -154,8 +172,9 @@ impl TaskQueue { if current == task_id { return true; } - if visited.insert(current.clone()) && - let Some(task) = self.tasks.get(¤t) { + if visited.insert(current.clone()) + && let Some(task) = self.tasks.get(¤t) + { for dep in &task.blocked_by { stack.push(dep.clone()); } @@ -173,8 +192,13 @@ mod tests { #[test] fn test_create_and_list() { let mut queue = TaskQueue::new(); - let id1 = queue.create("Research".into(), "Research auth patterns".into()); - let id2 = queue.create("Implement".into(), "Write the code".into()); + let id1 = queue.create( + "Research".into(), + "Research auth patterns".into(), + None, + None, + ); + let id2 = queue.create("Implement".into(), "Write the code".into(), None, None); assert_eq!(id1, "1"); assert_eq!(id2, "2"); @@ -184,8 +208,8 @@ mod tests { #[test] fn test_dependency_and_completion() { let mut queue = TaskQueue::new(); - let id1 = queue.create("Step 1".into(), "".into()); - let id2 = queue.create("Step 2".into(), "".into()); + let id1 = queue.create("Step 1".into(), "".into(), None, None); + let id2 = queue.create("Step 2".into(), "".into(), None, None); queue.add_dependency(&id2, &id1).unwrap(); @@ -201,9 +225,9 @@ mod tests { #[test] fn test_fan_in_dependency() { let mut queue = TaskQueue::new(); - let id1 = queue.create("A".into(), "".into()); - let id2 = queue.create("B".into(), "".into()); - let id3 = queue.create("C (needs A and B)".into(), "".into()); + let id1 = queue.create("A".into(), "".into(), None, None); + let id2 = queue.create("B".into(), "".into(), None, None); + let id3 = queue.create("C (needs A and B)".into(), "".into(), None, None); queue.add_dependency(&id3, &id1).unwrap(); queue.add_dependency(&id3, &id2).unwrap(); @@ -222,8 +246,8 @@ mod tests { #[test] fn test_cycle_detection() { let mut queue = TaskQueue::new(); - let id1 = queue.create("A".into(), "".into()); - let id2 = queue.create("B".into(), "".into()); + let id1 = queue.create("A".into(), "".into(), None, None); + let id2 = queue.create("B".into(), "".into(), None, None); queue.add_dependency(&id2, &id1).unwrap(); let result = queue.add_dependency(&id1, &id2); @@ -234,7 +258,7 @@ mod tests { #[test] fn test_self_dependency_rejected() { let mut queue = TaskQueue::new(); - let id1 = queue.create("A".into(), "".into()); + let id1 = queue.create("A".into(), "".into(), None, None); let result = queue.add_dependency(&id1, &id1); assert!(result.is_err()); } @@ -242,7 +266,7 @@ mod tests { #[test] fn test_claim() { let mut queue = TaskQueue::new(); - let id1 = queue.create("Task".into(), "".into()); + let id1 = queue.create("Task".into(), "".into(), None, None); assert!(queue.claim(&id1, "worker-1")); assert!(!queue.claim(&id1, "worker-2"));