use futures_core::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; struct BoundedStream { recv: Receiver, } impl Stream for BoundedStream { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::into_inner(self).recv.poll_recv(cx) } } pub fn mpsc_channel_stream(size: usize) -> (Sender, impl Stream) { let (tx, rx) = mpsc::channel(size); let stream = BoundedStream { recv: rx }; (tx, stream) }