docs: Initial documentation cleanup of parallel agent MVP
This commit is contained in:
@@ -225,10 +225,6 @@ pub fn supervisor_function_declarations() -> Vec<FunctionDeclaration> {
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Dispatch
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
pub async fn handle_supervisor_tool(
|
pub async fn handle_supervisor_tool(
|
||||||
config: &GlobalConfig,
|
config: &GlobalConfig,
|
||||||
cmd_name: &str,
|
cmd_name: &str,
|
||||||
@@ -253,21 +249,6 @@ pub async fn handle_supervisor_tool(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Child agent execution loop
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
/// Run a child agent to completion, returning its accumulated text output.
|
|
||||||
///
|
|
||||||
/// This mirrors `start_directive` in main.rs but:
|
|
||||||
/// - Uses `call_chat_completions(print=false)` so nothing goes to stdout
|
|
||||||
/// - Returns the output text instead of printing it
|
|
||||||
/// - Loops on tool calls just like `start_directive`'s recursion
|
|
||||||
///
|
|
||||||
/// Returns a boxed future to break the recursive type cycle:
|
|
||||||
/// handle_spawn → tokio::spawn(run_child_agent) → call_chat_completions
|
|
||||||
/// → eval_tool_calls → ToolCall::eval → handle_supervisor_tool → handle_spawn
|
|
||||||
/// Without boxing, the compiler cannot prove Send for the recursive async type.
|
|
||||||
fn run_child_agent(
|
fn run_child_agent(
|
||||||
child_config: GlobalConfig,
|
child_config: GlobalConfig,
|
||||||
initial_input: Input,
|
initial_input: Input,
|
||||||
@@ -283,8 +264,8 @@ fn run_child_agent(
|
|||||||
|
|
||||||
let (output, tool_results) = call_chat_completions(
|
let (output, tool_results) = call_chat_completions(
|
||||||
&input,
|
&input,
|
||||||
false, // print=false — silent, no stdout
|
false,
|
||||||
false, // extract_code=false
|
false,
|
||||||
client.as_ref(),
|
client.as_ref(),
|
||||||
abort_signal.clone(),
|
abort_signal.clone(),
|
||||||
)
|
)
|
||||||
@@ -305,7 +286,6 @@ fn run_child_agent(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feed tool results back for the next round (mirrors start_directive recursion)
|
|
||||||
input = input.merge_tool_results(output, tool_results);
|
input = input.merge_tool_results(output, tool_results);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -313,10 +293,6 @@ fn run_child_agent(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
// Handlers
|
|
||||||
// ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
||||||
let agent_name = args
|
let agent_name = args
|
||||||
.get("agent")
|
.get("agent")
|
||||||
@@ -330,18 +306,15 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
.to_string();
|
.to_string();
|
||||||
let _task_id = args.get("task_id").and_then(Value::as_str);
|
let _task_id = args.get("task_id").and_then(Value::as_str);
|
||||||
|
|
||||||
// Generate a unique agent ID
|
|
||||||
let short_uuid = &Uuid::new_v4().to_string()[..8];
|
let short_uuid = &Uuid::new_v4().to_string()[..8];
|
||||||
let agent_id = format!("agent_{agent_name}_{short_uuid}");
|
let agent_id = format!("agent_{agent_name}_{short_uuid}");
|
||||||
|
|
||||||
// --- Validate capacity ---
|
|
||||||
// Read the supervisor to check capacity, then drop locks before doing async work
|
|
||||||
let (max_depth, current_depth) = {
|
let (max_depth, current_depth) = {
|
||||||
let cfg = config.read();
|
let cfg = config.read();
|
||||||
let supervisor = cfg
|
let supervisor = cfg
|
||||||
.supervisor
|
.supervisor
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or_else(|| anyhow!("No supervisor active — agent spawning not enabled"))?;
|
.ok_or_else(|| anyhow!("No supervisor active; Agent spawning not enabled"))?;
|
||||||
let sup = supervisor.read();
|
let sup = supervisor.read();
|
||||||
if sup.active_count() >= sup.max_concurrent() {
|
if sup.active_count() >= sup.max_concurrent() {
|
||||||
return Ok(json!({
|
return Ok(json!({
|
||||||
@@ -363,7 +336,6 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Build an isolated child Config ---
|
|
||||||
let child_inbox = Arc::new(Inbox::new());
|
let child_inbox = Arc::new(Inbox::new());
|
||||||
|
|
||||||
let child_config: GlobalConfig = {
|
let child_config: GlobalConfig = {
|
||||||
@@ -384,16 +356,13 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
Arc::new(RwLock::new(child_cfg))
|
Arc::new(RwLock::new(child_cfg))
|
||||||
};
|
};
|
||||||
|
|
||||||
// Load the target agent into the child config
|
|
||||||
let child_abort = create_abort_signal();
|
let child_abort = create_abort_signal();
|
||||||
Config::use_agent(&child_config, &agent_name, None, child_abort.clone()).await?;
|
Config::use_agent(&child_config, &agent_name, None, child_abort.clone()).await?;
|
||||||
|
|
||||||
// Create the initial input from the prompt
|
|
||||||
let input = Input::from_str(&child_config, &prompt, None);
|
let input = Input::from_str(&child_config, &prompt, None);
|
||||||
|
|
||||||
debug!("Spawning child agent '{agent_name}' as '{agent_id}'");
|
debug!("Spawning child agent '{agent_name}' as '{agent_id}'");
|
||||||
|
|
||||||
// --- Spawn the agent task ---
|
|
||||||
let spawn_agent_id = agent_id.clone();
|
let spawn_agent_id = agent_id.clone();
|
||||||
let spawn_agent_name = agent_name.clone();
|
let spawn_agent_name = agent_name.clone();
|
||||||
let spawn_abort = child_abort.clone();
|
let spawn_abort = child_abort.clone();
|
||||||
@@ -417,7 +386,6 @@ async fn handle_spawn(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Register the handle with the supervisor
|
|
||||||
let handle = AgentHandle {
|
let handle = AgentHandle {
|
||||||
id: agent_id.clone(),
|
id: agent_id.clone(),
|
||||||
agent_name: agent_name.clone(),
|
agent_name: agent_name.clone(),
|
||||||
@@ -463,7 +431,6 @@ async fn handle_check(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
|
|
||||||
match is_finished {
|
match is_finished {
|
||||||
Some(true) => {
|
Some(true) => {
|
||||||
// Finished — collect the result
|
|
||||||
handle_collect(config, args).await
|
handle_collect(config, args).await
|
||||||
}
|
}
|
||||||
Some(false) => Ok(json!({
|
Some(false) => Ok(json!({
|
||||||
@@ -484,7 +451,6 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
.and_then(Value::as_str)
|
.and_then(Value::as_str)
|
||||||
.ok_or_else(|| anyhow!("'id' is required"))?;
|
.ok_or_else(|| anyhow!("'id' is required"))?;
|
||||||
|
|
||||||
// Extract the join handle while holding the lock, then drop the lock before awaiting
|
|
||||||
let handle = {
|
let handle = {
|
||||||
let cfg = config.read();
|
let cfg = config.read();
|
||||||
let supervisor = cfg
|
let supervisor = cfg
|
||||||
@@ -497,7 +463,6 @@ async fn handle_collect(config: &GlobalConfig, args: &Value) -> Result<Value> {
|
|||||||
|
|
||||||
match handle {
|
match handle {
|
||||||
Some(handle) => {
|
Some(handle) => {
|
||||||
// Await the join handle OUTSIDE of any lock
|
|
||||||
let result = handle
|
let result = handle
|
||||||
.join_handle
|
.join_handle
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -1,10 +1,5 @@
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// A message envelope routed between agents.
|
|
||||||
///
|
|
||||||
/// Agents communicate by sending `Envelope`s to each other's mailboxes.
|
|
||||||
/// The sender fires and forgets; the receiver drains its inbox between
|
|
||||||
/// LLM turns via the `check_inbox` tool.
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Envelope {
|
pub struct Envelope {
|
||||||
pub from: String,
|
pub from: String,
|
||||||
@@ -13,11 +8,6 @@ pub struct Envelope {
|
|||||||
pub timestamp: chrono::DateTime<chrono::Utc>,
|
pub timestamp: chrono::DateTime<chrono::Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The content of an inter-agent message.
|
|
||||||
///
|
|
||||||
/// Separates the **control plane** (shutdown signals, task lifecycle events)
|
|
||||||
/// from the **data plane** (free-form text). Control-plane messages are
|
|
||||||
/// processed before data-plane messages to prevent race conditions.
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "type", rename_all = "snake_case")]
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
pub enum EnvelopePayload {
|
pub enum EnvelopePayload {
|
||||||
@@ -27,13 +17,6 @@ pub enum EnvelopePayload {
|
|||||||
ShutdownApproved,
|
ShutdownApproved,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A per-agent inbox that collects incoming messages.
|
|
||||||
///
|
|
||||||
/// Backed by a `Vec` behind a `parking_lot::Mutex` so it can be shared
|
|
||||||
/// between the supervisor (which delivers messages) and the agent's tool
|
|
||||||
/// handler (which drains them). We use `parking_lot::Mutex` to match the
|
|
||||||
/// locking convention used elsewhere in Loki (`parking_lot::RwLock` for
|
|
||||||
/// GlobalConfig).
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct Inbox {
|
pub struct Inbox {
|
||||||
messages: parking_lot::Mutex<Vec<Envelope>>,
|
messages: parking_lot::Mutex<Vec<Envelope>>,
|
||||||
@@ -50,18 +33,12 @@ impl Inbox {
|
|||||||
self.messages.lock().push(envelope);
|
self.messages.lock().push(envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drain all pending messages, returning them sorted with control-plane
|
|
||||||
/// messages first (shutdown, task events) then data-plane (text).
|
|
||||||
/// This ordering prevents the class of bugs where a text message
|
|
||||||
/// references state that a control message was supposed to set up.
|
|
||||||
pub fn drain(&self) -> Vec<Envelope> {
|
pub fn drain(&self) -> Vec<Envelope> {
|
||||||
let mut msgs = {
|
let mut msgs = {
|
||||||
let mut guard = self.messages.lock();
|
let mut guard = self.messages.lock();
|
||||||
std::mem::take(&mut *guard)
|
std::mem::take(&mut *guard)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Stable partition: control messages first, then data messages,
|
|
||||||
// preserving relative order within each group.
|
|
||||||
msgs.sort_by_key(|e| match &e.payload {
|
msgs.sort_by_key(|e| match &e.payload {
|
||||||
EnvelopePayload::ShutdownRequest { .. } => 0,
|
EnvelopePayload::ShutdownRequest { .. } => 0,
|
||||||
EnvelopePayload::ShutdownApproved => 0,
|
EnvelopePayload::ShutdownApproved => 0,
|
||||||
|
|||||||
@@ -35,8 +35,6 @@ pub struct AgentHandle {
|
|||||||
pub join_handle: JoinHandle<Result<AgentResult>>,
|
pub join_handle: JoinHandle<Result<AgentResult>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lives as an `Arc<parking_lot::RwLock<Supervisor>>` alongside GlobalConfig,
|
|
||||||
/// NOT inside it — avoids adding lock contention to the shared Config.
|
|
||||||
pub struct Supervisor {
|
pub struct Supervisor {
|
||||||
handles: HashMap<String, AgentHandle>,
|
handles: HashMap<String, AgentHandle>,
|
||||||
task_queue: TaskQueue,
|
task_queue: TaskQueue,
|
||||||
|
|||||||
@@ -122,13 +122,13 @@ impl TaskQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn claim(&mut self, task_id: &str, owner: &str) -> bool {
|
pub fn claim(&mut self, task_id: &str, owner: &str) -> bool {
|
||||||
if let Some(task) = self.tasks.get_mut(task_id) {
|
if let Some(task) = self.tasks.get_mut(task_id) &&
|
||||||
if task.is_runnable() && task.owner.is_none() {
|
task.is_runnable() && task.owner.is_none() {
|
||||||
task.owner = Some(owner.to_string());
|
task.owner = Some(owner.to_string());
|
||||||
task.status = TaskStatus::InProgress;
|
task.status = TaskStatus::InProgress;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,8 +146,6 @@ impl TaskQueue {
|
|||||||
tasks
|
tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
// DFS cycle detection: would adding task_id -> blocked_by create a cycle?
|
|
||||||
// A cycle exists if blocked_by can reach task_id through existing dependencies.
|
|
||||||
fn would_create_cycle(&self, task_id: &str, blocked_by: &str) -> bool {
|
fn would_create_cycle(&self, task_id: &str, blocked_by: &str) -> bool {
|
||||||
let mut visited = HashSet::new();
|
let mut visited = HashSet::new();
|
||||||
let mut stack = vec![blocked_by.to_string()];
|
let mut stack = vec![blocked_by.to_string()];
|
||||||
@@ -156,14 +154,13 @@ impl TaskQueue {
|
|||||||
if current == task_id {
|
if current == task_id {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if visited.insert(current.clone()) {
|
if visited.insert(current.clone()) &&
|
||||||
if let Some(task) = self.tasks.get(¤t) {
|
let Some(task) = self.tasks.get(¤t) {
|
||||||
for dep in &task.blocked_by {
|
for dep in &task.blocked_by {
|
||||||
stack.push(dep.clone());
|
stack.push(dep.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user