Skip to content

Instantly share code, notes, and snippets.

@fancellu
Last active March 2, 2024 12:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fancellu/2d208b31b857aab89afd154e49518654 to your computer and use it in GitHub Desktop.
Save fancellu/2d208b31b857aab89afd154e49518654 to your computer and use it in GitHub Desktop.
Rust tokio barrier demo
use std::sync::Arc;
use tokio::sync::Barrier;
use tokio::sync::BarrierWaitResult;
use tokio::sync::Notify;
use tokio::time::sleep;
use tokio::time::Duration;
// Simulating the filling of boxes with 5 cans, from many tasks
async fn barrier_wait(barrier: Arc<Barrier>, notify: Arc<Notify>, id: usize) -> BarrierWaitResult {
println!("barrier wait {}", id);
// This will wait until at least total_needed cans are waiting on this barrier
let wait_result = barrier.wait().await;
// Note notify_one leader will be first to say this
println!("barrier passed through {}", id);
if wait_result.is_leader() {
println!("\tnotify_one {}", id);
// This lets the main loop continue onwards
notify.notify_one();
}
wait_result
}
#[tokio::main]
async fn main() {
// 5 cans in a box
let total_needed = 5;
// Needs to be multiple of total_needed
// Else we will end up waiting forever for last box to fill!
let total_cans = total_needed * 4;
let barrier = Arc::new(Barrier::new(total_needed));
let notify = Arc::new(Notify::new());
// To send off first batch
// Without this we will throttle when id==0, and never get to complete
// As we don't have any tasks yet!
notify.notify_one();
let mut tasks = Vec::new();
for id in 0..total_cans {
// Every total_needed cans we wait to be notified via notify_one
// This is how we throttle
if id % total_needed == 0 {
println!("waiting for notify, throttling");
notify.notified().await;
println!("got notify");
}
let barrier = barrier.clone();
let notify = notify.clone();
sleep(Duration::from_millis(100)).await;
tasks.push(tokio::spawn(async move {
barrier_wait(barrier, notify, id).await
}));
}
println!("waiting for all {} tasks to end", tasks.len());
// wait for all tasks to complete
for task in tasks {
task.await.unwrap();
}
}
@fancellu
Copy link
Author

fancellu commented Feb 26, 2024

waiting for notify, throttling
got notify
barrier wait 0
barrier wait 1
barrier wait 2
barrier wait 3
waiting for notify, throttling
barrier wait 4
barrier passed through 4
notify_one 4
barrier passed through 2
barrier passed through 0
barrier passed through 1
barrier passed through 3
got notify
barrier wait 5
barrier wait 6
barrier wait 7
barrier wait 8
waiting for notify, throttling
barrier wait 9
barrier passed through 9
notify_one 9
barrier passed through 8
barrier passed through 7
barrier passed through 5
barrier passed through 6
got notify
barrier wait 10
barrier wait 11
barrier wait 12
barrier wait 13
waiting for notify, throttling
barrier wait 14
barrier passed through 14
notify_one 14
barrier passed through 12
barrier passed through 13
barrier passed through 11
barrier passed through 10
got notify
barrier wait 15
barrier wait 16
barrier wait 17
barrier wait 18
waiting for all 20 tasks to end
barrier wait 19
barrier passed through 19
notify_one 19
barrier passed through 16
barrier passed through 15
barrier passed through 17
barrier passed through 18

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment