Skip to content

Commit 3ffd896

Browse files
committed
Expose stand-alone poll_stream() method instead
1 parent 603637e commit 3ffd896

File tree

3 files changed

+148
-96
lines changed

3 files changed

+148
-96
lines changed

futures-util/src/stream/futures_ordered.rs

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::future::FutureExt;
21
use crate::stream::{FuturesUnordered, StreamExt};
32
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
43
use core::cmp::Ordering;
@@ -169,6 +168,68 @@ impl<Fut: Future> FuturesOrdered<Fut> {
169168
self.in_progress_queue.push(wrapped);
170169
}
171170
}
171+
172+
/// Poll the contained futures in this queue.
173+
///
174+
/// `Stream::poll_next` must be used in order to retrieve the outputs.
175+
///
176+
/// `Poll::Ready` indicates that the underlying futures are all completed.
177+
pub fn poll_all(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
178+
let this = &mut *self;
179+
180+
loop {
181+
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
182+
Some(output) => {
183+
let index = output.index;
184+
this.queued_outputs.push(output);
185+
if index == this.next_outgoing_index {
186+
// the Stream is now ready to be polled
187+
cx.waker().wake_by_ref();
188+
break Poll::Pending;
189+
}
190+
}
191+
None => break Poll::Ready(()),
192+
}
193+
}
194+
}
195+
196+
/// `Some` if an output is immediately available
197+
fn peek_wrapper(&self) -> Option<()> {
198+
match self.queued_outputs.peek() {
199+
Some(next_output) if next_output.index == self.next_outgoing_index => Some(()),
200+
_ => None,
201+
}
202+
}
203+
204+
/// `None` if `Stream::is_terminated`
205+
fn poll_peek_wrapper(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
206+
let res = self.as_mut().poll_all(cx);
207+
let peek = self.peek_wrapper();
208+
209+
match res {
210+
Poll::Pending => match peek {
211+
None => Poll::Pending,
212+
output => Poll::Ready(output),
213+
},
214+
Poll::Ready(()) => Poll::Ready(peek),
215+
}
216+
}
217+
218+
/// WIP: if a value is immediately available, borrow it
219+
pub fn peek<'a>(&'a self) -> Option<&'a Fut::Output> {
220+
let peek = self.peek_wrapper().map(drop);
221+
peek.and_then(move |_| self.queued_outputs.peek()).map(|wrapper| &wrapper.data)
222+
}
223+
224+
/// WIP: `None` indicates the end of the stream
225+
pub fn poll_peek<'a>(
226+
mut self: Pin<&'a mut Self>,
227+
cx: &mut Context<'_>,
228+
) -> Poll<Option<&'a Fut::Output>> {
229+
let peek = ready!(self.as_mut().poll_peek_wrapper(cx));
230+
let this = self.get_mut();
231+
Poll::Ready(peek.and_then(move |_| this.queued_outputs.peek()).map(|wrapper| &wrapper.data))
232+
}
172233
}
173234

174235
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -180,18 +241,25 @@ impl<Fut: Future> Default for FuturesOrdered<Fut> {
180241
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
181242
type Item = Fut::Output;
182243

183-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184-
let this = &mut *self;
244+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
245+
let this = self.get_mut();
185246

186247
// Check to see if we've already received the next value
187-
if let Some(next_output) = this.queued_outputs.peek_mut() {
188-
if next_output.index == this.next_outgoing_index {
189-
this.next_outgoing_index += 1;
190-
return Poll::Ready(Some(PeekMut::pop(next_output).data));
191-
}
192-
}
248+
let next_output = match this.peek_wrapper() {
249+
Some(_) => this.queued_outputs.peek_mut(),
250+
// otherwise poll for it
251+
None => match ready!(Pin::new(&mut *this).poll_peek_wrapper(cx)) {
252+
Some(_) => this.queued_outputs.peek_mut(),
253+
None => None,
254+
},
255+
};
193256

194-
this.poll_unpin(cx).map(|()| None)
257+
if let Some(next_output) = next_output {
258+
this.next_outgoing_index += 1;
259+
Poll::Ready(Some(PeekMut::pop(next_output).data))
260+
} else {
261+
Poll::Ready(None)
262+
}
195263
}
196264

197265
fn size_hint(&self) -> (usize, Option<usize>) {
@@ -200,29 +268,6 @@ impl<Fut: Future> Stream for FuturesOrdered<Fut> {
200268
}
201269
}
202270

203-
impl<Fut: Future> Future for FuturesOrdered<Fut> {
204-
type Output = ();
205-
206-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207-
let this = &mut *self;
208-
209-
loop {
210-
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
211-
Some(output) => {
212-
let index = output.index;
213-
this.queued_outputs.push(output);
214-
if index == this.next_outgoing_index {
215-
// the Stream is now ready to be polled
216-
cx.waker().wake_by_ref();
217-
break Poll::Pending;
218-
}
219-
}
220-
None => break Poll::Ready(()),
221-
}
222-
}
223-
}
224-
}
225-
226271
impl<Fut: Future> Debug for FuturesOrdered<Fut> {
227272
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228273
write!(f, "FuturesOrdered {{ ... }}")

futures-util/src/stream/stream/buffer_unordered.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,32 @@ where
5252
}
5353

5454
delegate_access_inner!(stream, St, (.));
55+
56+
/// Poll the underlying `Stream`, allowing the buffer to fill up.
57+
///
58+
/// This does not poll the futures produced by the stream,
59+
/// `Stream::poll_next` should be used to progress instead.
60+
///
61+
/// When `Poll::Ready` is returned, the underlying stream has been
62+
/// exhausted, and all of its futures have been buffered or consumed.
63+
pub fn poll_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
64+
let mut this = self.project();
65+
66+
// First up, try to spawn off as many futures as possible by filling up
67+
// our queue of futures.
68+
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
69+
match this.stream.as_mut().poll_next(cx) {
70+
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
71+
Poll::Ready(None) | Poll::Pending => break,
72+
}
73+
}
74+
75+
if this.stream.is_done() {
76+
Poll::Ready(())
77+
} else {
78+
Poll::Pending
79+
}
80+
}
5581
}
5682

5783
impl<St> Stream for BufferUnordered<St>
@@ -62,7 +88,7 @@ where
6288
type Item = <St::Item as Future>::Output;
6389

6490
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65-
let stream_res = self.as_mut().poll(cx);
91+
let stream_res = self.as_mut().poll_stream(cx);
6692

6793
let this = self.project();
6894

@@ -99,33 +125,6 @@ where
99125
}
100126
}
101127

102-
impl<St> Future for BufferUnordered<St>
103-
where
104-
St: Stream,
105-
St::Item: Future,
106-
{
107-
type Output = ();
108-
109-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110-
let mut this = self.project();
111-
112-
// First up, try to spawn off as many futures as possible by filling up
113-
// our queue of futures.
114-
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
115-
match this.stream.as_mut().poll_next(cx) {
116-
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
117-
Poll::Ready(None) | Poll::Pending => break,
118-
}
119-
}
120-
121-
if this.stream.is_done() {
122-
Poll::Ready(())
123-
} else {
124-
Poll::Pending
125-
}
126-
}
127-
}
128-
129128
// Forwarding impl of Sink from the underlying stream
130129
#[cfg(feature = "sink")]
131130
impl<S, Item> Sink<Item> for BufferUnordered<S>

futures-util/src/stream/stream/buffered.rs

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::future::FutureExt;
21
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
32
use core::fmt;
43
use core::num::NonZeroUsize;
@@ -54,6 +53,42 @@ where
5453
}
5554

5655
delegate_access_inner!(stream, St, (.));
56+
57+
/// Fill the buffer as much as is allowed by polling the underlying `Stream`.
58+
///
59+
/// Returns `false` if there are no more futures and the `Stream::is_terminated()`.
60+
/// Otherwise there may be more futures, but the buffer is out of room.
61+
fn poll_fill(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
62+
let mut this = self.project();
63+
64+
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
65+
match ready!(this.stream.as_mut().poll_next(cx)) {
66+
Some(fut) => this.in_progress_queue.push_back(fut),
67+
None => break,
68+
}
69+
}
70+
71+
Poll::Ready(!this.stream.is_done())
72+
}
73+
74+
/// Poll the buffered `Stream`, allowing it to progress as long as there is
75+
/// room in the buffer.
76+
///
77+
/// This will also poll any futures produced by the stream, but only polls
78+
/// the underlying `Stream` as long as the buffer can hold more entries.
79+
/// `Stream::poll_next` should be used to progress to completion.
80+
///
81+
/// When `Poll::Ready` is returned, the underlying stream has been
82+
/// exhausted, and all of its futures have been run to completion.
83+
pub fn poll_stream(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
84+
let stream_res = self.as_mut().poll_fill(cx);
85+
86+
let this = self.project();
87+
let queue_res = Pin::new(&mut *this.in_progress_queue).poll_all(cx);
88+
89+
ready!(stream_res);
90+
queue_res
91+
}
5792
}
5893

5994
impl<St> Stream for Buffered<St>
@@ -64,7 +99,9 @@ where
6499
type Item = <St::Item as Future>::Output;
65100

66101
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67-
let _stream_res = self.as_mut().poll(cx);
102+
// First up, try to spawn off as many futures as possible by filling up
103+
// our queue of futures.
104+
let stream_res = self.as_mut().poll_fill(cx);
68105

69106
let this = self.project();
70107

@@ -75,10 +112,10 @@ where
75112
}
76113

77114
// If more values are still coming from the stream, we're not done yet
78-
if this.stream.is_done() {
79-
Poll::Ready(None)
80-
} else {
81-
Poll::Pending
115+
match stream_res {
116+
Poll::Pending => Poll::Pending,
117+
Poll::Ready(false) => Poll::Ready(None),
118+
Poll::Ready(true) => panic!("buffer is full, but we have no values???"),
82119
}
83120
}
84121

@@ -94,35 +131,6 @@ where
94131
}
95132
}
96133

97-
impl<St> Future for Buffered<St>
98-
where
99-
St: Stream,
100-
St::Item: Future,
101-
{
102-
type Output = ();
103-
104-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105-
let mut this = self.project();
106-
107-
// First up, try to spawn off as many futures as possible by filling up
108-
// our queue of futures.
109-
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
110-
match this.stream.as_mut().poll_next(cx) {
111-
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
112-
Poll::Ready(None) | Poll::Pending => break,
113-
}
114-
}
115-
116-
let queue_res = this.in_progress_queue.poll_unpin(cx);
117-
118-
if this.stream.is_done() {
119-
queue_res
120-
} else {
121-
Poll::Pending
122-
}
123-
}
124-
}
125-
126134
// Forwarding impl of Sink from the underlying stream
127135
#[cfg(feature = "sink")]
128136
impl<S, Item> Sink<Item> for Buffered<S>

0 commit comments

Comments
 (0)