build: upgraded to the most recent version of reqwest

This commit is contained in:
2026-05-19 11:05:40 -06:00
parent 696ce03ee4
commit eb2843d38a
4 changed files with 172 additions and 80 deletions
+41 -32
View File
@@ -2,9 +2,9 @@ use super::{ToolCall, catch_error};
use crate::utils::AbortSignal;
use anyhow::{Context, Result, anyhow, bail};
use eventsource_stream::Eventsource;
use futures_util::{Stream, StreamExt};
use reqwest::RequestBuilder;
use reqwest_eventsource::{Error as EventSourceError, Event, RequestBuilderExt};
use reqwest::{header, RequestBuilder};
use serde_json::Value;
use tokio::sync::mpsc::UnboundedSender;
@@ -193,11 +193,46 @@ pub async fn sse_stream<F>(builder: RequestBuilder, mut handle: F) -> Result<()>
where
F: FnMut(SseMessage) -> Result<bool>,
{
let mut es = builder.eventsource()?;
let res = builder
.header(header::ACCEPT, "text/event-stream")
.header(header::CACHE_CONTROL, "no-store")
.send()
.await?;
let status = res.status();
if !status.is_success() {
let text = res.text().await?;
let data: Value = match text.parse() {
Ok(data) => data,
Err(_) => {
bail!(
"Invalid response data: {text} (status: {})",
status.as_u16()
);
}
};
catch_error(&data, status.as_u16())?;
return Ok(());
}
let content_type = res
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let is_event_stream = content_type
.as_deref()
.map(|ct| ct.starts_with("text/event-stream"))
.unwrap_or(false);
if !is_event_stream {
let header_value = content_type.unwrap_or_default();
let text = res.text().await?;
bail!("Invalid response event-stream. content-type: {header_value}, data: {text}");
}
let mut es = res.bytes_stream().boxed().eventsource();
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => {}
Ok(Event::Message(message)) => {
Ok(message) => {
let message = SseMessage {
event: message.event,
data: message.data,
@@ -207,33 +242,7 @@ where
}
}
Err(err) => {
match err {
EventSourceError::StreamEnded => {}
EventSourceError::InvalidStatusCode(status, res) => {
let text = res.text().await?;
let data: Value = match text.parse() {
Ok(data) => data,
Err(_) => {
bail!(
"Invalid response data: {text} (status: {})",
status.as_u16()
);
}
};
catch_error(&data, status.as_u16())?;
}
EventSourceError::InvalidContentType(header_value, res) => {
let text = res.text().await?;
bail!(
"Invalid response event-stream. content-type: {}, data: {text}",
header_value.to_str().unwrap_or_default()
);
}
_ => {
bail!("{}", err);
}
}
es.close();
bail!("{err}");
}
}
}
+19 -11
View File
@@ -1,11 +1,11 @@
use anyhow::{Context, Result, anyhow};
use eventsource_stream::{EventStream, Eventsource};
use fmt::{Display, Formatter};
use futures_util::StreamExt;
use mpsc::error::SendError;
use mpsc::{OwnedPermit, Receiver, Sender, channel};
use reqwest::Client;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest_eventsource::{Event, EventSource};
use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage};
use std::collections::HashMap;
use std::error::Error;
@@ -13,10 +13,14 @@ use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use futures_util::stream::BoxStream;
use tokio::sync::mpsc;
use tokio::time::Duration;
use url::Url;
type SseEventStream =
EventStream<BoxStream<'static, reqwest::Result<bytes::Bytes>>>;
const CHANNEL_BUF: usize = 64;
pub struct LegacySseTransport {
@@ -47,8 +51,15 @@ impl LegacySseTransport {
.build()
.context("Failed to build HTTP client")?;
let request = client.get(sse_url);
let mut es = EventSource::new(request).context("Failed to open SSE connection")?;
let response = client
.get(sse_url)
.header(reqwest::header::ACCEPT, "text/event-stream")
.send()
.await
.context("Failed to open SSE connection")?
.error_for_status()
.context("SSE server returned an error status")?;
let mut es: SseEventStream = response.bytes_stream().boxed().eventsource();
let post_endpoint = wait_for_endpoint_event(&mut es, &base_url).await?;
@@ -83,18 +94,17 @@ impl LegacySseTransport {
}
}
async fn wait_for_endpoint_event(es: &mut EventSource, base_url: &Url) -> Result<String> {
async fn wait_for_endpoint_event(es: &mut SseEventStream, base_url: &Url) -> Result<String> {
let timeout = Duration::from_secs(30);
tokio::time::timeout(timeout, async {
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => {}
Ok(Event::Message(msg)) if msg.event == "endpoint" => {
Ok(msg) if msg.event == "endpoint" => {
let endpoint = msg.data.trim().to_string();
let resolved = resolve_endpoint(&endpoint, base_url)?;
return Ok(resolved);
}
Ok(Event::Message(_)) => {}
Ok(_) => {}
Err(e) => {
return Err(anyhow!(
"SSE connection error while waiting for endpoint event: {e}"
@@ -120,10 +130,10 @@ fn resolve_endpoint(endpoint: &str, base_url: &Url) -> Result<String> {
}
}
async fn sse_reader_task(mut es: EventSource, tx: Sender<ServerJsonRpcMessage>) {
async fn sse_reader_task(mut es: SseEventStream, tx: Sender<ServerJsonRpcMessage>) {
while let Some(event) = es.next().await {
match event {
Ok(Event::Message(msg)) if msg.event == "message" => {
Ok(msg) if msg.event == "message" => {
match serde_json::from_str::<ServerJsonRpcMessage>(&msg.data) {
Ok(rpc_msg) => {
if tx.send(rpc_msg).await.is_err() {
@@ -136,14 +146,12 @@ async fn sse_reader_task(mut es: EventSource, tx: Sender<ServerJsonRpcMessage>)
}
}
Ok(_) => {}
Err(reqwest_eventsource::Error::StreamEnded) => break,
Err(e) => {
error!("SSE stream error: {e}");
break;
}
}
}
es.close();
}
async fn post_writer_task(