fix: bug found by copilot that would create a lock on the PollSender for sse-based MCP servers
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
use anyhow::{Context, Result, anyhow};
|
use anyhow::{Context, Result, anyhow};
|
||||||
use fmt::{Display, Formatter};
|
use fmt::{Display, Formatter};
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use mpsc::{Receiver, Sender, channel};
|
use mpsc::{Receiver, Sender, channel, OwnedPermit};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
|
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
|
||||||
use reqwest_eventsource::{Event, EventSource};
|
use reqwest_eventsource::{Event, EventSource};
|
||||||
@@ -9,8 +9,10 @@ use rmcp::model::{ClientJsonRpcMessage, ServerJsonRpcMessage};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
use mpsc::error::SendError;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
@@ -73,6 +75,7 @@ impl LegacySseTransport {
|
|||||||
tx: PollSender {
|
tx: PollSender {
|
||||||
tx: self.tx,
|
tx: self.tx,
|
||||||
permit: None,
|
permit: None,
|
||||||
|
acquiring: None,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
SseStream { rx: self.rx },
|
SseStream { rx: self.rx },
|
||||||
@@ -231,9 +234,13 @@ impl Display for SseSinkError {
|
|||||||
|
|
||||||
impl Error for SseSinkError {}
|
impl Error for SseSinkError {}
|
||||||
|
|
||||||
|
type ReserveOwned<T> =
|
||||||
|
Pin<Box<dyn Future<Output = Result<OwnedPermit<T>, SendError<()>>> + Send>>;
|
||||||
|
|
||||||
struct PollSender<T> {
|
struct PollSender<T> {
|
||||||
tx: Sender<T>,
|
tx: Sender<T>,
|
||||||
permit: Option<mpsc::OwnedPermit<T>>,
|
permit: Option<OwnedPermit<T>>,
|
||||||
|
acquiring: Option<ReserveOwned<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Send + 'static> PollSender<T> {
|
impl<T: Send + 'static> PollSender<T> {
|
||||||
@@ -241,14 +248,21 @@ impl<T: Send + 'static> PollSender<T> {
|
|||||||
if self.permit.is_some() {
|
if self.permit.is_some() {
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
let tx = self.tx.clone();
|
|
||||||
let mut fut = Box::pin(tx.reserve_owned());
|
let fut = self
|
||||||
|
.acquiring
|
||||||
|
.get_or_insert_with(|| Box::pin(self.tx.clone().reserve_owned()));
|
||||||
|
|
||||||
match fut.as_mut().poll(cx) {
|
match fut.as_mut().poll(cx) {
|
||||||
Poll::Ready(Ok(permit)) => {
|
Poll::Ready(Ok(permit)) => {
|
||||||
|
self.acquiring = None;
|
||||||
self.permit = Some(permit);
|
self.permit = Some(permit);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(_)) => Poll::Ready(Err(SseSinkError::Closed)),
|
Poll::Ready(Err(_)) => {
|
||||||
|
self.acquiring = None;
|
||||||
|
Poll::Ready(Err(SseSinkError::Closed))
|
||||||
|
}
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user