test: Added integration tests for the sub-agent spawning system and inter-agent communication mechanisms
This commit is contained in:
@@ -1087,3 +1087,497 @@ async fn summarize_output(ctx: &RequestContext, agent_name: &str, output: &str)
|
||||
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::{AppState, WorkingMode};
|
||||
use crate::supervisor::escalation::{EscalationQueue, EscalationRequest};
|
||||
use serde_json::json;
|
||||
|
||||
fn default_app_state() -> Arc<AppState> {
|
||||
Arc::new(AppState::test_default())
|
||||
}
|
||||
|
||||
fn ctx_with_supervisor(max_concurrent: usize, max_depth: usize) -> RequestContext {
|
||||
let mut ctx = RequestContext::new(default_app_state(), WorkingMode::Cmd);
|
||||
ctx.supervisor = Some(Arc::new(RwLock::new(Supervisor::new(
|
||||
max_concurrent,
|
||||
max_depth,
|
||||
))));
|
||||
ctx
|
||||
}
|
||||
|
||||
fn register_fake_agent(ctx: &mut RequestContext, id: &str, name: &str) {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let id_owned = id.to_string();
|
||||
let name_owned = name.to_string();
|
||||
let join_handle = rt.spawn(async move {
|
||||
Ok(AgentResult {
|
||||
id: id_owned,
|
||||
agent_name: name_owned,
|
||||
output: "fake output".into(),
|
||||
exit_status: AgentExitStatus::Completed,
|
||||
})
|
||||
});
|
||||
std::mem::forget(rt);
|
||||
|
||||
let handle = AgentHandle {
|
||||
id: id.to_string(),
|
||||
agent_name: name.to_string(),
|
||||
depth: 1,
|
||||
inbox: Arc::new(Inbox::new()),
|
||||
abort_signal: create_abort_signal(),
|
||||
join_handle,
|
||||
};
|
||||
ctx.supervisor
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.write()
|
||||
.register(handle)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn run_async<F: Future>(f: F) -> F::Output {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(f)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_list_empty_supervisor() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_list(&mut ctx).unwrap();
|
||||
assert_eq!(result["active_count"], 0);
|
||||
assert_eq!(result["max_concurrent"], 4);
|
||||
assert!(result["agents"].as_array().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_list_with_agents() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
register_fake_agent(&mut ctx, "a1", "explore");
|
||||
register_fake_agent(&mut ctx, "a2", "coder");
|
||||
let result = handle_list(&mut ctx).unwrap();
|
||||
assert_eq!(result["active_count"], 2);
|
||||
let agents = result["agents"].as_array().unwrap();
|
||||
assert_eq!(agents.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_list_no_supervisor_errors() {
|
||||
let mut ctx = RequestContext::new(default_app_state(), WorkingMode::Cmd);
|
||||
let result = handle_list(&mut ctx);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_check_unknown_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = run_async(handle_check(&mut ctx, &json!({"id": "nonexistent"})));
|
||||
let val = result.unwrap();
|
||||
assert_eq!(val["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_check_pending_agent() {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
rt.block_on(async {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let inbox = Arc::new(Inbox::new());
|
||||
let abort = create_abort_signal();
|
||||
let join_handle = tokio::spawn(async {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
|
||||
Ok(AgentResult {
|
||||
id: "slow".into(),
|
||||
agent_name: "test".into(),
|
||||
output: String::new(),
|
||||
exit_status: AgentExitStatus::Completed,
|
||||
})
|
||||
});
|
||||
let handle = AgentHandle {
|
||||
id: "slow".into(),
|
||||
agent_name: "test".into(),
|
||||
depth: 1,
|
||||
inbox,
|
||||
abort_signal: abort,
|
||||
join_handle,
|
||||
};
|
||||
ctx.supervisor
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.write()
|
||||
.register(handle)
|
||||
.unwrap();
|
||||
|
||||
let result = handle_check(&mut ctx, &json!({"id": "slow"}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "pending");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_cancel_registered_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
register_fake_agent(&mut ctx, "a1", "explore");
|
||||
let result = handle_cancel(&mut ctx, &json!({"id": "a1"})).unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
assert_eq!(ctx.supervisor.as_ref().unwrap().read().active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_cancel_unknown_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_cancel(&mut ctx, &json!({"id": "missing"})).unwrap();
|
||||
assert_eq!(result["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_cancel_no_supervisor_errors() {
|
||||
let mut ctx = RequestContext::new(default_app_state(), WorkingMode::Cmd);
|
||||
let result = handle_cancel(&mut ctx, &json!({"id": "x"}));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_send_message_to_registered_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
register_fake_agent(&mut ctx, "a1", "explore");
|
||||
let result = handle_send_message(
|
||||
&mut ctx,
|
||||
&json!({"id": "a1", "message": "hello from parent"}),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
|
||||
let inbox = ctx
|
||||
.supervisor
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.read()
|
||||
.inbox("a1")
|
||||
.unwrap()
|
||||
.clone();
|
||||
let msgs = inbox.drain();
|
||||
assert_eq!(msgs.len(), 1);
|
||||
match &msgs[0].payload {
|
||||
EnvelopePayload::Text { content } => assert_eq!(content, "hello from parent"),
|
||||
_ => panic!("expected text payload"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_send_message_to_unknown_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result =
|
||||
handle_send_message(&mut ctx, &json!({"id": "missing", "message": "hi"})).unwrap();
|
||||
assert_eq!(result["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_check_inbox_with_messages() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let inbox = Arc::new(Inbox::new());
|
||||
inbox.deliver(Envelope {
|
||||
from: "sibling".into(),
|
||||
to: "me".into(),
|
||||
payload: EnvelopePayload::Text {
|
||||
content: "hey".into(),
|
||||
},
|
||||
timestamp: Utc::now(),
|
||||
});
|
||||
ctx.inbox = Some(inbox);
|
||||
|
||||
let result = handle_check_inbox(&mut ctx).unwrap();
|
||||
assert_eq!(result["count"], 1);
|
||||
let messages = result["messages"].as_array().unwrap();
|
||||
assert_eq!(messages[0]["from"], "sibling");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_check_inbox_no_inbox() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_check_inbox(&mut ctx).unwrap();
|
||||
assert_eq!(result["count"], 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_check_inbox_empty_inbox() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
ctx.inbox = Some(Arc::new(Inbox::new()));
|
||||
let result = handle_check_inbox(&mut ctx).unwrap();
|
||||
assert_eq!(result["count"], 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_reply_escalation_success() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let queue = Arc::new(EscalationQueue::new());
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
queue.submit(EscalationRequest {
|
||||
id: "esc_1".into(),
|
||||
from_agent_id: "a1".into(),
|
||||
from_agent_name: "explore".into(),
|
||||
question: "What do?".into(),
|
||||
options: None,
|
||||
reply_tx: tx,
|
||||
});
|
||||
ctx.escalation_queue = Some(queue);
|
||||
|
||||
let result = handle_reply_escalation(
|
||||
&mut ctx,
|
||||
&json!({"escalation_id": "esc_1", "reply": "do X"}),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
assert_eq!(rx.blocking_recv().unwrap(), "do X");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_reply_escalation_missing_id() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
ctx.escalation_queue = Some(Arc::new(EscalationQueue::new()));
|
||||
let result = handle_reply_escalation(
|
||||
&mut ctx,
|
||||
&json!({"escalation_id": "missing", "reply": "whatever"}),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_reply_escalation_no_queue_errors() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result =
|
||||
handle_reply_escalation(&mut ctx, &json!({"escalation_id": "x", "reply": "y"}));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_create_simple() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_task_create(&mut ctx, &json!({"subject": "Do research"})).unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
assert!(result["task_id"].as_str().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_create_with_dependencies() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
handle_task_create(&mut ctx, &json!({"subject": "Step 1"})).unwrap();
|
||||
let result =
|
||||
handle_task_create(&mut ctx, &json!({"subject": "Step 2", "blocked_by": ["1"]}))
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_create_with_dispatch_agent() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_task_create(
|
||||
&mut ctx,
|
||||
&json!({"subject": "Auto task", "agent": "coder", "prompt": "do it"}),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
assert_eq!(result["auto_dispatch"], true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_create_agent_without_prompt_errors() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_task_create(&mut ctx, &json!({"subject": "Bad", "agent": "coder"}));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_list_empty() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_task_list(&mut ctx).unwrap();
|
||||
assert!(result["tasks"].as_array().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_list_with_tasks() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
handle_task_create(&mut ctx, &json!({"subject": "A"})).unwrap();
|
||||
handle_task_create(&mut ctx, &json!({"subject": "B"})).unwrap();
|
||||
let result = handle_task_list(&mut ctx).unwrap();
|
||||
assert_eq!(result["tasks"].as_array().unwrap().len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_complete_unblocks_dependents() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
handle_task_create(&mut ctx, &json!({"subject": "Step 1"})).unwrap();
|
||||
handle_task_create(&mut ctx, &json!({"subject": "Step 2", "blocked_by": ["1"]})).unwrap();
|
||||
|
||||
let result = run_async(handle_task_complete(&mut ctx, &json!({"task_id": "1"}))).unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
let newly_runnable = result["newly_runnable"].as_array().unwrap();
|
||||
assert_eq!(newly_runnable.len(), 1);
|
||||
assert_eq!(newly_runnable[0]["id"], "2");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_fail_marks_failed() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
handle_task_create(&mut ctx, &json!({"subject": "Doomed"})).unwrap();
|
||||
let result = handle_task_fail(&mut ctx, &json!({"task_id": "1"})).unwrap();
|
||||
assert_eq!(result["status"], "ok");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_fail_reports_blocked_dependents() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
handle_task_create(&mut ctx, &json!({"subject": "A"})).unwrap();
|
||||
handle_task_create(&mut ctx, &json!({"subject": "B", "blocked_by": ["1"]})).unwrap();
|
||||
let result = handle_task_fail(&mut ctx, &json!({"task_id": "1"})).unwrap();
|
||||
let deps = result["blocked_dependents"].as_array().unwrap();
|
||||
assert_eq!(deps.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_task_fail_missing_task() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = handle_task_fail(&mut ctx, &json!({"task_id": "nonexistent"})).unwrap();
|
||||
assert_eq!(result["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dispatch_unknown_action_errors() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = run_async(handle_supervisor_tool(&mut ctx, "agent__bogus", &json!({})));
|
||||
assert!(result.is_err());
|
||||
assert!(
|
||||
result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Unknown supervisor action")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dispatch_routes_list() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result =
|
||||
run_async(handle_supervisor_tool(&mut ctx, "agent__list", &json!({}))).unwrap();
|
||||
assert!(result["active_count"].is_number());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dispatch_routes_task_list() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let result = run_async(handle_supervisor_tool(
|
||||
&mut ctx,
|
||||
"agent__task_list",
|
||||
&json!({}),
|
||||
))
|
||||
.unwrap();
|
||||
assert!(result["tasks"].is_array());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_for_child_inherits_escalation_queue() {
|
||||
let mut parent = ctx_with_supervisor(4, 3);
|
||||
let queue = parent.ensure_root_escalation_queue();
|
||||
|
||||
let child = RequestContext::new_for_child(
|
||||
default_app_state(),
|
||||
&parent,
|
||||
2,
|
||||
Arc::new(Inbox::new()),
|
||||
"child_1".into(),
|
||||
);
|
||||
|
||||
assert!(child.escalation_queue.is_some());
|
||||
assert!(Arc::ptr_eq(
|
||||
child.escalation_queue.as_ref().unwrap(),
|
||||
&queue
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_for_child_sets_depth_and_id() {
|
||||
let parent = ctx_with_supervisor(4, 3);
|
||||
let child = RequestContext::new_for_child(
|
||||
default_app_state(),
|
||||
&parent,
|
||||
3,
|
||||
Arc::new(Inbox::new()),
|
||||
"child_xyz".into(),
|
||||
);
|
||||
assert_eq!(child.current_depth, 3);
|
||||
assert_eq!(child.self_agent_id, Some("child_xyz".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_for_child_has_inbox() {
|
||||
let parent = ctx_with_supervisor(4, 3);
|
||||
let inbox = Arc::new(Inbox::new());
|
||||
let child = RequestContext::new_for_child(
|
||||
default_app_state(),
|
||||
&parent,
|
||||
1,
|
||||
Arc::clone(&inbox),
|
||||
"c1".into(),
|
||||
);
|
||||
assert!(child.inbox.is_some());
|
||||
assert!(Arc::ptr_eq(child.inbox.as_ref().unwrap(), &inbox));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_for_child_inherits_parent_supervisor() {
|
||||
let parent = ctx_with_supervisor(4, 3);
|
||||
let child = RequestContext::new_for_child(
|
||||
default_app_state(),
|
||||
&parent,
|
||||
1,
|
||||
Arc::new(Inbox::new()),
|
||||
"c1".into(),
|
||||
);
|
||||
assert!(child.parent_supervisor.is_some());
|
||||
assert!(child.supervisor.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_for_child_starts_with_empty_scope() {
|
||||
let parent = ctx_with_supervisor(4, 3);
|
||||
let child = RequestContext::new_for_child(
|
||||
default_app_state(),
|
||||
&parent,
|
||||
1,
|
||||
Arc::new(Inbox::new()),
|
||||
"c1".into(),
|
||||
);
|
||||
assert!(child.tool_scope.functions.is_empty());
|
||||
assert!(child.tool_scope.mcp_runtime.is_empty());
|
||||
assert!(child.role.is_none());
|
||||
assert!(child.session.is_none());
|
||||
assert!(child.agent.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_root_escalation_queue_creates_on_first_call() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
assert!(ctx.escalation_queue.is_none());
|
||||
let q = ctx.ensure_root_escalation_queue();
|
||||
assert!(!q.has_pending());
|
||||
assert!(ctx.escalation_queue.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_root_escalation_queue_returns_same_on_second_call() {
|
||||
let mut ctx = ctx_with_supervisor(4, 3);
|
||||
let q1 = ctx.ensure_root_escalation_queue();
|
||||
let q2 = ctx.ensure_root_escalation_queue();
|
||||
assert!(Arc::ptr_eq(&q1, &q2));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user