diff --git a/src/config/agent.rs b/src/config/agent.rs index 0145bd6..630c10b 100644 --- a/src/config/agent.rs +++ b/src/config/agent.rs @@ -590,6 +590,12 @@ pub struct AgentConfig { pub agent_session: Option, #[serde(default)] pub auto_continue: bool, + #[serde(default)] + pub can_spawn_agents: bool, + #[serde(default = "default_max_concurrent_agents")] + pub max_concurrent_agents: usize, + #[serde(default = "default_max_agent_depth")] + pub max_agent_depth: usize, #[serde(default = "default_max_auto_continues")] pub max_auto_continues: usize, #[serde(default = "default_true")] @@ -622,6 +628,14 @@ fn default_max_auto_continues() -> usize { 10 } +fn default_max_concurrent_agents() -> usize { + 4 +} + +fn default_max_agent_depth() -> usize { + 3 +} + fn default_true() -> bool { true } diff --git a/src/main.rs b/src/main.rs index 86b67de..03ea6a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod repl; mod utils; mod mcp; mod parsers; +mod supervisor; mod vault; #[macro_use] diff --git a/src/supervisor/mailbox.rs b/src/supervisor/mailbox.rs new file mode 100644 index 0000000..82d6858 --- /dev/null +++ b/src/supervisor/mailbox.rs @@ -0,0 +1,87 @@ +use serde::{Deserialize, Serialize}; + +/// A message envelope routed between agents. +/// +/// Agents communicate by sending `Envelope`s to each other's mailboxes. +/// The sender fires and forgets; the receiver drains its inbox between +/// LLM turns via the `check_inbox` tool. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Envelope { + pub from: String, + pub to: String, + pub payload: EnvelopePayload, + pub timestamp: chrono::DateTime, +} + +/// The content of an inter-agent message. +/// +/// Separates the **control plane** (shutdown signals, task lifecycle events) +/// from the **data plane** (free-form text). Control-plane messages are +/// processed before data-plane messages to prevent race conditions. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum EnvelopePayload { + Text { content: String }, + TaskCompleted { task_id: String, summary: String }, + ShutdownRequest { reason: String }, + ShutdownApproved, +} + +/// A per-agent inbox that collects incoming messages. +/// +/// Backed by a `Vec` behind a `parking_lot::Mutex` so it can be shared +/// between the supervisor (which delivers messages) and the agent's tool +/// handler (which drains them). We use `parking_lot::Mutex` to match the +/// locking convention used elsewhere in Loki (`parking_lot::RwLock` for +/// GlobalConfig). +#[derive(Debug, Default)] +pub struct Inbox { + messages: parking_lot::Mutex>, +} + +impl Inbox { + pub fn new() -> Self { + Self { + messages: parking_lot::Mutex::new(Vec::new()), + } + } + + pub fn deliver(&self, envelope: Envelope) { + self.messages.lock().push(envelope); + } + + /// Drain all pending messages, returning them sorted with control-plane + /// messages first (shutdown, task events) then data-plane (text). + /// This ordering prevents the class of bugs where a text message + /// references state that a control message was supposed to set up. + pub fn drain(&self) -> Vec { + let mut msgs = { + let mut guard = self.messages.lock(); + std::mem::take(&mut *guard) + }; + + // Stable partition: control messages first, then data messages, + // preserving relative order within each group. + msgs.sort_by_key(|e| match &e.payload { + EnvelopePayload::ShutdownRequest { .. } => 0, + EnvelopePayload::ShutdownApproved => 0, + EnvelopePayload::TaskCompleted { .. } => 1, + EnvelopePayload::Text { .. } => 2, + }); + + msgs + } + + pub fn pending_count(&self) -> usize { + self.messages.lock().len() + } +} + +impl Clone for Inbox { + fn clone(&self) -> Self { + let messages = self.messages.lock().clone(); + Self { + messages: parking_lot::Mutex::new(messages), + } + } +} diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs new file mode 100644 index 0000000..4d36bdf --- /dev/null +++ b/src/supervisor/mod.rs @@ -0,0 +1,130 @@ +pub mod mailbox; +pub mod taskqueue; + +use crate::utils::AbortSignal; +use mailbox::Inbox; +use taskqueue::TaskQueue; + +use anyhow::{Result, bail}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::task::JoinHandle; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum AgentExitStatus { + Completed, + Cancelled, + Failed(String), +} + +pub struct AgentResult { + pub id: String, + pub agent_name: String, + pub output: String, + pub exit_status: AgentExitStatus, +} + +pub struct AgentHandle { + pub id: String, + pub agent_name: String, + pub depth: usize, + pub inbox: Arc, + pub abort_signal: AbortSignal, + pub join_handle: JoinHandle>, +} + +/// Lives as an `Arc>` alongside GlobalConfig, +/// NOT inside it — avoids adding lock contention to the shared Config. +pub struct Supervisor { + handles: HashMap, + task_queue: TaskQueue, + max_concurrent: usize, + max_depth: usize, +} + +impl Supervisor { + pub fn new(max_concurrent: usize, max_depth: usize) -> Self { + Self { + handles: HashMap::new(), + task_queue: TaskQueue::new(), + max_concurrent, + max_depth, + } + } + + pub fn active_count(&self) -> usize { + self.handles.len() + } + + pub fn max_concurrent(&self) -> usize { + self.max_concurrent + } + + pub fn max_depth(&self) -> usize { + self.max_depth + } + + pub fn task_queue(&self) -> &TaskQueue { + &self.task_queue + } + + pub fn task_queue_mut(&mut self) -> &mut TaskQueue { + &mut self.task_queue + } + + pub fn register(&mut self, handle: AgentHandle) -> Result<()> { + if self.handles.len() >= self.max_concurrent { + bail!( + "Cannot spawn agent: at capacity ({}/{})", + self.handles.len(), + self.max_concurrent + ); + } + if handle.depth > self.max_depth { + bail!( + "Cannot spawn agent: max depth exceeded ({}/{})", + handle.depth, + self.max_depth + ); + } + self.handles.insert(handle.id.clone(), handle); + Ok(()) + } + + pub fn is_finished(&self, id: &str) -> Option { + self.handles.get(id).map(|h| h.join_handle.is_finished()) + } + + pub fn take_if_finished(&mut self, id: &str) -> Option { + if self + .handles + .get(id) + .is_some_and(|h| h.join_handle.is_finished()) + { + self.handles.remove(id) + } else { + None + } + } + + pub fn take(&mut self, id: &str) -> Option { + self.handles.remove(id) + } + + pub fn inbox(&self, id: &str) -> Option<&Arc> { + self.handles.get(id).map(|h| &h.inbox) + } + + pub fn list_agents(&self) -> Vec<(&str, &str)> { + self.handles + .values() + .map(|h| (h.id.as_str(), h.agent_name.as_str())) + .collect() + } + + pub fn cancel_all(&self) { + for handle in self.handles.values() { + handle.abort_signal.set_ctrlc(); + } + } +} diff --git a/src/supervisor/taskqueue.rs b/src/supervisor/taskqueue.rs new file mode 100644 index 0000000..bd78a46 --- /dev/null +++ b/src/supervisor/taskqueue.rs @@ -0,0 +1,254 @@ +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TaskStatus { + Pending, + Blocked, + InProgress, + Completed, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskNode { + pub id: String, + pub subject: String, + pub description: String, + pub status: TaskStatus, + pub owner: Option, + pub blocked_by: HashSet, + pub blocks: HashSet, +} + +impl TaskNode { + pub fn new(id: String, subject: String, description: String) -> Self { + Self { + id, + subject, + description, + status: TaskStatus::Pending, + owner: None, + blocked_by: HashSet::new(), + blocks: HashSet::new(), + } + } + + pub fn is_runnable(&self) -> bool { + self.status == TaskStatus::Pending && self.blocked_by.is_empty() + } +} + +#[derive(Debug, Clone, Default)] +pub struct TaskQueue { + tasks: HashMap, + next_id: usize, +} + +impl TaskQueue { + pub fn new() -> Self { + Self { + tasks: HashMap::new(), + next_id: 1, + } + } + + pub fn create(&mut self, subject: String, description: String) -> String { + let id = self.next_id.to_string(); + self.next_id += 1; + let task = TaskNode::new(id.clone(), subject, description); + self.tasks.insert(id.clone(), task); + id + } + + pub fn add_dependency(&mut self, task_id: &str, blocked_by: &str) -> Result<(), String> { + if task_id == blocked_by { + return Err("A task cannot depend on itself".into()); + } + if !self.tasks.contains_key(blocked_by) { + return Err(format!("Dependency task '{blocked_by}' does not exist")); + } + if !self.tasks.contains_key(task_id) { + return Err(format!("Task '{task_id}' does not exist")); + } + + if self.would_create_cycle(task_id, blocked_by) { + return Err(format!( + "Adding dependency {task_id} -> {blocked_by} would create a cycle" + )); + } + + if let Some(task) = self.tasks.get_mut(task_id) { + task.blocked_by.insert(blocked_by.to_string()); + task.status = TaskStatus::Blocked; + } + if let Some(blocker) = self.tasks.get_mut(blocked_by) { + blocker.blocks.insert(task_id.to_string()); + } + Ok(()) + } + + pub fn complete(&mut self, task_id: &str) -> Vec { + let mut newly_runnable = Vec::new(); + + let dependents: Vec = self + .tasks + .get(task_id) + .map(|t| t.blocks.iter().cloned().collect()) + .unwrap_or_default(); + + if let Some(task) = self.tasks.get_mut(task_id) { + task.status = TaskStatus::Completed; + } + + for dep_id in &dependents { + if let Some(dep) = self.tasks.get_mut(dep_id) { + dep.blocked_by.remove(task_id); + if dep.blocked_by.is_empty() && dep.status == TaskStatus::Blocked { + dep.status = TaskStatus::Pending; + newly_runnable.push(dep_id.clone()); + } + } + } + + newly_runnable + } + + pub fn fail(&mut self, task_id: &str) { + if let Some(task) = self.tasks.get_mut(task_id) { + task.status = TaskStatus::Failed; + } + } + + pub fn claim(&mut self, task_id: &str, owner: &str) -> bool { + if let Some(task) = self.tasks.get_mut(task_id) { + if task.is_runnable() && task.owner.is_none() { + task.owner = Some(owner.to_string()); + task.status = TaskStatus::InProgress; + return true; + } + } + false + } + + pub fn runnable_tasks(&self) -> Vec<&TaskNode> { + self.tasks.values().filter(|t| t.is_runnable()).collect() + } + + pub fn get(&self, task_id: &str) -> Option<&TaskNode> { + self.tasks.get(task_id) + } + + pub fn list(&self) -> Vec<&TaskNode> { + let mut tasks: Vec<&TaskNode> = self.tasks.values().collect(); + tasks.sort_by_key(|t| t.id.parse::().unwrap_or(0)); + tasks + } + + // DFS cycle detection: would adding task_id -> blocked_by create a cycle? + // A cycle exists if blocked_by can reach task_id through existing dependencies. + fn would_create_cycle(&self, task_id: &str, blocked_by: &str) -> bool { + let mut visited = HashSet::new(); + let mut stack = vec![blocked_by.to_string()]; + + while let Some(current) = stack.pop() { + if current == task_id { + return true; + } + if visited.insert(current.clone()) { + if let Some(task) = self.tasks.get(¤t) { + for dep in &task.blocked_by { + stack.push(dep.clone()); + } + } + } + } + + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_and_list() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("Research".into(), "Research auth patterns".into()); + let id2 = queue.create("Implement".into(), "Write the code".into()); + + assert_eq!(id1, "1"); + assert_eq!(id2, "2"); + assert_eq!(queue.list().len(), 2); + } + + #[test] + fn test_dependency_and_completion() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("Step 1".into(), "".into()); + let id2 = queue.create("Step 2".into(), "".into()); + + queue.add_dependency(&id2, &id1).unwrap(); + + assert!(queue.get(&id1).unwrap().is_runnable()); + assert!(!queue.get(&id2).unwrap().is_runnable()); + assert_eq!(queue.get(&id2).unwrap().status, TaskStatus::Blocked); + + let unblocked = queue.complete(&id1); + assert_eq!(unblocked, vec![id2.clone()]); + assert!(queue.get(&id2).unwrap().is_runnable()); + } + + #[test] + fn test_fan_in_dependency() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("A".into(), "".into()); + let id2 = queue.create("B".into(), "".into()); + let id3 = queue.create("C (needs A and B)".into(), "".into()); + + queue.add_dependency(&id3, &id1).unwrap(); + queue.add_dependency(&id3, &id2).unwrap(); + + assert!(!queue.get(&id3).unwrap().is_runnable()); + + let unblocked = queue.complete(&id1); + assert!(unblocked.is_empty()); + assert!(!queue.get(&id3).unwrap().is_runnable()); + + let unblocked = queue.complete(&id2); + assert_eq!(unblocked, vec![id3.clone()]); + assert!(queue.get(&id3).unwrap().is_runnable()); + } + + #[test] + fn test_cycle_detection() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("A".into(), "".into()); + let id2 = queue.create("B".into(), "".into()); + + queue.add_dependency(&id2, &id1).unwrap(); + let result = queue.add_dependency(&id1, &id2); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("cycle")); + } + + #[test] + fn test_self_dependency_rejected() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("A".into(), "".into()); + let result = queue.add_dependency(&id1, &id1); + assert!(result.is_err()); + } + + #[test] + fn test_claim() { + let mut queue = TaskQueue::new(); + let id1 = queue.create("Task".into(), "".into()); + + assert!(queue.claim(&id1, "worker-1")); + assert!(!queue.claim(&id1, "worker-2")); + assert_eq!(queue.get(&id1).unwrap().status, TaskStatus::InProgress); + } +}