feat: created the RenderMode enum to suppress stdout streaming during parallel graph super-steps
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
use crate::config::paths;
|
use crate::config::{paths, RenderMode};
|
||||||
use crate::{
|
use crate::{
|
||||||
config::{AppConfig, Input, RequestContext},
|
config::{AppConfig, Input, RequestContext},
|
||||||
function::{FunctionDeclaration, ToolCall, ToolResult, eval_tool_calls},
|
function::{FunctionDeclaration, ToolCall, ToolResult, eval_tool_calls},
|
||||||
@@ -459,6 +459,9 @@ pub async fn call_chat_completions_streaming(
|
|||||||
) -> Result<(String, Vec<ToolResult>)> {
|
) -> Result<(String, Vec<ToolResult>)> {
|
||||||
let (tx, rx) = unbounded_channel();
|
let (tx, rx) = unbounded_channel();
|
||||||
let mut handler = SseHandler::new(tx, abort_signal.clone());
|
let mut handler = SseHandler::new(tx, abort_signal.clone());
|
||||||
|
if ctx.render_mode == RenderMode::Silent {
|
||||||
|
handler.set_silent(true);
|
||||||
|
}
|
||||||
|
|
||||||
let (send_ret, render_ret) = tokio::join!(
|
let (send_ret, render_ret) = tokio::join!(
|
||||||
client.chat_completions_streaming(input, &mut handler),
|
client.chat_completions_streaming(input, &mut handler),
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ pub struct SseHandler {
|
|||||||
last_tool_calls: Vec<ToolCall>,
|
last_tool_calls: Vec<ToolCall>,
|
||||||
max_call_repeats: usize,
|
max_call_repeats: usize,
|
||||||
call_repeat_chain_len: usize,
|
call_repeat_chain_len: usize,
|
||||||
|
silent: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SseHandler {
|
impl SseHandler {
|
||||||
@@ -28,14 +29,28 @@ impl SseHandler {
|
|||||||
last_tool_calls: Vec::new(),
|
last_tool_calls: Vec::new(),
|
||||||
max_call_repeats: 2,
|
max_call_repeats: 2,
|
||||||
call_repeat_chain_len: 3,
|
call_repeat_chain_len: 3,
|
||||||
|
silent: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Suppresses stdout streaming of incoming tokens. Tokens are still buffered
|
||||||
|
/// internally (so the caller's `.take()` still returns the full response) —
|
||||||
|
/// only the per-token send to the SSE renderer is skipped. Used by parallel
|
||||||
|
/// graph super-step branches so concurrent LLM calls don't interleave on
|
||||||
|
/// stdout.
|
||||||
|
pub fn set_silent(&mut self, silent: bool) {
|
||||||
|
self.silent = silent;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn text(&mut self, text: &str) -> Result<()> {
|
pub fn text(&mut self, text: &str) -> Result<()> {
|
||||||
if text.is_empty() {
|
if text.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
self.buffer.push_str(text);
|
self.buffer.push_str(text);
|
||||||
|
if self.silent {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let ret = self
|
let ret = self
|
||||||
.sender
|
.sender
|
||||||
.send(SseEvent::Text(text.to_string()))
|
.send(SseEvent::Text(text.to_string()))
|
||||||
|
|||||||
+1
-1
@@ -21,7 +21,7 @@ pub use self::app_config::AppConfig;
|
|||||||
pub use self::app_state::AppState;
|
pub use self::app_state::AppState;
|
||||||
pub use self::input::Input;
|
pub use self::input::Input;
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use self::request_context::RequestContext;
|
pub use self::request_context::{RenderMode, RequestContext};
|
||||||
pub use self::role::{
|
pub use self::role::{
|
||||||
CODE_ROLE, CREATE_TITLE_ROLE, EXPLAIN_SHELL_ROLE, Role, RoleLike, SHELL_ROLE,
|
CODE_ROLE, CREATE_TITLE_ROLE, EXPLAIN_SHELL_ROLE, Role, RoleLike, SHELL_ROLE,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -46,6 +46,17 @@ pub struct AutoContinueConfig {
|
|||||||
pub continuation_prompt: Option<String>,
|
pub continuation_prompt: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Controls how LLM token streams are presented to the user. `Silent` is set
|
||||||
|
/// on branch contexts during parallel graph super-steps so concurrent LLM
|
||||||
|
/// calls don't interleave token-by-token on stdout — the full response still
|
||||||
|
/// lands in graph state via the normal output_schema / state_updates pathway.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||||
|
pub enum RenderMode {
|
||||||
|
#[default]
|
||||||
|
Streaming,
|
||||||
|
Silent,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RequestContext {
|
pub struct RequestContext {
|
||||||
pub app: Arc<AppState>,
|
pub app: Arc<AppState>,
|
||||||
|
|
||||||
@@ -74,6 +85,8 @@ pub struct RequestContext {
|
|||||||
pub auto_continue_count: usize,
|
pub auto_continue_count: usize,
|
||||||
pub todo_list: TodoList,
|
pub todo_list: TodoList,
|
||||||
pub last_continuation_response: Option<String>,
|
pub last_continuation_response: Option<String>,
|
||||||
|
|
||||||
|
pub render_mode: RenderMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestContext {
|
impl RequestContext {
|
||||||
@@ -100,6 +113,7 @@ impl RequestContext {
|
|||||||
auto_continue_count: 0,
|
auto_continue_count: 0,
|
||||||
todo_list: TodoList::default(),
|
todo_list: TodoList::default(),
|
||||||
last_continuation_response: None,
|
last_continuation_response: None,
|
||||||
|
render_mode: RenderMode::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,6 +160,7 @@ impl RequestContext {
|
|||||||
auto_continue_count: 0,
|
auto_continue_count: 0,
|
||||||
todo_list: TodoList::default(),
|
todo_list: TodoList::default(),
|
||||||
last_continuation_response: None,
|
last_continuation_response: None,
|
||||||
|
render_mode: RenderMode::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -195,6 +210,7 @@ impl RequestContext {
|
|||||||
auto_continue_count: 0,
|
auto_continue_count: 0,
|
||||||
todo_list: self.todo_list.clone(),
|
todo_list: self.todo_list.clone(),
|
||||||
last_continuation_response: None,
|
last_continuation_response: None,
|
||||||
|
render_mode: self.render_mode,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,6 +249,7 @@ impl RequestContext {
|
|||||||
auto_continue_count: 0,
|
auto_continue_count: 0,
|
||||||
todo_list: TodoList::default(),
|
todo_list: TodoList::default(),
|
||||||
last_continuation_response: None,
|
last_continuation_response: None,
|
||||||
|
render_mode: parent.render_mode,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use super::state::StateManager;
|
|||||||
use super::types::{EndNode, Graph, Node, NodeType};
|
use super::types::{EndNode, Graph, Node, NodeType};
|
||||||
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
|
use super::user_interaction::{ApprovalNodeExecutor, InputNodeExecutor};
|
||||||
use super::validator::{AgentValidationContext, GraphValidator};
|
use super::validator::{AgentValidationContext, GraphValidator};
|
||||||
use crate::config::RequestContext;
|
use crate::config::{RenderMode, RequestContext};
|
||||||
use crate::utils::AbortSignal;
|
use crate::utils::AbortSignal;
|
||||||
use anyhow::{Context, Result, anyhow, bail};
|
use anyhow::{Context, Result, anyhow, bail};
|
||||||
use futures_util::future::join_all;
|
use futures_util::future::join_all;
|
||||||
@@ -144,7 +144,8 @@ impl GraphExecutor {
|
|||||||
let snapshot = state.read_snapshot();
|
let snapshot = state.read_snapshot();
|
||||||
let semaphore = Arc::new(Semaphore::new(max_concurrency));
|
let semaphore = Arc::new(Semaphore::new(max_concurrency));
|
||||||
|
|
||||||
let mut branch_tasks = Vec::with_capacity(frontier.len());
|
let frontier_size = frontier.len();
|
||||||
|
let mut branch_tasks = Vec::with_capacity(frontier_size);
|
||||||
for node_id in &frontier {
|
for node_id in &frontier {
|
||||||
let node = graph
|
let node = graph
|
||||||
.get_node(node_id)
|
.get_node(node_id)
|
||||||
@@ -153,7 +154,10 @@ impl GraphExecutor {
|
|||||||
})?
|
})?
|
||||||
.clone();
|
.clone();
|
||||||
let branch_state = state.fork_for_branch_state();
|
let branch_state = state.fork_for_branch_state();
|
||||||
let branch_ctx = ctx.fork_for_branch();
|
let mut branch_ctx = ctx.fork_for_branch();
|
||||||
|
if frontier_size > 1 {
|
||||||
|
branch_ctx.render_mode = RenderMode::Silent;
|
||||||
|
}
|
||||||
let script_exec_clone = script_executor.clone();
|
let script_exec_clone = script_executor.clone();
|
||||||
let graph_clone = Arc::clone(&graph);
|
let graph_clone = Arc::clone(&graph);
|
||||||
let current = node_id.clone();
|
let current = node_id.clone();
|
||||||
|
|||||||
+4
-3
@@ -4,14 +4,14 @@ use super::llm::LlmNodeExecutor;
|
|||||||
use super::rag::RagNodeExecutor;
|
use super::rag::RagNodeExecutor;
|
||||||
use super::state::StateManager;
|
use super::state::StateManager;
|
||||||
use super::types::{MapNode, NodeType};
|
use super::types::{MapNode, NodeType};
|
||||||
use crate::config::RequestContext;
|
use crate::config::{RenderMode, RequestContext};
|
||||||
|
use crate::graph::type_name;
|
||||||
use anyhow::{Context, Result, anyhow};
|
use anyhow::{Context, Result, anyhow};
|
||||||
use futures_util::future::join_all;
|
use futures_util::future::join_all;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use crate::graph::type_name;
|
|
||||||
|
|
||||||
// Map sub-branches are atomic — the branch node has no `next` (enforced by
|
// Map sub-branches are atomic — the branch node has no `next` (enforced by
|
||||||
// validator rule C.5). But LLM/RAG node executors require an `Option<&str>` for
|
// validator rule C.5). But LLM/RAG node executors require an `Option<&str>` for
|
||||||
@@ -66,7 +66,8 @@ impl MapNodeExecutor {
|
|||||||
let as_name = node.as_name.clone();
|
let as_name = node.as_name.clone();
|
||||||
let branch_clone = branch_node.clone();
|
let branch_clone = branch_node.clone();
|
||||||
let mut sub_state = state.fork_for_branch_state();
|
let mut sub_state = state.fork_for_branch_state();
|
||||||
let sub_ctx = ctx.fork_for_branch();
|
let mut sub_ctx = ctx.fork_for_branch();
|
||||||
|
sub_ctx.render_mode = RenderMode::Silent;
|
||||||
let script_clone = step_ctx.script_executor.clone();
|
let script_clone = step_ctx.script_executor.clone();
|
||||||
let sub_branch_id = node.branch.clone();
|
let sub_branch_id = node.branch.clone();
|
||||||
let sem = semaphore.clone();
|
let sem = semaphore.clone();
|
||||||
|
|||||||
Reference in New Issue
Block a user