feat: Initial models for agent parallelization
This commit is contained in:
@@ -590,6 +590,12 @@ pub struct AgentConfig {
|
|||||||
pub agent_session: Option<String>,
|
pub agent_session: Option<String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub auto_continue: bool,
|
pub auto_continue: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub can_spawn_agents: bool,
|
||||||
|
#[serde(default = "default_max_concurrent_agents")]
|
||||||
|
pub max_concurrent_agents: usize,
|
||||||
|
#[serde(default = "default_max_agent_depth")]
|
||||||
|
pub max_agent_depth: usize,
|
||||||
#[serde(default = "default_max_auto_continues")]
|
#[serde(default = "default_max_auto_continues")]
|
||||||
pub max_auto_continues: usize,
|
pub max_auto_continues: usize,
|
||||||
#[serde(default = "default_true")]
|
#[serde(default = "default_true")]
|
||||||
@@ -622,6 +628,14 @@ fn default_max_auto_continues() -> usize {
|
|||||||
10
|
10
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_max_concurrent_agents() -> usize {
|
||||||
|
4
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_max_agent_depth() -> usize {
|
||||||
|
3
|
||||||
|
}
|
||||||
|
|
||||||
fn default_true() -> bool {
|
fn default_true() -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ mod repl;
|
|||||||
mod utils;
|
mod utils;
|
||||||
mod mcp;
|
mod mcp;
|
||||||
mod parsers;
|
mod parsers;
|
||||||
|
mod supervisor;
|
||||||
mod vault;
|
mod vault;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
|||||||
@@ -0,0 +1,87 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// A message envelope routed between agents.
|
||||||
|
///
|
||||||
|
/// Agents communicate by sending `Envelope`s to each other's mailboxes.
|
||||||
|
/// The sender fires and forgets; the receiver drains its inbox between
|
||||||
|
/// LLM turns via the `check_inbox` tool.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Envelope {
|
||||||
|
pub from: String,
|
||||||
|
pub to: String,
|
||||||
|
pub payload: EnvelopePayload,
|
||||||
|
pub timestamp: chrono::DateTime<chrono::Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The content of an inter-agent message.
|
||||||
|
///
|
||||||
|
/// Separates the **control plane** (shutdown signals, task lifecycle events)
|
||||||
|
/// from the **data plane** (free-form text). Control-plane messages are
|
||||||
|
/// processed before data-plane messages to prevent race conditions.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
|
pub enum EnvelopePayload {
|
||||||
|
Text { content: String },
|
||||||
|
TaskCompleted { task_id: String, summary: String },
|
||||||
|
ShutdownRequest { reason: String },
|
||||||
|
ShutdownApproved,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A per-agent inbox that collects incoming messages.
|
||||||
|
///
|
||||||
|
/// Backed by a `Vec` behind a `parking_lot::Mutex` so it can be shared
|
||||||
|
/// between the supervisor (which delivers messages) and the agent's tool
|
||||||
|
/// handler (which drains them). We use `parking_lot::Mutex` to match the
|
||||||
|
/// locking convention used elsewhere in Loki (`parking_lot::RwLock` for
|
||||||
|
/// GlobalConfig).
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct Inbox {
|
||||||
|
messages: parking_lot::Mutex<Vec<Envelope>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Inbox {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
messages: parking_lot::Mutex::new(Vec::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deliver(&self, envelope: Envelope) {
|
||||||
|
self.messages.lock().push(envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drain all pending messages, returning them sorted with control-plane
|
||||||
|
/// messages first (shutdown, task events) then data-plane (text).
|
||||||
|
/// This ordering prevents the class of bugs where a text message
|
||||||
|
/// references state that a control message was supposed to set up.
|
||||||
|
pub fn drain(&self) -> Vec<Envelope> {
|
||||||
|
let mut msgs = {
|
||||||
|
let mut guard = self.messages.lock();
|
||||||
|
std::mem::take(&mut *guard)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Stable partition: control messages first, then data messages,
|
||||||
|
// preserving relative order within each group.
|
||||||
|
msgs.sort_by_key(|e| match &e.payload {
|
||||||
|
EnvelopePayload::ShutdownRequest { .. } => 0,
|
||||||
|
EnvelopePayload::ShutdownApproved => 0,
|
||||||
|
EnvelopePayload::TaskCompleted { .. } => 1,
|
||||||
|
EnvelopePayload::Text { .. } => 2,
|
||||||
|
});
|
||||||
|
|
||||||
|
msgs
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pending_count(&self) -> usize {
|
||||||
|
self.messages.lock().len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for Inbox {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let messages = self.messages.lock().clone();
|
||||||
|
Self {
|
||||||
|
messages: parking_lot::Mutex::new(messages),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,130 @@
|
|||||||
|
pub mod mailbox;
|
||||||
|
pub mod taskqueue;
|
||||||
|
|
||||||
|
use crate::utils::AbortSignal;
|
||||||
|
use mailbox::Inbox;
|
||||||
|
use taskqueue::TaskQueue;
|
||||||
|
|
||||||
|
use anyhow::{Result, bail};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum AgentExitStatus {
|
||||||
|
Completed,
|
||||||
|
Cancelled,
|
||||||
|
Failed(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AgentResult {
|
||||||
|
pub id: String,
|
||||||
|
pub agent_name: String,
|
||||||
|
pub output: String,
|
||||||
|
pub exit_status: AgentExitStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AgentHandle {
|
||||||
|
pub id: String,
|
||||||
|
pub agent_name: String,
|
||||||
|
pub depth: usize,
|
||||||
|
pub inbox: Arc<Inbox>,
|
||||||
|
pub abort_signal: AbortSignal,
|
||||||
|
pub join_handle: JoinHandle<Result<AgentResult>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lives as an `Arc<parking_lot::RwLock<Supervisor>>` alongside GlobalConfig,
|
||||||
|
/// NOT inside it — avoids adding lock contention to the shared Config.
|
||||||
|
pub struct Supervisor {
|
||||||
|
handles: HashMap<String, AgentHandle>,
|
||||||
|
task_queue: TaskQueue,
|
||||||
|
max_concurrent: usize,
|
||||||
|
max_depth: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Supervisor {
|
||||||
|
pub fn new(max_concurrent: usize, max_depth: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
handles: HashMap::new(),
|
||||||
|
task_queue: TaskQueue::new(),
|
||||||
|
max_concurrent,
|
||||||
|
max_depth,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn active_count(&self) -> usize {
|
||||||
|
self.handles.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn max_concurrent(&self) -> usize {
|
||||||
|
self.max_concurrent
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn max_depth(&self) -> usize {
|
||||||
|
self.max_depth
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn task_queue(&self) -> &TaskQueue {
|
||||||
|
&self.task_queue
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn task_queue_mut(&mut self) -> &mut TaskQueue {
|
||||||
|
&mut self.task_queue
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(&mut self, handle: AgentHandle) -> Result<()> {
|
||||||
|
if self.handles.len() >= self.max_concurrent {
|
||||||
|
bail!(
|
||||||
|
"Cannot spawn agent: at capacity ({}/{})",
|
||||||
|
self.handles.len(),
|
||||||
|
self.max_concurrent
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if handle.depth > self.max_depth {
|
||||||
|
bail!(
|
||||||
|
"Cannot spawn agent: max depth exceeded ({}/{})",
|
||||||
|
handle.depth,
|
||||||
|
self.max_depth
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.handles.insert(handle.id.clone(), handle);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_finished(&self, id: &str) -> Option<bool> {
|
||||||
|
self.handles.get(id).map(|h| h.join_handle.is_finished())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn take_if_finished(&mut self, id: &str) -> Option<AgentHandle> {
|
||||||
|
if self
|
||||||
|
.handles
|
||||||
|
.get(id)
|
||||||
|
.is_some_and(|h| h.join_handle.is_finished())
|
||||||
|
{
|
||||||
|
self.handles.remove(id)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn take(&mut self, id: &str) -> Option<AgentHandle> {
|
||||||
|
self.handles.remove(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inbox(&self, id: &str) -> Option<&Arc<Inbox>> {
|
||||||
|
self.handles.get(id).map(|h| &h.inbox)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_agents(&self) -> Vec<(&str, &str)> {
|
||||||
|
self.handles
|
||||||
|
.values()
|
||||||
|
.map(|h| (h.id.as_str(), h.agent_name.as_str()))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cancel_all(&self) {
|
||||||
|
for handle in self.handles.values() {
|
||||||
|
handle.abort_signal.set_ctrlc();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,254 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum TaskStatus {
|
||||||
|
Pending,
|
||||||
|
Blocked,
|
||||||
|
InProgress,
|
||||||
|
Completed,
|
||||||
|
Failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct TaskNode {
|
||||||
|
pub id: String,
|
||||||
|
pub subject: String,
|
||||||
|
pub description: String,
|
||||||
|
pub status: TaskStatus,
|
||||||
|
pub owner: Option<String>,
|
||||||
|
pub blocked_by: HashSet<String>,
|
||||||
|
pub blocks: HashSet<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskNode {
|
||||||
|
pub fn new(id: String, subject: String, description: String) -> Self {
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
subject,
|
||||||
|
description,
|
||||||
|
status: TaskStatus::Pending,
|
||||||
|
owner: None,
|
||||||
|
blocked_by: HashSet::new(),
|
||||||
|
blocks: HashSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_runnable(&self) -> bool {
|
||||||
|
self.status == TaskStatus::Pending && self.blocked_by.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct TaskQueue {
|
||||||
|
tasks: HashMap<String, TaskNode>,
|
||||||
|
next_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskQueue {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
tasks: HashMap::new(),
|
||||||
|
next_id: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create(&mut self, subject: String, description: String) -> String {
|
||||||
|
let id = self.next_id.to_string();
|
||||||
|
self.next_id += 1;
|
||||||
|
let task = TaskNode::new(id.clone(), subject, description);
|
||||||
|
self.tasks.insert(id.clone(), task);
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_dependency(&mut self, task_id: &str, blocked_by: &str) -> Result<(), String> {
|
||||||
|
if task_id == blocked_by {
|
||||||
|
return Err("A task cannot depend on itself".into());
|
||||||
|
}
|
||||||
|
if !self.tasks.contains_key(blocked_by) {
|
||||||
|
return Err(format!("Dependency task '{blocked_by}' does not exist"));
|
||||||
|
}
|
||||||
|
if !self.tasks.contains_key(task_id) {
|
||||||
|
return Err(format!("Task '{task_id}' does not exist"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.would_create_cycle(task_id, blocked_by) {
|
||||||
|
return Err(format!(
|
||||||
|
"Adding dependency {task_id} -> {blocked_by} would create a cycle"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(task) = self.tasks.get_mut(task_id) {
|
||||||
|
task.blocked_by.insert(blocked_by.to_string());
|
||||||
|
task.status = TaskStatus::Blocked;
|
||||||
|
}
|
||||||
|
if let Some(blocker) = self.tasks.get_mut(blocked_by) {
|
||||||
|
blocker.blocks.insert(task_id.to_string());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn complete(&mut self, task_id: &str) -> Vec<String> {
|
||||||
|
let mut newly_runnable = Vec::new();
|
||||||
|
|
||||||
|
let dependents: Vec<String> = self
|
||||||
|
.tasks
|
||||||
|
.get(task_id)
|
||||||
|
.map(|t| t.blocks.iter().cloned().collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
if let Some(task) = self.tasks.get_mut(task_id) {
|
||||||
|
task.status = TaskStatus::Completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
for dep_id in &dependents {
|
||||||
|
if let Some(dep) = self.tasks.get_mut(dep_id) {
|
||||||
|
dep.blocked_by.remove(task_id);
|
||||||
|
if dep.blocked_by.is_empty() && dep.status == TaskStatus::Blocked {
|
||||||
|
dep.status = TaskStatus::Pending;
|
||||||
|
newly_runnable.push(dep_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newly_runnable
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fail(&mut self, task_id: &str) {
|
||||||
|
if let Some(task) = self.tasks.get_mut(task_id) {
|
||||||
|
task.status = TaskStatus::Failed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn claim(&mut self, task_id: &str, owner: &str) -> bool {
|
||||||
|
if let Some(task) = self.tasks.get_mut(task_id) {
|
||||||
|
if task.is_runnable() && task.owner.is_none() {
|
||||||
|
task.owner = Some(owner.to_string());
|
||||||
|
task.status = TaskStatus::InProgress;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn runnable_tasks(&self) -> Vec<&TaskNode> {
|
||||||
|
self.tasks.values().filter(|t| t.is_runnable()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, task_id: &str) -> Option<&TaskNode> {
|
||||||
|
self.tasks.get(task_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list(&self) -> Vec<&TaskNode> {
|
||||||
|
let mut tasks: Vec<&TaskNode> = self.tasks.values().collect();
|
||||||
|
tasks.sort_by_key(|t| t.id.parse::<usize>().unwrap_or(0));
|
||||||
|
tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
// DFS cycle detection: would adding task_id -> blocked_by create a cycle?
|
||||||
|
// A cycle exists if blocked_by can reach task_id through existing dependencies.
|
||||||
|
fn would_create_cycle(&self, task_id: &str, blocked_by: &str) -> bool {
|
||||||
|
let mut visited = HashSet::new();
|
||||||
|
let mut stack = vec![blocked_by.to_string()];
|
||||||
|
|
||||||
|
while let Some(current) = stack.pop() {
|
||||||
|
if current == task_id {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if visited.insert(current.clone()) {
|
||||||
|
if let Some(task) = self.tasks.get(¤t) {
|
||||||
|
for dep in &task.blocked_by {
|
||||||
|
stack.push(dep.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_create_and_list() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("Research".into(), "Research auth patterns".into());
|
||||||
|
let id2 = queue.create("Implement".into(), "Write the code".into());
|
||||||
|
|
||||||
|
assert_eq!(id1, "1");
|
||||||
|
assert_eq!(id2, "2");
|
||||||
|
assert_eq!(queue.list().len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_dependency_and_completion() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("Step 1".into(), "".into());
|
||||||
|
let id2 = queue.create("Step 2".into(), "".into());
|
||||||
|
|
||||||
|
queue.add_dependency(&id2, &id1).unwrap();
|
||||||
|
|
||||||
|
assert!(queue.get(&id1).unwrap().is_runnable());
|
||||||
|
assert!(!queue.get(&id2).unwrap().is_runnable());
|
||||||
|
assert_eq!(queue.get(&id2).unwrap().status, TaskStatus::Blocked);
|
||||||
|
|
||||||
|
let unblocked = queue.complete(&id1);
|
||||||
|
assert_eq!(unblocked, vec![id2.clone()]);
|
||||||
|
assert!(queue.get(&id2).unwrap().is_runnable());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_fan_in_dependency() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("A".into(), "".into());
|
||||||
|
let id2 = queue.create("B".into(), "".into());
|
||||||
|
let id3 = queue.create("C (needs A and B)".into(), "".into());
|
||||||
|
|
||||||
|
queue.add_dependency(&id3, &id1).unwrap();
|
||||||
|
queue.add_dependency(&id3, &id2).unwrap();
|
||||||
|
|
||||||
|
assert!(!queue.get(&id3).unwrap().is_runnable());
|
||||||
|
|
||||||
|
let unblocked = queue.complete(&id1);
|
||||||
|
assert!(unblocked.is_empty());
|
||||||
|
assert!(!queue.get(&id3).unwrap().is_runnable());
|
||||||
|
|
||||||
|
let unblocked = queue.complete(&id2);
|
||||||
|
assert_eq!(unblocked, vec![id3.clone()]);
|
||||||
|
assert!(queue.get(&id3).unwrap().is_runnable());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cycle_detection() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("A".into(), "".into());
|
||||||
|
let id2 = queue.create("B".into(), "".into());
|
||||||
|
|
||||||
|
queue.add_dependency(&id2, &id1).unwrap();
|
||||||
|
let result = queue.add_dependency(&id1, &id2);
|
||||||
|
assert!(result.is_err());
|
||||||
|
assert!(result.unwrap_err().contains("cycle"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_self_dependency_rejected() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("A".into(), "".into());
|
||||||
|
let result = queue.add_dependency(&id1, &id1);
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_claim() {
|
||||||
|
let mut queue = TaskQueue::new();
|
||||||
|
let id1 = queue.create("Task".into(), "".into());
|
||||||
|
|
||||||
|
assert!(queue.claim(&id1, "worker-1"));
|
||||||
|
assert!(!queue.claim(&id1, "worker-2"));
|
||||||
|
assert_eq!(queue.get(&id1).unwrap().status, TaskStatus::InProgress);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user