diff --git a/src/function/supervisor.rs b/src/function/supervisor.rs index e133822..851349d 100644 --- a/src/function/supervisor.rs +++ b/src/function/supervisor.rs @@ -225,10 +225,6 @@ pub fn supervisor_function_declarations() -> Vec { ] } -// --------------------------------------------------------------------------- -// Dispatch -// --------------------------------------------------------------------------- - pub async fn handle_supervisor_tool( config: &GlobalConfig, cmd_name: &str, @@ -253,21 +249,6 @@ pub async fn handle_supervisor_tool( } } -// --------------------------------------------------------------------------- -// Child agent execution loop -// --------------------------------------------------------------------------- - -/// Run a child agent to completion, returning its accumulated text output. -/// -/// This mirrors `start_directive` in main.rs but: -/// - Uses `call_chat_completions(print=false)` so nothing goes to stdout -/// - Returns the output text instead of printing it -/// - Loops on tool calls just like `start_directive`'s recursion -/// -/// Returns a boxed future to break the recursive type cycle: -/// handle_spawn → tokio::spawn(run_child_agent) → call_chat_completions -/// → eval_tool_calls → ToolCall::eval → handle_supervisor_tool → handle_spawn -/// Without boxing, the compiler cannot prove Send for the recursive async type. 
fn run_child_agent( child_config: GlobalConfig, initial_input: Input, @@ -283,8 +264,8 @@ fn run_child_agent( let (output, tool_results) = call_chat_completions( &input, - false, // print=false — silent, no stdout - false, // extract_code=false + false, + false, client.as_ref(), abort_signal.clone(), ) @@ -305,7 +286,6 @@ fn run_child_agent( break; } - // Feed tool results back for the next round (mirrors start_directive recursion) input = input.merge_tool_results(output, tool_results); } @@ -313,10 +293,6 @@ fn run_child_agent( }) } -// --------------------------------------------------------------------------- -// Handlers -// --------------------------------------------------------------------------- - async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { let agent_name = args .get("agent") @@ -330,18 +306,15 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { .to_string(); let _task_id = args.get("task_id").and_then(Value::as_str); - // Generate a unique agent ID let short_uuid = &Uuid::new_v4().to_string()[..8]; let agent_id = format!("agent_{agent_name}_{short_uuid}"); - // --- Validate capacity --- - // Read the supervisor to check capacity, then drop locks before doing async work let (max_depth, current_depth) = { let cfg = config.read(); let supervisor = cfg .supervisor .as_ref() - .ok_or_else(|| anyhow!("No supervisor active — agent spawning not enabled"))?; + .ok_or_else(|| anyhow!("No supervisor active; agent spawning not enabled"))?; let sup = supervisor.read(); if sup.active_count() >= sup.max_concurrent() { return Ok(json!({ @@ -363,7 +336,6 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { })); } - // --- Build an isolated child Config --- let child_inbox = Arc::new(Inbox::new()); let child_config: GlobalConfig = { @@ -384,16 +356,13 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { Arc::new(RwLock::new(child_cfg)) }; - // Load the target agent into the child
config let child_abort = create_abort_signal(); Config::use_agent(&child_config, &agent_name, None, child_abort.clone()).await?; - // Create the initial input from the prompt let input = Input::from_str(&child_config, &prompt, None); debug!("Spawning child agent '{agent_name}' as '{agent_id}'"); - // --- Spawn the agent task --- let spawn_agent_id = agent_id.clone(); let spawn_agent_name = agent_name.clone(); let spawn_abort = child_abort.clone(); @@ -417,7 +386,6 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result { } }); - // Register the handle with the supervisor let handle = AgentHandle { id: agent_id.clone(), agent_name: agent_name.clone(), @@ -463,7 +431,6 @@ async fn handle_check(config: &GlobalConfig, args: &Value) -> Result { match is_finished { Some(true) => { - // Finished — collect the result handle_collect(config, args).await } Some(false) => Ok(json!({ @@ -484,7 +451,6 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { .and_then(Value::as_str) .ok_or_else(|| anyhow!("'id' is required"))?; - // Extract the join handle while holding the lock, then drop the lock before awaiting let handle = { let cfg = config.read(); let supervisor = cfg @@ -497,7 +463,6 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result { match handle { Some(handle) => { - // Await the join handle OUTSIDE of any lock let result = handle .join_handle .await diff --git a/src/supervisor/mailbox.rs b/src/supervisor/mailbox.rs index 82d6858..6c63e84 100644 --- a/src/supervisor/mailbox.rs +++ b/src/supervisor/mailbox.rs @@ -1,10 +1,5 @@ use serde::{Deserialize, Serialize}; -/// A message envelope routed between agents. -/// -/// Agents communicate by sending `Envelope`s to each other's mailboxes. -/// The sender fires and forgets; the receiver drains its inbox between -/// LLM turns via the `check_inbox` tool. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Envelope { pub from: String, @@ -13,11 +8,6 @@ pub struct Envelope { pub timestamp: chrono::DateTime, } -/// The content of an inter-agent message. -/// -/// Separates the **control plane** (shutdown signals, task lifecycle events) -/// from the **data plane** (free-form text). Control-plane messages are -/// processed before data-plane messages to prevent race conditions. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum EnvelopePayload { @@ -27,13 +17,6 @@ pub enum EnvelopePayload { ShutdownApproved, } -/// A per-agent inbox that collects incoming messages. -/// -/// Backed by a `Vec` behind a `parking_lot::Mutex` so it can be shared -/// between the supervisor (which delivers messages) and the agent's tool -/// handler (which drains them). We use `parking_lot::Mutex` to match the -/// locking convention used elsewhere in Loki (`parking_lot::RwLock` for -/// GlobalConfig). #[derive(Debug, Default)] pub struct Inbox { messages: parking_lot::Mutex>, @@ -50,18 +33,12 @@ impl Inbox { self.messages.lock().push(envelope); } - /// Drain all pending messages, returning them sorted with control-plane - /// messages first (shutdown, task events) then data-plane (text). - /// This ordering prevents the class of bugs where a text message - /// references state that a control message was supposed to set up. pub fn drain(&self) -> Vec { let mut msgs = { let mut guard = self.messages.lock(); std::mem::take(&mut *guard) }; - // Stable partition: control messages first, then data messages, - // preserving relative order within each group. msgs.sort_by_key(|e| match &e.payload { EnvelopePayload::ShutdownRequest { .. 
} => 0, EnvelopePayload::ShutdownApproved => 0, diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index 6514799..7f277fc 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -35,8 +35,6 @@ pub struct AgentHandle { pub join_handle: JoinHandle>, } -/// Lives as an `Arc>` alongside GlobalConfig, -/// NOT inside it — avoids adding lock contention to the shared Config. pub struct Supervisor { handles: HashMap, task_queue: TaskQueue, diff --git a/src/supervisor/taskqueue.rs b/src/supervisor/taskqueue.rs index bd78a46..649b93d 100644 @@ -122,13 +122,13 @@ impl TaskQueue { } pub fn claim(&mut self, task_id: &str, owner: &str) -> bool { - if let Some(task) = self.tasks.get_mut(task_id) { - if task.is_runnable() && task.owner.is_none() { - task.owner = Some(owner.to_string()); - task.status = TaskStatus::InProgress; - return true; - } + if let Some(task) = self.tasks.get_mut(task_id) && + task.is_runnable() && task.owner.is_none() { + task.owner = Some(owner.to_string()); + task.status = TaskStatus::InProgress; + return true; } + false } @@ -146,8 +146,6 @@ impl TaskQueue { tasks } - // DFS cycle detection: would adding task_id -> blocked_by create a cycle? - // A cycle exists if blocked_by can reach task_id through existing dependencies. fn would_create_cycle(&self, task_id: &str, blocked_by: &str) -> bool { let mut visited = HashSet::new(); let mut stack = vec![blocked_by.to_string()]; @@ -156,11 +154,10 @@ impl TaskQueue { if current == task_id { return true; } - if visited.insert(current.clone()) { - if let Some(task) = self.tasks.get(&current) { - for dep in &task.blocked_by { - stack.push(dep.clone()); - } + if visited.insert(current.clone()) && + let Some(task) = self.tasks.get(&current) { + for dep in &task.blocked_by { + stack.push(dep.clone()); } } }