feat: Auto-dispatch support of sub-agents and support for the teammate pattern between subagents

This commit is contained in:
2026-02-17 15:18:27 -07:00
parent 7f267a10a1
commit b86f76ddb9
4 changed files with 264 additions and 67 deletions
+16
View File
@@ -429,6 +429,14 @@ impl Agent {
self.config.max_agent_depth self.config.max_agent_depth
} }
pub fn summarization_model(&self) -> Option<&str> {
self.config.summarization_model.as_deref()
}
pub fn summarization_threshold(&self) -> usize {
self.config.summarization_threshold
}
pub fn continuation_count(&self) -> usize { pub fn continuation_count(&self) -> usize {
self.continuation_count self.continuation_count
} }
@@ -639,6 +647,10 @@ pub struct AgentConfig {
pub conversation_starters: Vec<String>, pub conversation_starters: Vec<String>,
#[serde(default)] #[serde(default)]
pub documents: Vec<String>, pub documents: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summarization_model: Option<String>,
#[serde(default = "default_summarization_threshold")]
pub summarization_threshold: usize,
} }
fn default_max_auto_continues() -> usize { fn default_max_auto_continues() -> usize {
@@ -657,6 +669,10 @@ fn default_true() -> bool {
true true
} }
fn default_summarization_threshold() -> usize {
4000
}
impl AgentConfig { impl AgentConfig {
pub fn load(path: &Path) -> Result<Self> { pub fn load(path: &Path) -> Result<Self> {
let contents = read_to_string(path) let contents = read_to_string(path)
+6
View File
@@ -212,6 +212,10 @@ pub struct Config {
#[serde(skip)] #[serde(skip)]
pub supervisor: Option<Arc<RwLock<Supervisor>>>, pub supervisor: Option<Arc<RwLock<Supervisor>>>,
#[serde(skip)] #[serde(skip)]
pub parent_supervisor: Option<Arc<RwLock<Supervisor>>>,
#[serde(skip)]
pub self_agent_id: Option<String>,
#[serde(skip)]
pub current_depth: usize, pub current_depth: usize,
#[serde(skip)] #[serde(skip)]
pub inbox: Option<Arc<Inbox>>, pub inbox: Option<Arc<Inbox>>,
@@ -289,6 +293,8 @@ impl Default for Config {
agent: None, agent: None,
tool_call_tracker: Some(ToolCallTracker::default()), tool_call_tracker: Some(ToolCallTracker::default()),
supervisor: None, supervisor: None,
parent_supervisor: None,
self_agent_id: None,
current_depth: 0, current_depth: 0,
inbox: None, inbox: None,
} }
+200 -49
View File
@@ -1,6 +1,6 @@
use super::{FunctionDeclaration, JsonSchema}; use super::{FunctionDeclaration, JsonSchema};
use crate::client::call_chat_completions; use crate::client::{Model, ModelType, call_chat_completions};
use crate::config::{Config, GlobalConfig, Input}; use crate::config::{Config, GlobalConfig, Input, Role, RoleLike};
use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox};
use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult}; use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult};
use crate::utils::{AbortSignal, create_abort_signal}; use crate::utils::{AbortSignal, create_abort_signal};
@@ -189,6 +189,22 @@ pub fn supervisor_function_declarations() -> Vec<FunctionDeclaration> {
..Default::default() ..Default::default()
}, },
), ),
(
"agent".to_string(),
JsonSchema {
type_value: Some("string".to_string()),
description: Some("Agent to auto-spawn when this task becomes runnable (e.g. 'explore', 'coder'). If set, an agent will be spawned automatically when all dependencies complete.".into()),
..Default::default()
},
),
(
"prompt".to_string(),
JsonSchema {
type_value: Some("string".to_string()),
description: Some("Prompt to send to the auto-spawned agent. Required if agent is set.".into()),
..Default::default()
},
),
])), ])),
required: Some(vec!["subject".to_string()]), required: Some(vec!["subject".to_string()]),
..Default::default() ..Default::default()
@@ -244,7 +260,7 @@ pub async fn handle_supervisor_tool(
"check_inbox" => handle_check_inbox(config), "check_inbox" => handle_check_inbox(config),
"task_create" => handle_task_create(config, args), "task_create" => handle_task_create(config, args),
"task_list" => handle_task_list(config), "task_list" => handle_task_list(config),
"task_complete" => handle_task_complete(config, args), "task_complete" => handle_task_complete(config, args).await,
_ => bail!("Unknown supervisor action: {action}"), _ => bail!("Unknown supervisor action: {action}"),
} }
} }
@@ -262,14 +278,9 @@ fn run_child_agent(
let client = input.create_client()?; let client = input.create_client()?;
child_config.write().before_chat_completion(&input)?; child_config.write().before_chat_completion(&input)?;
let (output, tool_results) = call_chat_completions( let (output, tool_results) =
&input, call_chat_completions(&input, false, false, client.as_ref(), abort_signal.clone())
false, .await?;
false,
client.as_ref(),
abort_signal.clone(),
)
.await?;
child_config child_config
.write() .write()
@@ -341,6 +352,7 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
let child_config: GlobalConfig = { let child_config: GlobalConfig = {
let mut child_cfg = config.read().clone(); let mut child_cfg = config.read().clone();
child_cfg.parent_supervisor = child_cfg.supervisor.clone();
child_cfg.agent = None; child_cfg.agent = None;
child_cfg.session = None; child_cfg.session = None;
child_cfg.rag = None; child_cfg.rag = None;
@@ -352,6 +364,7 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
child_cfg.save = false; child_cfg.save = false;
child_cfg.current_depth = current_depth; child_cfg.current_depth = current_depth;
child_cfg.inbox = Some(Arc::clone(&child_inbox)); child_cfg.inbox = Some(Arc::clone(&child_inbox));
child_cfg.self_agent_id = Some(agent_id.clone());
Arc::new(RwLock::new(child_cfg)) Arc::new(RwLock::new(child_cfg))
}; };
@@ -430,9 +443,7 @@ async fn handle_check(config: &GlobalConfig, args: &Value) -> Result<Value> {
}; };
match is_finished { match is_finished {
Some(true) => { Some(true) => handle_collect(config, args).await,
handle_collect(config, args).await
}
Some(false) => Ok(json!({ Some(false) => Ok(json!({
"status": "pending", "status": "pending",
"id": id, "id": id,
@@ -469,12 +480,14 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
.map_err(|e| anyhow!("Agent task panicked: {e}"))? .map_err(|e| anyhow!("Agent task panicked: {e}"))?
.map_err(|e| anyhow!("Agent failed: {e}"))?; .map_err(|e| anyhow!("Agent failed: {e}"))?;
let output = summarize_output(config, &result.agent_name, &result.output).await?;
Ok(json!({ Ok(json!({
"status": "completed", "status": "completed",
"id": result.id, "id": result.id,
"agent": result.agent_name, "agent": result.agent_name,
"exit_status": format!("{:?}", result.exit_status), "exit_status": format!("{:?}", result.exit_status),
"output": result.output, "output": output,
})) }))
} }
None => Ok(json!({ None => Ok(json!({
@@ -551,22 +564,31 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result<Value> {
.ok_or_else(|| anyhow!("'message' is required"))?; .ok_or_else(|| anyhow!("'message' is required"))?;
let cfg = config.read(); let cfg = config.read();
let supervisor = cfg
// Determine sender identity: self_agent_id (child), agent name (parent), or "parent"
let sender = cfg
.self_agent_id
.clone()
.or_else(|| cfg.agent.as_ref().map(|a| a.name().to_string()))
.unwrap_or_else(|| "parent".to_string());
// Try local supervisor first (parent → child routing)
let inbox = cfg
.supervisor .supervisor
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("No supervisor active"))?; .and_then(|sup| sup.read().inbox(id).cloned());
let sup = supervisor.read();
match sup.inbox(id) { // Fall back to parent_supervisor (sibling → sibling routing)
let inbox = inbox.or_else(|| {
cfg.parent_supervisor
.as_ref()
.and_then(|sup| sup.read().inbox(id).cloned())
});
match inbox {
Some(inbox) => { Some(inbox) => {
let parent_name = cfg
.agent
.as_ref()
.map(|a| a.name().to_string())
.unwrap_or_else(|| "parent".to_string());
inbox.deliver(Envelope { inbox.deliver(Envelope {
from: parent_name, from: sender,
to: id.to_string(), to: id.to_string(),
payload: EnvelopePayload::Text { payload: EnvelopePayload::Text {
content: message.to_string(), content: message.to_string(),
@@ -581,7 +603,7 @@ fn handle_send_message(config: &GlobalConfig, args: &Value) -> Result<Value> {
} }
None => Ok(json!({ None => Ok(json!({
"status": "error", "status": "error",
"message": format!("No agent found with id '{id}'"), "message": format!("No agent found with id '{id}'. Agent may not exist or may have already completed."),
})), })),
} }
} }
@@ -633,6 +655,12 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result<Value> {
.collect() .collect()
}) })
.unwrap_or_default(); .unwrap_or_default();
let dispatch_agent = args.get("agent").and_then(Value::as_str).map(String::from);
let task_prompt = args.get("prompt").and_then(Value::as_str).map(String::from);
if dispatch_agent.is_some() && task_prompt.is_none() {
bail!("'prompt' is required when 'agent' is set");
}
let cfg = config.read(); let cfg = config.read();
let supervisor = cfg let supervisor = cfg
@@ -641,9 +669,12 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result<Value> {
.ok_or_else(|| anyhow!("No supervisor active"))?; .ok_or_else(|| anyhow!("No supervisor active"))?;
let mut sup = supervisor.write(); let mut sup = supervisor.write();
let task_id = sup let task_id = sup.task_queue_mut().create(
.task_queue_mut() subject.to_string(),
.create(subject.to_string(), description.to_string()); description.to_string(),
dispatch_agent.clone(),
task_prompt,
);
let mut dep_errors = vec![]; let mut dep_errors = vec![];
for dep_id in &blocked_by { for dep_id in &blocked_by {
@@ -657,6 +688,10 @@ fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result<Value> {
"task_id": task_id, "task_id": task_id,
}); });
if dispatch_agent.is_some() {
result["auto_dispatch"] = json!(true);
}
if !dep_errors.is_empty() { if !dep_errors.is_empty() {
result["warnings"] = json!(dep_errors); result["warnings"] = json!(dep_errors);
} }
@@ -684,6 +719,8 @@ fn handle_task_list(config: &GlobalConfig) -> Result<Value> {
"owner": t.owner, "owner": t.owner,
"blocked_by": t.blocked_by.iter().collect::<Vec<_>>(), "blocked_by": t.blocked_by.iter().collect::<Vec<_>>(),
"blocks": t.blocks.iter().collect::<Vec<_>>(), "blocks": t.blocks.iter().collect::<Vec<_>>(),
"agent": t.dispatch_agent,
"prompt": t.prompt,
}) })
}) })
.collect(); .collect();
@@ -691,37 +728,151 @@ fn handle_task_list(config: &GlobalConfig) -> Result<Value> {
Ok(json!({ "tasks": tasks })) Ok(json!({ "tasks": tasks }))
} }
fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result<Value> { async fn handle_task_complete(config: &GlobalConfig, args: &Value) -> Result<Value> {
let task_id = args let task_id = args
.get("task_id") .get("task_id")
.and_then(Value::as_str) .and_then(Value::as_str)
.ok_or_else(|| anyhow!("'task_id' is required"))?; .ok_or_else(|| anyhow!("'task_id' is required"))?;
let cfg = config.read(); let (newly_runnable, dispatchable) = {
let supervisor = cfg let cfg = config.read();
.supervisor let supervisor = cfg
.as_ref() .supervisor
.ok_or_else(|| anyhow!("No supervisor active"))?; .as_ref()
let mut sup = supervisor.write(); .ok_or_else(|| anyhow!("No supervisor active"))?;
let mut sup = supervisor.write();
let newly_runnable_ids = sup.task_queue_mut().complete(task_id); let newly_runnable_ids = sup.task_queue_mut().complete(task_id);
let newly_runnable: Vec<Value> = newly_runnable_ids let mut newly_runnable = Vec::new();
.iter() let mut to_dispatch: Vec<(String, String, String)> = Vec::new();
.filter_map(|id| {
sup.task_queue().get(id).map(|t| { for id in &newly_runnable_ids {
json!({ if let Some(t) = sup.task_queue().get(id) {
newly_runnable.push(json!({
"id": t.id, "id": t.id,
"subject": t.subject, "subject": t.subject,
"description": t.description, "description": t.description,
}) "agent": t.dispatch_agent,
}) }));
})
.collect();
Ok(json!({ if let (Some(agent), Some(prompt)) = (&t.dispatch_agent, &t.prompt) {
to_dispatch.push((id.clone(), agent.clone(), prompt.clone()));
}
}
}
let mut dispatchable = Vec::new();
for (tid, agent, prompt) in to_dispatch {
if sup.task_queue_mut().claim(&tid, &format!("auto:{agent}")) {
dispatchable.push((agent, prompt));
}
}
(newly_runnable, dispatchable)
};
let mut spawned = Vec::new();
for (agent, prompt) in &dispatchable {
let spawn_args = json!({
"agent": agent,
"prompt": prompt,
});
match handle_spawn(config, &spawn_args).await {
Ok(result) => {
let agent_id = result
.get("id")
.and_then(Value::as_str)
.unwrap_or("unknown");
debug!("Auto-dispatched agent '{}' for task queue", agent_id);
spawned.push(result);
}
Err(e) => {
spawned.push(json!({
"status": "error",
"agent": agent,
"message": format!("Auto-dispatch failed: {e}"),
}));
}
}
}
let mut result = json!({
"status": "ok", "status": "ok",
"task_id": task_id, "task_id": task_id,
"newly_runnable": newly_runnable, "newly_runnable": newly_runnable,
})) });
if !spawned.is_empty() {
result["auto_dispatched"] = json!(spawned);
}
Ok(result)
}
const SUMMARIZATION_PROMPT: &str = r#"You are a precise summarization assistant. Your job is to condense a sub-agent's output into a compact summary that preserves all actionable information.
Rules:
- Preserve ALL code snippets, file paths, error messages, and concrete recommendations
- Remove conversational filler, thinking-out-loud, and redundant explanations
- Keep the summary under 30% of the original length
- Use bullet points for multiple findings
- If the output contains a final answer or conclusion, lead with it"#;
async fn summarize_output(config: &GlobalConfig, agent_name: &str, output: &str) -> Result<String> {
let (threshold, summarization_model_id) = {
let cfg = config.read();
match cfg.agent.as_ref() {
Some(agent) => (
agent.summarization_threshold(),
agent.summarization_model().map(|s| s.to_string()),
),
None => return Ok(output.to_string()),
}
};
if output.len() < threshold {
debug!(
"Output from '{}' is {} chars (threshold {}), skipping summarization",
agent_name,
output.len(),
threshold
);
return Ok(output.to_string());
}
debug!(
"Output from '{}' is {} chars (threshold {}), summarizing...",
agent_name,
output.len(),
threshold
);
let model = {
let cfg = config.read();
match summarization_model_id {
Some(ref model_id) => Model::retrieve_model(&cfg, model_id, ModelType::Chat)?,
None => cfg.current_model().clone(),
}
};
let mut role = Role::new("summarizer", SUMMARIZATION_PROMPT);
role.set_model(model);
let user_message = format!(
"Summarize the following sub-agent output from '{}':\n\n{}",
agent_name, output
);
let input = Input::from_str(config, &user_message, Some(role));
let summary = input.fetch_chat_text().await?;
debug!(
"Summarized output from '{}': {} chars -> {} chars",
agent_name,
output.len(),
summary.len()
);
Ok(summary)
} }
+42 -18
View File
@@ -20,10 +20,18 @@ pub struct TaskNode {
pub owner: Option<String>, pub owner: Option<String>,
pub blocked_by: HashSet<String>, pub blocked_by: HashSet<String>,
pub blocks: HashSet<String>, pub blocks: HashSet<String>,
pub dispatch_agent: Option<String>,
pub prompt: Option<String>,
} }
impl TaskNode { impl TaskNode {
pub fn new(id: String, subject: String, description: String) -> Self { pub fn new(
id: String,
subject: String,
description: String,
dispatch_agent: Option<String>,
prompt: Option<String>,
) -> Self {
Self { Self {
id, id,
subject, subject,
@@ -32,6 +40,8 @@ impl TaskNode {
owner: None, owner: None,
blocked_by: HashSet::new(), blocked_by: HashSet::new(),
blocks: HashSet::new(), blocks: HashSet::new(),
dispatch_agent,
prompt,
} }
} }
@@ -54,10 +64,16 @@ impl TaskQueue {
} }
} }
pub fn create(&mut self, subject: String, description: String) -> String { pub fn create(
&mut self,
subject: String,
description: String,
dispatch_agent: Option<String>,
prompt: Option<String>,
) -> String {
let id = self.next_id.to_string(); let id = self.next_id.to_string();
self.next_id += 1; self.next_id += 1;
let task = TaskNode::new(id.clone(), subject, description); let task = TaskNode::new(id.clone(), subject, description, dispatch_agent, prompt);
self.tasks.insert(id.clone(), task); self.tasks.insert(id.clone(), task);
id id
} }
@@ -122,8 +138,10 @@ impl TaskQueue {
} }
pub fn claim(&mut self, task_id: &str, owner: &str) -> bool { pub fn claim(&mut self, task_id: &str, owner: &str) -> bool {
if let Some(task) = self.tasks.get_mut(task_id) && if let Some(task) = self.tasks.get_mut(task_id)
task.is_runnable() && task.owner.is_none() { && task.is_runnable()
&& task.owner.is_none()
{
task.owner = Some(owner.to_string()); task.owner = Some(owner.to_string());
task.status = TaskStatus::InProgress; task.status = TaskStatus::InProgress;
return true; return true;
@@ -154,8 +172,9 @@ impl TaskQueue {
if current == task_id { if current == task_id {
return true; return true;
} }
if visited.insert(current.clone()) && if visited.insert(current.clone())
let Some(task) = self.tasks.get(&current) { && let Some(task) = self.tasks.get(&current)
{
for dep in &task.blocked_by { for dep in &task.blocked_by {
stack.push(dep.clone()); stack.push(dep.clone());
} }
@@ -173,8 +192,13 @@ mod tests {
#[test] #[test]
fn test_create_and_list() { fn test_create_and_list() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("Research".into(), "Research auth patterns".into()); let id1 = queue.create(
let id2 = queue.create("Implement".into(), "Write the code".into()); "Research".into(),
"Research auth patterns".into(),
None,
None,
);
let id2 = queue.create("Implement".into(), "Write the code".into(), None, None);
assert_eq!(id1, "1"); assert_eq!(id1, "1");
assert_eq!(id2, "2"); assert_eq!(id2, "2");
@@ -184,8 +208,8 @@ mod tests {
#[test] #[test]
fn test_dependency_and_completion() { fn test_dependency_and_completion() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("Step 1".into(), "".into()); let id1 = queue.create("Step 1".into(), "".into(), None, None);
let id2 = queue.create("Step 2".into(), "".into()); let id2 = queue.create("Step 2".into(), "".into(), None, None);
queue.add_dependency(&id2, &id1).unwrap(); queue.add_dependency(&id2, &id1).unwrap();
@@ -201,9 +225,9 @@ mod tests {
#[test] #[test]
fn test_fan_in_dependency() { fn test_fan_in_dependency() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("A".into(), "".into()); let id1 = queue.create("A".into(), "".into(), None, None);
let id2 = queue.create("B".into(), "".into()); let id2 = queue.create("B".into(), "".into(), None, None);
let id3 = queue.create("C (needs A and B)".into(), "".into()); let id3 = queue.create("C (needs A and B)".into(), "".into(), None, None);
queue.add_dependency(&id3, &id1).unwrap(); queue.add_dependency(&id3, &id1).unwrap();
queue.add_dependency(&id3, &id2).unwrap(); queue.add_dependency(&id3, &id2).unwrap();
@@ -222,8 +246,8 @@ mod tests {
#[test] #[test]
fn test_cycle_detection() { fn test_cycle_detection() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("A".into(), "".into()); let id1 = queue.create("A".into(), "".into(), None, None);
let id2 = queue.create("B".into(), "".into()); let id2 = queue.create("B".into(), "".into(), None, None);
queue.add_dependency(&id2, &id1).unwrap(); queue.add_dependency(&id2, &id1).unwrap();
let result = queue.add_dependency(&id1, &id2); let result = queue.add_dependency(&id1, &id2);
@@ -234,7 +258,7 @@ mod tests {
#[test] #[test]
fn test_self_dependency_rejected() { fn test_self_dependency_rejected() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("A".into(), "".into()); let id1 = queue.create("A".into(), "".into(), None, None);
let result = queue.add_dependency(&id1, &id1); let result = queue.add_dependency(&id1, &id1);
assert!(result.is_err()); assert!(result.is_err());
} }
@@ -242,7 +266,7 @@ mod tests {
#[test] #[test]
fn test_claim() { fn test_claim() {
let mut queue = TaskQueue::new(); let mut queue = TaskQueue::new();
let id1 = queue.create("Task".into(), "".into()); let id1 = queue.create("Task".into(), "".into(), None, None);
assert!(queue.claim(&id1, "worker-1")); assert!(queue.claim(&id1, "worker-1"));
assert!(!queue.claim(&id1, "worker-2")); assert!(!queue.claim(&id1, "worker-2"));