From 5d017fbb4803106d63914ace0cec9d3eacbb2649 Mon Sep 17 00:00:00 2001 From: Alex Clarke Date: Tue, 19 May 2026 11:05:40 -0600 Subject: [PATCH] build: upgraded to the most recent version of reqwest --- Cargo.lock | 140 ++++++++++++++++++++++++++++++--------- Cargo.toml | 9 +-- src/client/stream.rs | 73 +++++++++++--------- src/mcp/sse_transport.rs | 30 ++++++--- 4 files changed, 172 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f9f5c2..57d533a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1978,12 +1978,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.32" @@ -2507,7 +2501,6 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tower-service", - "webpki-roots", ] [[package]] @@ -2865,6 +2858,55 @@ dependencies = [ "syn", ] +[[package]] +name = "jni" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efd9a482cf3a427f00d6b35f14332adc7902ce91efb778580e180ff90fa3498" +dependencies = [ + "cfg-if", + "combine", + "jni-macros", + "jni-sys", + "log", + "simd_cesu8", + "thiserror 2.0.18", + "walkdir", + "windows-link", +] + +[[package]] +name = "jni-macros" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00109accc170f0bdb141fed3e393c565b6f5e072365c3bd58f5b062591560a3" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "simd_cesu8", + "syn", +] + +[[package]] +name = "jni-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6377a88cb3910bee9b0fa88d4f42e1d2da8e79915598f65fb0c7ee14c878af2" +dependencies = [ + "jni-sys-macros", +] + +[[package]] +name = "jni-sys-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -3058,6 +3100,7 @@ dependencies = [ "dirs", "duct", "dunce", + "eventsource-stream", "fancy-regex", "futures-util", "fuzzy-matcher", @@ -3086,8 +3129,7 @@ dependencies = [ "rand 0.10.1", "rayon", "reedline", - "reqwest 0.12.28", - "reqwest-eventsource", + "reqwest 0.13.3", "rmcp", "rust-embed", "scraper", @@ -4166,6 +4208,7 @@ version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ + "aws-lc-rs", "bytes", "getrandom 0.3.4", "lru-slab", @@ -4419,7 +4462,6 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", - "quinn", "rustls 0.23.39", "rustls-native-certs", "rustls-pki-types", @@ -4439,14 +4481,13 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams 0.4.2", "web-sys", - "webpki-roots", ] [[package]] name = "reqwest" -version = "0.13.2" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" dependencies = [ "base64", "bytes", @@ -4456,19 +4497,26 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.9.0", + "hyper-rustls 0.27.9", "hyper-tls", "hyper-util", "js-sys", "log", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", + "quinn", + "rustls 0.23.39", "rustls-pki-types", + "rustls-platform-verifier", "serde", "serde_json", + "serde_urlencoded", "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-rustls 0.26.4", "tokio-util", "tower", "tower-http", @@ -4480,22 +4528,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "reqwest-eventsource" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" -dependencies = [ - "eventsource-stream", - "futures-core", - "futures-timer", - "mime", - "nom 7.1.3", - "pin-project-lite", - "reqwest 0.12.28", - "thiserror 1.0.69", -] - [[package]] name = "rgb" version = "0.8.53" @@ -4533,7 +4565,7 @@ dependencies = [ "pastey", "pin-project-lite", "process-wrap", - "reqwest 0.13.2", + "reqwest 0.13.3", "rmcp-macros", "schemars 1.2.1", "serde", @@ -4685,7 +4717,6 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", - "ring", "rustls-pki-types", "rustls-webpki 0.103.13", "subtle", @@ -4714,6 +4745,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d1e2536ce4f35f4846aa13bff16bd0ff40157cdb14cc056c7b14ba41233ba0" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls 0.23.39", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki 0.103.13", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -5195,6 +5253,22 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" +[[package]] +name = "simd_cesu8" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f90157bb87cddf702797c5dadfa0be7d266cdf49e22da2fcaa32eff75b2c33" +dependencies = [ + "rustc_version", + "simdutf8", +] + +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "simple_asn1" version = "0.6.4" @@ -6481,10 +6555,10 @@ dependencies = [ ] [[package]] -name = "webpki-roots" +name = "webpki-root-certs" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" +checksum = "f31141ce3fc3e300ae89b78c0dd67f9708061d1d2eda54b8209346fd6be9a92c" dependencies = [ "rustls-pki-types", ] diff --git a/Cargo.toml b/Cargo.toml index 146010b..7f381b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ nu-ansi-term = "0.50.0" async-trait = "0.1.74" textwrap = "0.16.0" ansi_colours = "1.2.2" -reqwest-eventsource = "0.6.0" +eventsource-stream = "0.2.3" log = "0.4.28" log4rs = { version = "1.4.0", features = ["file_appender"] } shell-words = "1.1.0" @@ -109,13 +109,14 @@ rand = { version = "0.10.0", features = ["default"] } url = "2.5.8" [dependencies.reqwest] -version = "0.12.0" +version = "0.13.3" features = [ "json", "multipart", + "stream", + "form", "socks", - "rustls-tls", - "rustls-tls-native-roots", + "rustls", ] default-features = false diff --git a/src/client/stream.rs b/src/client/stream.rs index a14c461..41ba911 100644 --- a/src/client/stream.rs +++ b/src/client/stream.rs @@ -2,9 +2,9 @@ use super::{ToolCall, catch_error}; use crate::utils::AbortSignal; use anyhow::{Context, Result, anyhow, bail}; +use eventsource_stream::Eventsource; use futures_util::{Stream, StreamExt}; -use reqwest::RequestBuilder; -use reqwest_eventsource::{Error as EventSourceError, Event, RequestBuilderExt}; +use reqwest::{header, RequestBuilder}; use serde_json::Value; use tokio::sync::mpsc::UnboundedSender; @@ -193,11 +193,46 @@ pub async fn sse_stream(builder: RequestBuilder, mut handle: F) -> Result<()> where F: FnMut(SseMessage) -> Result, { - let mut es = builder.eventsource()?; + let res = builder + .header(header::ACCEPT, "text/event-stream") + .header(header::CACHE_CONTROL, "no-store") + .send() + .await?; + let status = res.status(); + if !status.is_success() { + let text = res.text().await?; + let data: Value = match text.parse() { + Ok(data) => data, + Err(_) => { + bail!( + "Invalid response data: {text} (status: {})", + status.as_u16() + ); + } + }; + catch_error(&data, status.as_u16())?; + return Ok(()); + } + + let content_type = res + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + let is_event_stream = content_type + .as_deref() + .map(|ct| ct.starts_with("text/event-stream")) + .unwrap_or(false); + if !is_event_stream { + let header_value = content_type.unwrap_or_default(); + let text = res.text().await?; + bail!("Invalid response event-stream. content-type: {header_value}, data: {text}"); + } + + let mut es = res.bytes_stream().boxed().eventsource(); while let Some(event) = es.next().await { match event { - Ok(Event::Open) => {} - Ok(Event::Message(message)) => { + Ok(message) => { let message = SseMessage { event: message.event, data: message.data, @@ -207,33 +242,7 @@ where } } Err(err) => { - match err { - EventSourceError::StreamEnded => {} - EventSourceError::InvalidStatusCode(status, res) => { - let text = res.text().await?; - let data: Value = match text.parse() { - Ok(data) => data, - Err(_) => { - bail!( - "Invalid response data: {text} (status: {})", - status.as_u16() - ); - } - }; - catch_error(&data, status.as_u16())?; - } - EventSourceError::InvalidContentType(header_value, res) => { - let text = res.text().await?; - bail!( - "Invalid response event-stream. content-type: {}, data: {text}", - header_value.to_str().unwrap_or_default() - ); - } - _ => { - bail!("{}", err); - } - } - es.close(); + bail!("{err}"); } } } diff --git a/src/mcp/sse_transport.rs b/src/mcp/sse_transport.rs index 13ff192..db683bb 100644 --- a/src/mcp/sse_transport.rs +++ b/src/mcp/sse_transport.rs @@ -1,11 +1,11 @@ use anyhow::{Context, Result, anyhow}; +use eventsource_stream::{EventStream, Eventsource}; use fmt::{Display, Formatter}; use futures_util::StreamExt; use mpsc::error::SendError; use mpsc::{OwnedPermit, Receiver, Sender, channel}; use reqwest::Client; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; -use reqwest_eventsource::{Event, EventSource}; use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage}; use std::collections::HashMap; use std::error::Error; @@ -13,10 +13,14 @@ use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::Poll; +use futures_util::stream::BoxStream; use tokio::sync::mpsc; use tokio::time::Duration; use url::Url; +type SseEventStream = + EventStream>>; + const CHANNEL_BUF: usize = 64; pub struct LegacySseTransport { @@ -47,8 +51,15 @@ impl LegacySseTransport { .build() .context("Failed to build HTTP client")?; - let request = client.get(sse_url); - let mut es = EventSource::new(request).context("Failed to open SSE connection")?; + let response = client + .get(sse_url) + .header(reqwest::header::ACCEPT, "text/event-stream") + .send() + .await + .context("Failed to open SSE connection")? + .error_for_status() + .context("SSE server returned an error status")?; + let mut es: SseEventStream = response.bytes_stream().boxed().eventsource(); let post_endpoint = wait_for_endpoint_event(&mut es, &base_url).await?; @@ -83,18 +94,17 @@ impl LegacySseTransport { } } -async fn wait_for_endpoint_event(es: &mut EventSource, base_url: &Url) -> Result { +async fn wait_for_endpoint_event(es: &mut SseEventStream, base_url: &Url) -> Result { let timeout = Duration::from_secs(30); tokio::time::timeout(timeout, async { while let Some(event) = es.next().await { match event { - Ok(Event::Open) => {} - Ok(Event::Message(msg)) if msg.event == "endpoint" => { + Ok(msg) if msg.event == "endpoint" => { let endpoint = msg.data.trim().to_string(); let resolved = resolve_endpoint(&endpoint, base_url)?; return Ok(resolved); } - Ok(Event::Message(_)) => {} + Ok(_) => {} Err(e) => { return Err(anyhow!( "SSE connection error while waiting for endpoint event: {e}" @@ -120,10 +130,10 @@ fn resolve_endpoint(endpoint: &str, base_url: &Url) -> Result { } } -async fn sse_reader_task(mut es: EventSource, tx: Sender) { +async fn sse_reader_task(mut es: SseEventStream, tx: Sender) { while let Some(event) = es.next().await { match event { - Ok(Event::Message(msg)) if msg.event == "message" => { + Ok(msg) if msg.event == "message" => { match serde_json::from_str::(&msg.data) { Ok(rpc_msg) => { if tx.send(rpc_msg).await.is_err() { @@ -136,14 +146,12 @@ async fn sse_reader_task(mut es: EventSource, tx: Sender) } } Ok(_) => {} - Err(reqwest_eventsource::Error::StreamEnded) => break, Err(e) => { error!("SSE stream error: {e}"); break; } } } - es.close(); } async fn post_writer_task(