//! Synchronous combinator extensions to futures::Stream use futures::{ future::{ready, Ready}, stream::{Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile}, }; /// Synchronous combinators to augment futures::StreamExt. Most Stream /// combinators take asynchronous arguments, but often only simple predicates /// are required to steer a Stream like an Iterator. This suite provides a /// convenience to reduce boilerplate by de-cluttering non-async predicates. /// /// This interface is not necessarily complete; feel free to add as-needed. pub trait ReadyExt where Self: Stream + Send + Sized, { fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool; fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(Item) -> Ready>> where F: Fn(Item) -> Option; fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T; fn ready_for_each(self, f: F) -> ForEach, impl FnMut(Item) -> Ready<()>> where F: FnMut(Item); fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; fn ready_scan( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, Item) -> Option; fn ready_scan_each( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, &Item); fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a; } impl ReadyExt for S where S: Stream + Send + Sized, { #[inline] fn ready_any(self, f: F) -> Any, impl FnMut(Item) -> Ready> where F: Fn(Item) -> bool, { self.any(move |t| ready(f(t))) } #[inline] fn ready_filter<'a, F>(self, f: F) -> Filter, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.filter(move |t| ready(f(t))) } #[inline] fn ready_filter_map(self, f: F) -> FilterMap>, impl FnMut(Item) -> Ready>> where F: Fn(Item) -> Option, { self.filter_map(move |t| ready(f(t))) } #[inline] fn ready_fold(self, init: T, f: F) -> Fold, T, impl FnMut(T, Item) -> Ready> where F: Fn(T, Item) -> T, { self.fold(init, move |a, t| ready(f(a, t))) } #[inline] #[allow(clippy::unit_arg)] fn ready_for_each(self, mut f: F) -> ForEach, impl FnMut(Item) -> Ready<()>> where F: FnMut(Item), { self.for_each(move |t| ready(f(t))) } #[inline] fn ready_take_while<'a, F>(self, f: F) -> TakeWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.take_while(move |t| ready(f(t))) } #[inline] fn ready_scan( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, Item) -> Option, { self.scan(init, move |s, t| ready(f(s, t))) } fn ready_scan_each( self, init: T, f: F, ) -> Scan>, impl FnMut(&mut T, Item) -> Ready>> where F: Fn(&mut T, &Item), { self.ready_scan(init, move |s, t| { f(s, &t); Some(t) }) } #[inline] fn ready_skip_while<'a, F>(self, f: F) -> SkipWhile, impl FnMut(&Item) -> Ready + 'a> where F: Fn(&Item) -> bool + 'a, { self.skip_while(move |t| ready(f(t))) } }