diff --git a/Cargo.lock b/Cargo.lock index feafeec..0ab4f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,6 +517,14 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "future_stream" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + [[package]] name = "futures" version = "0.3.30" diff --git a/Cargo.toml b/Cargo.toml index f682855..d596c26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "members/tokryon", "members/tokio-send-sync", "members/matchmatchmatch", + "members/future_stream", ] default-members = [ ".", @@ -28,6 +29,7 @@ default-members = [ "members/tokryon", "members/tokio-send-sync", "members/matchmatchmatch", + "members/future_stream", ] [workspace.dependencies] @@ -42,6 +44,7 @@ tokio = { version = "1.35.1", features = [ "time", "sync", ] } +futures = { version = "0.3.30", features = ["executor"] } [package] name = "rs-basic" diff --git a/members/future_stream/Cargo.toml b/members/future_stream/Cargo.toml new file mode 100644 index 0000000..67f1383 --- /dev/null +++ b/members/future_stream/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "future_stream" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio.workspace = true +futures.workspace = true diff --git a/members/future_stream/src/main.rs b/members/future_stream/src/main.rs new file mode 100644 index 0000000..3d313a2 --- /dev/null +++ b/members/future_stream/src/main.rs @@ -0,0 +1,34 @@ +// see https://stackoverflow.com/questions/70774671/tokioselect-but-for-a-vec-of-futures +use futures::{stream::FuturesUnordered, StreamExt}; +use std::time::Duration; +use tokio::time::{sleep, Instant}; + +async fn wait(millis: u64) -> u64 { + sleep(Duration::from_millis(millis)).await; + millis +} + +#[tokio::main] +async fn main() { + let mut futures = FuturesUnordered::new(); + futures.push(wait(500)); + futures.push(wait(300)); + futures.push(wait(100)); + futures.push(wait(200)); + + let start_time = Instant::now(); + + let mut num_added = 0; + while let Some(wait_time) = futures.next().await { + println!("Waited {}ms", wait_time); + if num_added < 3 { + num_added += 1; + futures.push(wait(200)); + } + } + + println!( + "Completed all work in {}ms", + start_time.elapsed().as_millis() + ); +}