diff --git a/futures-util/src/stream/stream/find.rs b/futures-util/src/stream/stream/find.rs new file mode 100644 index 000000000..055847e94 --- /dev/null +++ b/futures-util/src/stream/stream/find.rs @@ -0,0 +1,101 @@ +use crate::fns::FnMut1; +use crate::Stream; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{ready, Context, Poll}; +use futures_core::FusedFuture; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`find`](super::StreamExt::find) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Find + where St: Stream, + { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + pending_fut: Option, + pending_item: Option, + } +} + +impl fmt::Debug for Find +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Find") + .field("stream", &self.stream) + .field("done", &self.done) + .field("pending_fut", &self.pending_fut) + .field("pending_item", &self.pending_item) + .finish() + } +} + +impl Find +where + St: Stream, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, pending_fut: None, pending_item: None } + } +} + +impl FusedFuture for Find +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.pending_fut.is_none() + } +} + +impl Future for Find +where + St: futures_core::Stream, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, + Fut: Future, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.pending_fut.set(None); + if res { + *this.done = true; + break this.pending_item.take(); + } + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.pending_fut.set(Some(this.f.call_mut(&item))); + *this.pending_item = Some(item); + } + None => { + *this.done = true; + break None; + } + } + } else { + break None; + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..2f19de62f 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -49,6 +49,9 @@ pub use self::filter::Filter; mod filter_map; pub use self::filter_map::FilterMap; +mod find; +use self::find::Find; + mod flatten; delegate_all!( @@ -672,6 +675,32 @@ pub trait StreamExt: Stream { assert_future::(Fold::new(self, f, init)) } + /// Returns the first element in the stream that matches the given asynchronous predicate. + /// + /// Searches through the stream and applies the provided asynchronous predicate to each element + /// until a match is found. Returns `Some(item)` with the matching element, or `None` if no + /// element matches the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::{future, stream::{self, StreamExt}}; + /// + /// let number_stream = stream::iter([1, 3, 6, 5, 9]); + /// let first_even = number_stream.find(|x| future::ready(x % 2 == 0)); + /// assert_eq!(first_even.await, Some(6)); + /// # }); + /// ``` + fn find(self, f: F) -> Find + where + Self: Sized, + F: FnMut(&Self::Item) -> Fut, + Fut: Future, + { + assert_future::, _>(Find::new(self, f)) + } + /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. /// /// # Examples