diff --git a/src/client/common.rs b/src/client/common.rs index 1b84cb4..8bb1d6e 100644 --- a/src/client/common.rs +++ b/src/client/common.rs @@ -1,6 +1,6 @@ use super::*; -use crate::config::paths; +use crate::config::{paths, RenderMode}; use crate::{ config::{AppConfig, Input, RequestContext}, function::{FunctionDeclaration, ToolCall, ToolResult, eval_tool_calls}, @@ -459,6 +459,9 @@ pub async fn call_chat_completions_streaming( ) -> Result<(String, Vec)> { let (tx, rx) = unbounded_channel(); let mut handler = SseHandler::new(tx, abort_signal.clone()); + if ctx.render_mode == RenderMode::Silent { + handler.set_silent(true); + } let (send_ret, render_ret) = tokio::join!( client.chat_completions_streaming(input, &mut handler), diff --git a/src/client/stream.rs b/src/client/stream.rs index 3d6f1f8..df52238 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -16,6 +16,7 @@ pub struct SseHandler { last_tool_calls: Vec, max_call_repeats: usize, call_repeat_chain_len: usize, + silent: bool, } impl SseHandler { @@ -28,14 +29,28 @@ impl SseHandler { last_tool_calls: Vec::new(), max_call_repeats: 2, call_repeat_chain_len: 3, + silent: false, } } + /// Suppresses stdout streaming of incoming tokens. Tokens are still buffered + /// internally (so the caller's `.take()` still returns the full response) — + /// only the per-token send to the SSE renderer is skipped. Used by parallel + /// graph super-step branches so concurrent LLM calls don't interleave on + /// stdout. + pub fn set_silent(&mut self, silent: bool) { + self.silent = silent; + } + pub fn text(&mut self, text: &str) -> Result<()> { if text.is_empty() { return Ok(()); } self.buffer.push_str(text); + if self.silent { + return Ok(()); + } + let ret = self .sender .send(SseEvent::Text(text.to_string())) diff --git a/src/config/mod.rs b/src/config/mod.rs index a004e6a..fdca1fe 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -21,7 +21,7 @@ pub use self::app_config::AppConfig; pub use self::app_state::AppState; pub use self::input::Input; #[allow(unused_imports)] -pub use self::request_context::RequestContext; +pub use self::request_context::{RenderMode, RequestContext}; pub use self::role::{ CODE_ROLE, CREATE_TITLE_ROLE, EXPLAIN_SHELL_ROLE, Role, RoleLike, SHELL_ROLE, }; diff --git a/src/config/request_context.rs b/src/config/request_context.rs index 26a8498..88e9198 100644 --- a/src/config/request_context.rs +++ b/src/config/request_context.rs @@ -46,6 +46,17 @@ pub struct AutoContinueConfig { pub continuation_prompt: Option, } +/// Controls how LLM token streams are presented to the user. `Silent` is set +/// on branch contexts during parallel graph super-steps so concurrent LLM +/// calls don't interleave token-by-token on stdout — the full response still +/// lands in graph state via the normal output_schema / state_updates pathway. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum RenderMode { + #[default] + Streaming, + Silent, +} + pub struct RequestContext { pub app: Arc, @@ -74,6 +85,8 @@ pub struct RequestContext { pub auto_continue_count: usize, pub todo_list: TodoList, pub last_continuation_response: Option, + + pub render_mode: RenderMode, } impl RequestContext { @@ -100,6 +113,7 @@ impl RequestContext { auto_continue_count: 0, todo_list: TodoList::default(), last_continuation_response: None, + render_mode: RenderMode::default(), } } @@ -146,6 +160,7 @@ impl RequestContext { auto_continue_count: 0, todo_list: TodoList::default(), last_continuation_response: None, + render_mode: RenderMode::default(), }) } @@ -195,6 +210,7 @@ impl RequestContext { auto_continue_count: 0, todo_list: self.todo_list.clone(), last_continuation_response: None, + render_mode: self.render_mode, } } @@ -233,6 +249,7 @@ impl RequestContext { auto_continue_count: 0, todo_list: TodoList::default(), last_continuation_response: None, + render_mode: parent.render_mode, } } diff --git a/src/graph/executor.rs b/src/graph/executor.rs index 7466ba4..661dc53 100644 --- a/src/graph/executor.rs +++ b/src/graph/executor.rs @@ -9,7 +9,7 @@ use super::state::StateManager; use super::types::{EndNode, Graph, Node, NodeType}; use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor}; use super::validator::{AgentValidationContext, GraphValidator}; -use crate::config::RequestContext; +use crate::config::{RenderMode, RequestContext}; use crate::utils::AbortSignal; use anyhow::{Context, Result, anyhow, bail}; use futures_util::future::join_all; @@ -144,7 +144,8 @@ impl GraphExecutor { let snapshot = state.read_snapshot(); let semaphore = Arc::new(Semaphore::new(max_concurrency)); - let mut branch_tasks = Vec::with_capacity(frontier.len()); + let frontier_size = frontier.len(); + let mut branch_tasks = Vec::with_capacity(frontier_size); for node_id in &frontier { let node = graph .get_node(node_id) @@ -153,7 +154,10 @@ impl GraphExecutor { })? .clone(); let branch_state = state.fork_for_branch_state(); - let branch_ctx = ctx.fork_for_branch(); + let mut branch_ctx = ctx.fork_for_branch(); + if frontier_size > 1 { + branch_ctx.render_mode = RenderMode::Silent; + } let script_exec_clone = script_executor.clone(); let graph_clone = Arc::clone(&graph); let current = node_id.clone(); diff --git a/src/graph/map.rs b/src/graph/map.rs index aa800fe..a3a18d8 100644 --- a/src/graph/map.rs +++ b/src/graph/map.rs @@ -4,14 +4,14 @@ use super::llm::LlmNodeExecutor; use super::rag::RagNodeExecutor; use super::state::StateManager; use super::types::{MapNode, NodeType}; -use crate::config::RequestContext; +use crate::config::{RenderMode, RequestContext}; +use crate::graph::type_name; use anyhow::{Context, Result, anyhow}; use futures_util::future::join_all; use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Semaphore; -use crate::graph::type_name; // Map sub-branches are atomic — the branch node has no `next` (enforced by // validator rule C.5). But LLM/RAG node executors require an `Option<&str>` for @@ -66,7 +66,8 @@ impl MapNodeExecutor { let as_name = node.as_name.clone(); let branch_clone = branch_node.clone(); let mut sub_state = state.fork_for_branch_state(); - let sub_ctx = ctx.fork_for_branch(); + let mut sub_ctx = ctx.fork_for_branch(); + sub_ctx.render_mode = RenderMode::Silent; let script_clone = step_ctx.script_executor.clone(); let sub_branch_id = node.branch.clone(); let sem = semaphore.clone();