From 41b2638bddc6df544b7080c33639c059bf80ba3d Mon Sep 17 00:00:00 2001
From: Alex Clarke
Date: Mon, 11 May 2026 13:57:02 -0600
Subject: [PATCH] fix: make the agent__collect escalation-aware so it doesn't freeze on sub-agent escalations

---
 src/function/supervisor.rs | 74 +++++++++++++++++++++++++++++++-------
 1 file changed, 62 insertions(+), 12 deletions(-)

diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs
index 0fe4ade..56b8455 100644
--- a/src/function/supervisor.rs
+++ b/src/function/supervisor.rs
@@ -13,6 +13,8 @@ use parking_lot::RwLock;
 use serde_json::{Value, json};
 use std::pin::Pin;
 use std::sync::Arc;
+use std::time::Duration;
+use tokio::time;
 use uuid::Uuid;
 
 pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__";
@@ -601,11 +603,24 @@ async fn handle_check(ctx: &mut RequestContext, args: &Value) -> Result<Value> {
 
     match is_finished {
         Some(true) => handle_collect(ctx, args).await,
-        Some(false) => Ok(json!({
-            "status": "pending",
-            "id": id,
-            "message": "Agent is still running"
-        })),
+        Some(false) => {
+            let mut result = json!({
+                "status": "pending",
+                "id": id,
+                "message": "Agent is still running"
+            });
+
+            if let Some(queue) = ctx.root_escalation_queue() &&
+                queue.has_pending() {
+                let summary = queue.pending_summary();
+                result["pending_escalations"] = json!(summary);
+                result["message"] = json!(
+                    "Agent is still running. Child agents have pending escalations that need your reply via agent__reply_escalation."
+                );
+            }
+
+            Ok(result)
+        }
         None => Ok(json!({
             "status": "error",
             "message": format!("No agent found with id '{id}'")
@@ -619,12 +634,47 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result<Value>
         .and_then(Value::as_str)
         .ok_or_else(|| anyhow!("'id' is required"))?;
 
+    let supervisor = ctx
+        .supervisor
+        .as_ref()
+        .cloned()
+        .ok_or_else(|| anyhow!("No supervisor active"))?;
+
+    {
+        let sup = supervisor.read();
+        if sup.is_finished(id).is_none() {
+            return Ok(json!({
+                "status": "error",
+                "message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.")
+            }));
+        }
+    }
+
+    loop {
+        let is_finished = {
+            let sup = supervisor.read();
+            sup.is_finished(id).unwrap_or(false)
+        };
+
+        if is_finished {
+            break;
+        }
+
+        if let Some(queue) = ctx.root_escalation_queue() &&
+            queue.has_pending() {
+            let summary = queue.pending_summary();
+            return Ok(json!({
+                "status": "pending",
+                "id": id,
+                "message": format!("Agent '{id}' is still running, but child agents have pending escalations that need your reply. Reply via agent__reply_escalation, then call agent__collect again."),
+                "pending_escalations": summary,
+            }));
+        }
+
+        time::sleep(Duration::from_millis(200)).await;
+    }
+
     let handle = {
-        let supervisor = ctx
-            .supervisor
-            .as_ref()
-            .cloned()
-            .ok_or_else(|| anyhow!("No supervisor active"))?;
         let mut sup = supervisor.write();
         sup.take(id)
     };
@@ -649,7 +699,7 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result<Value>
         }
         None => Ok(json!({
             "status": "error",
-            "message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.")
+            "message": format!("Agent '{id}' completed but could not be collected. It may have been collected by another call.")
         })),
     }
 }
@@ -1193,7 +1243,7 @@ mod tests {
         let inbox = Arc::new(Inbox::new());
         let abort = create_abort_signal();
         let join_handle = tokio::spawn(async {
-            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
+            time::sleep(Duration::from_secs(60)).await;
             Ok(AgentResult {
                 id: "slow".into(),
                 agent_name: "test".into(),