diff --git a/src/config/prompts.rs b/src/config/prompts.rs index 7f0eb67..8c1bf68 100644 --- a/src/config/prompts.rs +++ b/src/config/prompts.rs @@ -93,6 +93,36 @@ pub(in crate::config) const DEFAULT_SPAWN_INSTRUCTIONS: &str = indoc! {" agent__collect --id agent_explore_e5f6g7h8 ``` + ### CRITICAL: Never end your turn with pending agents + + Spawned agents do NOT report back on their own. They run in the background until you + actively reclaim them with `agent__collect` (to get their output) or `agent__cancel` + (to discard them). If you spawn agents and then emit a final message without reclaiming + them, the system will detect the unreclaimed agents and reject the turn-end, injecting + a reminder forcing you to handle them. After several such reminders, the system will + auto-cancel them and warn you that work was lost. + + The correct flow when you have nothing else to do: + + ``` + # WRONG - do NOT do this: + agent__spawn --agent explore --prompt \"...\" + agent__spawn --agent explore --prompt \"...\" + # ... emit text like \"I will synthesize once they report back.\" and stop + # ^ The agents will be abandoned. Their output will be lost. + + # RIGHT - always do this: + agent__spawn --agent explore --prompt \"...\" + agent__spawn --agent explore --prompt \"...\" + agent__collect --id # blocks until done + agent__collect --id # blocks until done + # ... NOW you can synthesize and end your turn + ``` + + `agent__collect` is a **blocking wait**: it pauses your execution until the agent + completes, then returns the output as a tool result. Use it freely — it is the + correct primitive for \"I'm done with my own work and just need the agents' results\". + ### Parallel Spawning (DEFAULT for multi-agent work) When a task needs multiple agents, **spawn them all at once**, then collect: diff --git a/src/config/request_context.rs b/src/config/request_context.rs index bef6db2..61aa8c5 100644 --- a/src/config/request_context.rs +++ b/src/config/request_context.rs @@ -120,6 +120,7 @@ pub struct RequestContext { pub escalation_queue: Option>, pub current_depth: usize, pub auto_continue_count: usize, + pub pending_agents_guardrail_count: u32, pub todo_list: TodoList, pub skill_registry: SkillRegistry, pub last_continuation_response: Option, @@ -149,6 +150,7 @@ impl RequestContext { escalation_queue: None, current_depth: 0, auto_continue_count: 0, + pending_agents_guardrail_count: 0, todo_list: TodoList::default(), skill_registry: SkillRegistry::default(), last_continuation_response: None, @@ -204,6 +206,7 @@ impl RequestContext { escalation_queue: None, current_depth: 0, auto_continue_count: 0, + pending_agents_guardrail_count: 0, todo_list: TodoList::default(), skill_registry: SkillRegistry::default(), last_continuation_response: None, @@ -246,6 +249,7 @@ impl RequestContext { escalation_queue: self.escalation_queue.clone(), current_depth: self.current_depth, auto_continue_count: 0, + pending_agents_guardrail_count: 0, todo_list: self.todo_list.clone(), skill_registry: self.skill_registry.clone(), last_continuation_response: None, @@ -286,6 +290,7 @@ impl RequestContext { escalation_queue: parent.escalation_queue.clone(), current_depth, auto_continue_count: 0, + pending_agents_guardrail_count: 0, todo_list: TodoList::default(), skill_registry: SkillRegistry::default(), last_continuation_response: None, @@ -2787,7 +2792,7 @@ impl RequestContext { if self.agent.take().is_some() { if let Some(supervisor) = self.supervisor.clone() { - supervisor.read().cancel_all(); + supervisor.read().cancel_recursive(); } self.supervisor = None; self.parent_supervisor = None; @@ -2796,6 +2801,7 @@ impl RequestContext { self.escalation_queue = None; self.current_depth = 0; self.auto_continue_count = 0; + self.pending_agents_guardrail_count = 0; self.todo_list = TodoList::default(); self.rag.take(); self.discontinuous_last_message(); diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index c15841f..e73ba68 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -3,7 +3,7 @@ use crate::client::{Model, ModelType, call_chat_completions}; use crate::config::{Agent, AppState, Input, RequestContext, Role, RoleLike}; use crate::supervisor::mailbox::{Envelope, EnvelopePayload, Inbox}; use crate::supervisor::{AgentExitStatus, AgentHandle, AgentResult, Supervisor}; -use crate::utils::{AbortSignal, create_abort_signal}; +use crate::utils::{AbortSignal, create_abort_signal, wait_abort_signal}; use crate::graph; use anyhow::{Context, Result, anyhow, bail}; @@ -16,10 +16,69 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::time; +use tokio::time::Instant; use uuid::Uuid; pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__"; +pub const PENDING_AGENTS_GUARDRAIL_MAX: u32 = 3; + +pub enum GuardrailAction { + NoAction, + Inject(String), + ForceTerminate(Vec), +} + +pub fn pending_agent_ids(ctx: &RequestContext) -> Vec { + let Some(sup) = ctx.supervisor.as_ref() else { + return Vec::new(); + }; + let sup = sup.read(); + sup.list_agents() + .into_iter() + .filter_map(|(id, _)| match sup.is_finished(id) { + Some(false) => Some(id.to_string()), + _ => None, + }) + .collect() +} + +pub fn build_pending_agents_guardrail_prompt(ids: &[String]) -> String { + let count = ids.len(); + let id_list = ids + .iter() + .map(|id| format!("- {id}")) + .collect::>() + .join("\n"); + format!( + "[SYSTEM GUARDRAIL] You attempted to end your turn while {count} spawned background agent(s) \ + are still running:\n{id_list}\n\nThese agents will be abandoned if your turn ends now. You MUST \ + reclaim each one before ending your turn. For each agent: call `agent__collect` (blocks until \ + done, returns output) or `agent__cancel` (discards). Do NOT emit a text-only response \ + expecting them to 'report back' — they will not." + ) +} + +pub fn check_pending_agents_guardrail(ctx: &mut RequestContext) -> GuardrailAction { + let pending = pending_agent_ids(ctx); + if pending.is_empty() { + ctx.pending_agents_guardrail_count = 0; + return GuardrailAction::NoAction; + } + + if ctx.pending_agents_guardrail_count >= PENDING_AGENTS_GUARDRAIL_MAX { + if let Some(sup) = ctx.supervisor.as_ref().cloned() { + sup.read().cancel_recursive(); + } + ctx.pending_agents_guardrail_count = 0; + + return GuardrailAction::ForceTerminate(pending); + } + + ctx.pending_agents_guardrail_count += 1; + GuardrailAction::Inject(build_pending_agents_guardrail_prompt(&pending)) +} + pub fn escalation_function_declarations() -> Vec { vec![FunctionDeclaration { name: format!("{SUPERVISOR_FUNCTION_PREFIX}reply_escalation"), @@ -55,7 +114,11 @@ pub fn supervisor_function_declarations() -> Vec { vec![ FunctionDeclaration { name: format!("{SUPERVISOR_FUNCTION_PREFIX}spawn"), - description: "Spawn a subagent to run in the background. Returns a task_id for tracking. The agent runs in parallel. You can continue working while it executes.".to_string(), + description: "Spawn a subagent to run in the background. Returns an `id` immediately so you can continue \ + working in parallel. CRITICAL: every spawned agent MUST be reclaimed before you end your \ + turn — call `agent__collect` to retrieve its output, or `agent__cancel` if you no longer \ + need it. Ending your turn with pending agents will abandon their work and the system will \ + reject the turn-end.".to_string(), parameters: JsonSchema { type_value: Some("object".to_string()), properties: Some(IndexMap::from([ @@ -109,7 +172,11 @@ pub fn supervisor_function_declarations() -> Vec { }, FunctionDeclaration { name: format!("{SUPERVISOR_FUNCTION_PREFIX}collect"), - description: "Wait for a spawned agent to finish and return its result. Blocks until the agent completes.".to_string(), + description: "Block until the named spawned agent finishes and return its result. This is your primary \ + wait primitive — it pauses your execution until the agent completes (or you are interrupted). \ + Call this for every agent you spawned before ending your turn. Do NOT end your turn assuming \ + agents will 'report back later' — they will not; they will be abandoned. If you no longer \ + need an agent's result, call `agent__cancel` instead.".to_string(), parameters: JsonSchema { type_value: Some("object".to_string()), properties: Some(IndexMap::from([( @@ -137,7 +204,10 @@ pub fn supervisor_function_declarations() -> Vec { }, FunctionDeclaration { name: format!("{SUPERVISOR_FUNCTION_PREFIX}cancel"), - description: "Cancel a running subagent by its ID.".to_string(), + description: "Cancel a running subagent by its ID. Use this when an agent's output is no longer needed \ + (e.g. you changed direction, or you're about to end your turn and don't want to wait). \ + Cancellation cascades: all of the cancelled agent's own descendants are also cancelled. This \ + call waits briefly for the agent to actually finish cleanup before returning.".to_string(), parameters: JsonSchema { type_value: Some("object".to_string()), properties: Some(IndexMap::from([( @@ -315,7 +385,7 @@ pub async fn handle_supervisor_tool( "check" => handle_check(ctx, args).await, "collect" => handle_collect(ctx, args).await, "list" => handle_list(ctx), - "cancel" => handle_cancel(ctx, args), + "cancel" => handle_cancel(ctx, args).await, "send_message" => handle_send_message(ctx, args), "check_inbox" => handle_check_inbox(ctx), "task_create" => handle_task_create(ctx, args), @@ -370,14 +440,28 @@ pub fn run_child_agent( } if tool_results.is_empty() { - break; + match check_pending_agents_guardrail(&mut child_ctx) { + GuardrailAction::NoAction => break, + GuardrailAction::ForceTerminate(ids) => { + log::warn!( + "Pending-agent guardrail force-cancelled {} agent(s) after max reminders: {:?}", + ids.len(), + ids + ); + break; + } + GuardrailAction::Inject(prompt) => { + input = Input::from_str(&child_ctx, &prompt, None)?; + continue; + } + } } input = input.merge_tool_results(output, tool_results); } if let Some(supervisor) = child_ctx.supervisor.clone() { - supervisor.read().cancel_all(); + supervisor.read().cancel_recursive(); } Ok(accumulated_output) @@ -642,6 +726,7 @@ async fn handle_spawn(ctx: &mut RequestContext, args: &Value) -> Result { let spawn_agent_id = agent_id.clone(); let spawn_agent_name = agent_name.clone(); let spawn_abort = child_abort.clone(); + let child_supervisor = child_ctx.supervisor.clone(); let join_handle = tokio::spawn(async move { let result = run_child_agent(child_ctx, input, spawn_abort).await; @@ -669,6 +754,7 @@ async fn handle_spawn(ctx: &mut RequestContext, args: &Value) -> Result { inbox: child_inbox, abort_signal: child_abort, join_handle, + child_supervisor, }; let supervisor = ctx @@ -683,7 +769,11 @@ async fn handle_spawn(ctx: &mut RequestContext, args: &Value) -> Result { "status": "ok", "id": agent_id, "agent": agent_name, - "message": format!("Agent '{agent_name}' spawned as '{agent_id}'. Use agent__check or agent__collect to get results."), + "message": format!("Agent '{agent_name}' spawned as '{agent_id}' and is running in the background. CRITICAL: \ + you MUST reclaim this agent before ending your turn — call `agent__collect` (blocks until \ + done, returns output) or `agent__cancel` (if you no longer need it). Ending your turn with \ + unreclaimed agents will be rejected and forces you to handle them. Do NOT assume the agent \ + will 'report back' on its own."), })) } @@ -743,7 +833,7 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result .cloned() .ok_or_else(|| anyhow!("No supervisor active"))?; - { + let target_abort = { let sup = supervisor.read(); if sup.is_finished(id).is_none() { return Ok(json!({ @@ -751,7 +841,8 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result "message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.") })); } - } + sup.abort_signal_for(id) + }; loop { let is_finished = { @@ -775,7 +866,27 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result })); } - time::sleep(Duration::from_millis(200)).await; + match target_abort.as_ref() { + Some(abort) if abort.aborted() => { + let deadline = Instant::now() + Duration::from_secs(2); + while Instant::now() < deadline { + if supervisor.read().is_finished(id).unwrap_or(false) { + break; + } + time::sleep(Duration::from_millis(50)).await; + } + break; + } + Some(abort) => { + tokio::select! { + _ = time::sleep(Duration::from_millis(200)) => {} + _ = wait_abort_signal(abort) => {} + } + } + None => { + time::sleep(Duration::from_millis(200)).await; + } + } } let handle = { @@ -792,6 +903,7 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result .map_err(|e| anyhow!("Agent failed: {e}"))?; let output = summarize_output(ctx, &result.agent_name, &result.output).await?; + ctx.pending_agents_guardrail_count = 0; Ok(json!({ "status": "completed", @@ -836,7 +948,7 @@ fn handle_list(ctx: &mut RequestContext) -> Result { })) } -fn handle_cancel(ctx: &mut RequestContext, args: &Value) -> Result { +async fn handle_cancel(ctx: &mut RequestContext, args: &Value) -> Result { let id = args .get("id") .and_then(Value::as_str) @@ -847,14 +959,34 @@ fn handle_cancel(ctx: &mut RequestContext, args: &Value) -> Result { .as_ref() .cloned() .ok_or_else(|| anyhow!("No supervisor active"))?; - let mut sup = supervisor.write(); - match sup.take(id) { + let handle = { + let mut sup = supervisor.write(); + sup.take(id) + }; + + match handle { Some(handle) => { + let agent_name = handle.agent_name.clone(); + if let Some(child_sup) = handle.child_supervisor.as_ref() { + child_sup.read().cancel_recursive(); + } handle.abort_signal.set_ctrlc(); + + let cleanup = tokio::time::timeout(Duration::from_secs(5), handle.join_handle).await; + + ctx.pending_agents_guardrail_count = 0; + + let message = match cleanup { + Ok(_) => format!("Cancelled agent '{agent_name}' and waited for cleanup."), + Err(_) => format!( + "Cancelled agent '{agent_name}'; cleanup did not complete within 5s. Its descendants have been signalled and will tear down asynchronously." + ), + }; + Ok(json!({ "status": "ok", - "message": format!("Cancelled agent '{}'", handle.agent_name), + "message": message, })) } None => Ok(json!({ @@ -1283,6 +1415,7 @@ mod tests { inbox: Arc::new(Inbox::new()), abort_signal: create_abort_signal(), join_handle, + child_supervisor: None, }; ctx.supervisor .as_ref() @@ -1362,6 +1495,7 @@ mod tests { inbox, abort_signal: abort, join_handle, + child_supervisor: None, }; ctx.supervisor .as_ref() @@ -1381,7 +1515,7 @@ mod tests { fn handle_cancel_registered_agent() { let mut ctx = ctx_with_supervisor(4, 3); register_fake_agent(&mut ctx, "a1", "explore"); - let result = handle_cancel(&mut ctx, &json!({"id": "a1"})).unwrap(); + let result = run_async(handle_cancel(&mut ctx, &json!({"id": "a1"}))).unwrap(); assert_eq!(result["status"], "ok"); assert_eq!(ctx.supervisor.as_ref().unwrap().read().active_count(), 0); } @@ -1389,14 +1523,14 @@ mod tests { #[test] fn handle_cancel_unknown_agent() { let mut ctx = ctx_with_supervisor(4, 3); - let result = handle_cancel(&mut ctx, &json!({"id": "missing"})).unwrap(); + let result = run_async(handle_cancel(&mut ctx, &json!({"id": "missing"}))).unwrap(); assert_eq!(result["status"], "error"); } #[test] fn handle_cancel_no_supervisor_errors() { let mut ctx = RequestContext::new(default_app_state(), WorkingMode::Cmd); - let result = handle_cancel(&mut ctx, &json!({"id": "x"})); + let result = run_async(handle_cancel(&mut ctx, &json!({"id": "x"}))); assert!(result.is_err()); } diff --git a/src/graph/llm.rs b/src/graph/llm.rs index d25d52f..9008d55 100644 --- a/src/graph/llm.rs +++ b/src/graph/llm.rs @@ -7,8 +7,10 @@ use crate::config::{ Input, RequestContext, Role, RoleLike, SkillPolicy, should_inject_skill_instructions, }; use crate::function::skill::skill_function_declarations; +use crate::function::supervisor::{GuardrailAction, check_pending_agents_guardrail}; use crate::utils::create_abort_signal; use anyhow::{Context, Error, Result, anyhow, bail}; +use log::warn; use serde_json::Value; use std::collections::HashSet; use std::sync::Arc; @@ -266,7 +268,28 @@ async fn run_chat_loop(node: &LlmNode, prompt: &str, ctx: &mut RequestContext) - } if tool_results.is_empty() { - return Ok(accumulated); + match check_pending_agents_guardrail(ctx) { + GuardrailAction::NoAction => return Ok(accumulated), + GuardrailAction::ForceTerminate(ids) => { + warn!( + "Pending-agent guardrail force-cancelled {} agent(s) after max reminders: {:?}", + ids.len(), + ids + ); + return Ok(accumulated); + } + GuardrailAction::Inject(prompt) => { + if turn + 1 == node.max_iterations { + bail!( + "llm node hit max_iterations ({}) before LLM concluded", + node.max_iterations + ); + } + let role = ctx.role.clone(); + input = Input::from_str(ctx, &prompt, role)?; + continue; + } + } } if turn + 1 == node.max_iterations { diff --git a/src/main.rs b/src/main.rs index 43c74b8..4c8f7bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ use crate::config::{ RequestContext, SHELL_ROLE, TEMP_SESSION_NAME, WorkingMode, ensure_parent_exists, install_builtins, list_agents, load_env_file, macro_execute, sync_models, }; +use crate::function::supervisor::{GuardrailAction, check_pending_agents_guardrail}; use crate::render::{prompt_theme, render_error}; use crate::repl::Repl; use crate::utils::*; @@ -35,7 +36,7 @@ use clap::{CommandFactory, Parser}; use clap_complete::CompleteEnv; use client::ClientConfig; use inquire::{Select, Text, set_global_render_config}; -use log::LevelFilter; +use log::{LevelFilter, warn}; use log4rs::append::console::ConsoleAppender; use log4rs::append::file::FileAppender; use log4rs::config::{Appender, Logger, Root}; @@ -419,6 +420,21 @@ async fn start_directive( abort_signal, ) .await?; + } else { + match check_pending_agents_guardrail(ctx) { + GuardrailAction::Inject(prompt) => { + let guardrail_input = Input::from_str(ctx, &prompt, None)?; + return start_directive(ctx, guardrail_input, code_mode, abort_signal).await; + } + GuardrailAction::ForceTerminate(ids) => { + warn!( + "Pending-agent guardrail force-cancelled {} agent(s) after max reminders: {:?}", + ids.len(), + ids + ); + } + GuardrailAction::NoAction => {} + } } ctx.exit_session()?; diff --git a/src/repl/mod.rs b/src/repl/mod.rs index 4bdbbf0..f7758a1 100644 --- a/src/repl/mod.rs +++ b/src/repl/mod.rs @@ -12,6 +12,7 @@ use crate::config::{ macro_execute, }; use crate::config::{AssetCategory, paths}; +use crate::function::supervisor::{GuardrailAction, check_pending_agents_guardrail}; use crate::render::render_error; use crate::utils::{ AbortSignal, abortable_run_with_spinner, create_abort_signal, dimmed_text, set_text, temp_file, @@ -306,6 +307,9 @@ Type ".help" for additional help. } Ok(Signal::CtrlC) => { self.abort_signal.set_ctrlc(); + if let Some(supervisor) = self.ctx.read().supervisor.clone() { + supervisor.read().cancel_recursive(); + } println!("(To exit, press Ctrl+D or enter \".exit\")\n"); } Ok(Signal::CtrlD) => { @@ -315,6 +319,11 @@ Type ".help" for additional help. _ => {} } } + + if let Some(supervisor) = self.ctx.read().supervisor.clone() { + supervisor.read().cancel_recursive(); + } + self.ctx.write().exit_session()?; Ok(()) } @@ -435,6 +444,7 @@ pub async fn run_repl_command( abort_signal: AbortSignal, mut line: &str, ) -> Result { + ctx.pending_agents_guardrail_count = 0; if let Ok(Some(captures)) = MULTILINE_RE.captures(line) && let Some(text_match) = captures.get(1) { @@ -1011,6 +1021,20 @@ async fn ask( ) .await } else { + match check_pending_agents_guardrail(ctx) { + GuardrailAction::Inject(prompt) => { + let guardrail_input = Input::from_str(ctx, &prompt, None)?; + return ask(ctx, abort_signal, guardrail_input, false).await; + } + GuardrailAction::ForceTerminate(ids) => { + warn!( + "Pending-agent guardrail force-cancelled {} agent(s) after max reminders: {:?}", + ids.len(), + ids + ); + } + GuardrailAction::NoAction => {} + } let do_continue = should_continue(ctx); if do_continue { diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index 9c0c1c8..e1f86ea 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -5,6 +5,7 @@ pub mod taskqueue; use crate::utils::AbortSignal; use fmt::{Debug, Formatter}; use mailbox::Inbox; +use parking_lot::RwLock; use taskqueue::TaskQueue; use anyhow::{Result, bail}; @@ -33,6 +34,7 @@ pub struct AgentHandle { pub inbox: Arc, pub abort_signal: AbortSignal, pub join_handle: JoinHandle>, + pub child_supervisor: Option>>, } pub struct Supervisor { @@ -103,6 +105,10 @@ impl Supervisor { self.handles.get(id).map(|h| &h.inbox) } + pub fn abort_signal_for(&self, id: &str) -> Option { + self.handles.get(id).map(|h| h.abort_signal.clone()) + } + pub fn list_agents(&self) -> Vec<(&str, &str)> { self.handles .values() @@ -115,6 +121,15 @@ impl Supervisor { handle.abort_signal.set_ctrlc(); } } + + pub fn cancel_recursive(&self) { + for handle in self.handles.values() { + handle.abort_signal.set_ctrlc(); + if let Some(child_sup) = handle.child_supervisor.as_ref() { + child_sup.read().cancel_recursive(); + } + } + } } impl Debug for Supervisor { @@ -152,6 +167,7 @@ mod tests { inbox: Arc::new(Inbox::new()), abort_signal: create_abort_signal(), join_handle, + child_supervisor: None, } }