fix: make the agent__collect escalation-aware so it doesn't freeze on sub-agent escalations

This commit is contained in:
2026-05-11 13:57:02 -06:00
parent 1fbdcd66d1
commit 84eb82b355
+62 -12
View File
@@ -13,6 +13,8 @@ use parking_lot::RwLock;
use serde_json::{Value, json}; use serde_json::{Value, json};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use uuid::Uuid; use uuid::Uuid;
pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__"; pub const SUPERVISOR_FUNCTION_PREFIX: &str = "agent__";
@@ -601,11 +603,24 @@ async fn handle_check(ctx: &mut RequestContext, args: &Value) -> Result<Value> {
match is_finished { match is_finished {
Some(true) => handle_collect(ctx, args).await, Some(true) => handle_collect(ctx, args).await,
Some(false) => Ok(json!({ Some(false) => {
"status": "pending", let mut result = json!({
"id": id, "status": "pending",
"message": "Agent is still running" "id": id,
})), "message": "Agent is still running"
});
if let Some(queue) = ctx.root_escalation_queue() &&
queue.has_pending() {
let summary = queue.pending_summary();
result["pending_escalations"] = json!(summary);
result["message"] = json!(
"Agent is still running. Child agents have pending escalations that need your reply via agent__reply_escalation."
);
}
Ok(result)
}
None => Ok(json!({ None => Ok(json!({
"status": "error", "status": "error",
"message": format!("No agent found with id '{id}'") "message": format!("No agent found with id '{id}'")
@@ -619,12 +634,47 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result<Value>
.and_then(Value::as_str) .and_then(Value::as_str)
.ok_or_else(|| anyhow!("'id' is required"))?; .ok_or_else(|| anyhow!("'id' is required"))?;
let supervisor = ctx
.supervisor
.as_ref()
.cloned()
.ok_or_else(|| anyhow!("No supervisor active"))?;
{
let sup = supervisor.read();
if sup.is_finished(id).is_none() {
return Ok(json!({
"status": "error",
"message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.")
}));
}
}
loop {
let is_finished = {
let sup = supervisor.read();
sup.is_finished(id).unwrap_or(false)
};
if is_finished {
break;
}
if let Some(queue) = ctx.root_escalation_queue() &&
queue.has_pending() {
let summary = queue.pending_summary();
return Ok(json!({
"status": "pending",
"id": id,
"message": format!("Agent '{id}' is still running, but child agents have pending escalations that need your reply. Reply via agent__reply_escalation, then call agent__collect again."),
"pending_escalations": summary,
}));
}
time::sleep(Duration::from_millis(200)).await;
}
let handle = { let handle = {
let supervisor = ctx
.supervisor
.as_ref()
.cloned()
.ok_or_else(|| anyhow!("No supervisor active"))?;
let mut sup = supervisor.write(); let mut sup = supervisor.write();
sup.take(id) sup.take(id)
}; };
@@ -649,7 +699,7 @@ async fn handle_collect(ctx: &mut RequestContext, args: &Value) -> Result<Value>
} }
None => Ok(json!({ None => Ok(json!({
"status": "error", "status": "error",
"message": format!("Agent '{id}' not found. Use agent__check to verify it exists and is finished.") "message": format!("Agent '{id}' completed but could not be collected. It may have been collected by another call.")
})), })),
} }
} }
@@ -1193,7 +1243,7 @@ mod tests {
let inbox = Arc::new(Inbox::new()); let inbox = Arc::new(Inbox::new());
let abort = create_abort_signal(); let abort = create_abort_signal();
let join_handle = tokio::spawn(async { let join_handle = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(60)).await; time::sleep(Duration::from_secs(60)).await;
Ok(AgentResult { Ok(AgentResult {
id: "slow".into(), id: "slow".into(),
agent_name: "test".into(), agent_name: "test".into(),