feat: scaffolding work for fan-out nodes for parallel branch execution support and stubbed out Map node types

This commit is contained in:
2026-05-20 11:37:23 -06:00
parent e43c2e477a
commit 8c398b6360
5 changed files with 536 additions and 21 deletions
+27 -12
View File
@@ -154,9 +154,12 @@ async fn step(
match &node.node_type {
NodeType::Agent(agent_node) => {
AgentNodeExecutor::execute(agent_node, state, ctx).await?;
let next = node.next.clone().ok_or_else(|| {
anyhow!("agent node '{current}' has no `next` and is not an end node")
})?;
let next = node
.next_single()?
.ok_or_else(|| {
anyhow!("agent node '{current}' has no `next` and is not an end node")
})?
.to_string();
Ok(StepResult::Continue(next))
}
NodeType::Script(script_node) => {
@@ -173,9 +176,17 @@ async fn step(
return Err(e);
}
};
let next = dynamic.or_else(|| node.next.clone()).ok_or_else(|| {
anyhow!("script node '{current}' did not emit `_next` and has no static `next`")
})?;
let next = match dynamic {
Some(n) => n,
None => node
.next_single()?
.ok_or_else(|| {
anyhow!(
"script node '{current}' did not emit `_next` and has no static `next`"
)
})?
.to_string(),
};
Ok(StepResult::Continue(next))
}
NodeType::Approval(approval_node) => {
@@ -183,21 +194,25 @@ async fn step(
Ok(StepResult::Continue(next))
}
NodeType::Input(input_node) => {
let next =
InputNodeExecutor::execute(input_node, node.next.as_deref(), state, ctx).await?;
let next_id = node.next_single()?;
let next = InputNodeExecutor::execute(input_node, next_id, state, ctx).await?;
Ok(StepResult::Continue(next))
}
NodeType::Llm(llm_node) => {
let next = LlmNodeExecutor::execute(llm_node, node.next.as_deref(), state, ctx).await?;
let next_id = node.next_single()?;
let next = LlmNodeExecutor::execute(llm_node, next_id, state, ctx).await?;
Ok(StepResult::Continue(next))
}
NodeType::Rag(rag_node) => {
let next =
RagNodeExecutor::execute(rag_node, current, node.next.as_deref(), state, ctx)
.await?;
let next_id = node.next_single()?;
let next = RagNodeExecutor::execute(rag_node, current, next_id, state, ctx).await?;
Ok(StepResult::Continue(next))
}
NodeType::End(end_node) => Ok(StepResult::End(resolve_end_output(end_node, state))),
NodeType::Map(_) => bail!(
"Map nodes are not yet supported in this build \
(parallel branch execution lands in Phase D/E)."
),
}
}