diff --git a/assets/agents/sisyphus/tools.sh b/assets/agents/sisyphus/tools.sh index 7d8e70f..955d9ab 100755 --- a/assets/agents/sisyphus/tools.sh +++ b/assets/agents/sisyphus/tools.sh @@ -146,7 +146,7 @@ search_content() { fi } -# @cmd Ask the user to select ONE option from a list. The first option should be your recommended choice — append '(Recommended)' to its label. Returns the selected option's label text. +# @cmd Ask the user to select ONE option from a list. The first option should be your recommended choice; append '(Recommended)' to its label. Returns the selected option's label text. # @option --question! The question to present to the user # @option --options! The JSON array of options to present (first option = recommended, append '(Recommended)' to its label) ask_user() { diff --git a/src/config/agent.rs b/src/config/agent.rs index 535aeb3..973bc8d 100644 --- a/src/config/agent.rs +++ b/src/config/agent.rs @@ -204,6 +204,7 @@ impl Agent { } functions.append_teammate_functions(); + functions.append_user_interaction_functions(); agent_config.replace_tools_placeholder(&functions); diff --git a/src/config/mod.rs b/src/config/mod.rs index 7e184d9..1c972a9 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -30,6 +30,7 @@ use crate::mcp::{ MCP_SEARCH_META_FUNCTION_NAME_PREFIX, McpRegistry, }; use crate::supervisor::Supervisor; +use crate::supervisor::escalation::EscalationQueue; use crate::supervisor::mailbox::Inbox; use crate::vault::{GlobalVault, Vault, create_vault_password_file, interpolate_secrets}; use anyhow::{Context, Result, anyhow, bail}; @@ -220,6 +221,8 @@ pub struct Config { pub current_depth: usize, #[serde(skip)] pub inbox: Option>, + #[serde(skip)] + pub root_escalation_queue: Option>, } impl Default for Config { @@ -298,6 +301,7 @@ impl Default for Config { self_agent_id: None, current_depth: 0, inbox: None, + root_escalation_queue: None, } } } diff --git a/src/config/prompts.rs b/src/config/prompts.rs index 0f140af..765fbd5 100644 --- a/src/config/prompts.rs +++ b/src/config/prompts.rs @@ -85,6 +85,21 @@ pub(in crate::config) const DEFAULT_SPAWN_INSTRUCTIONS: &str = indoc! {" # If dependents have --agent set, they auto-dispatch agent__task_complete --task_id task_1 ``` + + ### Escalation Handling + + Child agents may need user input but cannot prompt the user directly. When this happens, + you will see `pending_escalations` in your tool results listing blocked children and their questions. + + | Tool | Purpose | + |------|----------| + | `agent__reply_escalation` | Unblock a child agent by answering its escalated question. | + + When you see a pending escalation: + 1. Read the child's question and options. + 2. If you can answer from context, call `agent__reply_escalation` with your answer. + 3. If you need the user's input, call the appropriate `user__*` tool yourself, then relay the answer via `agent__reply_escalation`. + 4. **Respond promptly**; the child agent is blocked and waiting (5-minute timeout). "}; pub(in crate::config) const DEFAULT_TEAMMATE_INSTRUCTIONS: &str = indoc! {" diff --git a/src/function/mod.rs b/src/function/mod.rs index 4fb60a7..5421c14 100644 --- a/src/function/mod.rs +++ b/src/function/mod.rs @@ -123,6 +123,31 @@ pub async fn eval_tool_calls( if is_all_null { output = vec![]; } + + if !output.is_empty() { + let (has_escalations, summary) = { + let cfg = config.read(); + if cfg.current_depth == 0 && let Some(ref queue) = cfg.root_escalation_queue && queue.has_pending() { + (true, queue.pending_summary()) + } else { + (false, vec![]) + } + }; + + if has_escalations { + let notification = json!({ + "pending_escalations": summary, + "instruction": "Child agents are BLOCKED waiting for your reply. Call agent__reply_escalation for each pending escalation to unblock them." + }); + let synthetic_call = ToolCall::new( + "__escalation_notification".to_string(), + json!({}), + Some("escalation_check".to_string()), + ); + output.push(ToolResult::new(synthetic_call, notification)); + } + } + Ok(output) } @@ -276,6 +301,8 @@ impl Functions { pub fn append_supervisor_functions(&mut self) { self.declarations .extend(supervisor::supervisor_function_declarations()); + self.declarations + .extend(supervisor::escalation_function_declarations()); } pub fn append_teammate_functions(&mut self) { diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index 5d9e99d..4f208bb 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -1,6 +1,7 @@ use super::{FunctionDeclaration, JsonSchema}; use crate::client::{Model, ModelType, call_chat_completions}; use crate::config::{Config, GlobalConfig, Input, Role, RoleLike}; +use crate::supervisor::escalation::EscalationQueue; use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult}; use crate::utils::{AbortSignal, create_abort_signal}; @@ -17,6 +18,37 @@ use uuid::Uuid; pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__"; +pub fn escalation_function_declarations() -> Vec { + vec![FunctionDeclaration { + name: format!("{SUPERVISOR_FUNCTION_PREFIX}reply_escalation"), + description: "Reply to a pending escalation from a child agent. The child is blocked waiting for this reply. Use this after seeing pending_escalations notifications.".to_string(), + parameters: JsonSchema { + type_value: Some("object".to_string()), + properties: Some(IndexMap::from([ + ( + "escalation_id".to_string(), + JsonSchema { + type_value: Some("string".to_string()), + description: Some("The escalation ID from the pending_escalations notification".into()), + ..Default::default() + }, + ), + ( + "reply".to_string(), + JsonSchema { + type_value: Some("string".to_string()), + description: Some("Your answer to the child agent's question. For ask/confirm questions, use the exact option text. For input questions, provide the text response.".into()), + ..Default::default() + }, + ), + ])), + required: Some(vec!["escalation_id".to_string(), "reply".to_string()]), + ..Default::default() + }, + agent: false, + }] +} + pub fn supervisor_function_declarations() -> Vec { vec![ FunctionDeclaration { @@ -285,6 +317,7 @@ pub async fn handle_supervisor_tool( "task_list" => handle_task_list(config), "task_complete" => handle_task_complete(config, args).await, "task_fail" => handle_task_fail(config, args), + "reply_escalation" => handle_reply_escalation(config, args), _ => bail!("Unknown supervisor action: {action}"), } } @@ -377,6 +410,13 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { let child_inbox = Arc::new(Inbox::new()); + { + let mut cfg = config.write(); + if cfg.root_escalation_queue.is_none() { + cfg.root_escalation_queue = Some(Arc::new(EscalationQueue::new())); + } + } + let child_config: GlobalConfig = { let mut child_cfg = config.read().clone(); @@ -664,6 +704,41 @@ fn handle_check_inbox(config: &GlobalConfig) -> Result { } } +fn handle_reply_escalation(config: &GlobalConfig, args: &Value) -> Result { + let escalation_id = args + .get("escalation_id") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("'escalation_id' is required"))?; + let reply = args + .get("reply") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("'reply' is required"))?; + + let queue = { + let cfg = config.read(); + cfg.root_escalation_queue + .clone() + .ok_or_else(|| anyhow!("No escalation queue available"))? + }; + + match queue.take(escalation_id) { + Some(request) => { + let from_agent = request.from_agent_name.clone(); + let question = request.question.clone(); + let _ = request.reply_tx.send(reply.to_string()); + Ok(json!({ + "status": "ok", + "message": format!("Reply sent to agent '{from_agent}' for escalation '{escalation_id}'"), + "original_question": question, + })) + } + None => Ok(json!({ + "status": "error", + "message": format!("No pending escalation found with id '{escalation_id}'. It may have already been replied to or timed out."), + })), + } +} + fn handle_task_create(config: &GlobalConfig, args: &Value) -> Result { let subject = args .get("subject") diff --git a/src/supervisor/escalation.rs b/src/supervisor/escalation.rs new file mode 100644 index 0000000..18a4533 --- /dev/null +++ b/src/supervisor/escalation.rs @@ -0,0 +1,80 @@ +use fmt::{Debug, Formatter}; +use serde_json::{Value, json}; +use std::collections::HashMap; +use std::fmt; +use tokio::sync::oneshot; +use uuid::Uuid; + +pub struct EscalationRequest { + pub id: String, + pub from_agent_id: String, + pub from_agent_name: String, + pub question: String, + pub options: Option>, + pub reply_tx: oneshot::Sender, +} + +pub struct EscalationQueue { + pending: parking_lot::Mutex>, +} + +impl EscalationQueue { + pub fn new() -> Self { + Self { + pending: parking_lot::Mutex::new(HashMap::new()), + } + } + + pub fn submit(&self, request: EscalationRequest) -> String { + let id = request.id.clone(); + self.pending.lock().insert(id.clone(), request); + id + } + + pub fn take(&self, escalation_id: &str) -> Option { + self.pending.lock().remove(escalation_id) + } + + pub fn pending_summary(&self) -> Vec { + self.pending + .lock() + .values() + .map(|r| { + let mut entry = json!({ + "escalation_id": r.id, + "from_agent_id": r.from_agent_id, + "from_agent_name": r.from_agent_name, + "question": r.question, + }); + if let Some(ref options) = r.options { + entry["options"] = json!(options); + } + entry + }) + .collect() + } + + pub fn has_pending(&self) -> bool { + !self.pending.lock().is_empty() + } +} + +impl Default for EscalationQueue { + fn default() -> Self { + Self::new() + } +} + +impl Debug for EscalationQueue { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let count = self.pending.lock().len(); + f.debug_struct("EscalationQueue") + .field("pending_count", &count) + .finish() + } +} + +pub fn new_escalation_id() -> String { + let short = &Uuid::new_v4().to_string()[..8]; + format!("esc_{short}") +} diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index a120b26..4fd7a21 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -1,3 +1,4 @@ +pub mod escalation; pub mod mailbox; pub mod taskqueue;