
Taming concurrency in Rust whilst keeping footguns at bay
Running multiple futures concurrently is a task async Rust was quite literally built to excel at.
FuturesUnordered is a great runtime-agnostic primitive for this purpose.
/// main.rs
use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    let mut runner = FuturesUnordered::new();
    runner.extend((0..10).map(task));
    // Poll the stream until every future has completed.
    while runner.next().await.is_some() {}
    println!("Elapsed: {}ms", start.elapsed().as_millis());
}

async fn task(i: usize) {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("{}", i);
}
/// 7
/// 0
/// 4
/// 2
/// 6
/// 8
/// 9
/// 1
/// 3
/// 5
/// Elapsed: 101ms
A common use case arises where the resource being used in parallel has limited availability, such as external APIs, database connections and event buses. In these cases, it’s beneficial to provide an upper limit to concurrency.
This article iteratively builds out the basics of a reusable concurrency throttler that can share limits app-wide. Along the way, it builds intuition for async Rust and covers some common footguns that arise when configuring throttlers.
To start, we can implement a simple concurrency limit of 3 using FuturesUnordered directly.
let mut runner = FuturesUnordered::new();
let mut results_buf = vec![];
for i in 0..10 {
    runner.push(task(i));
    // Limit reached: drain everything in flight before pushing more.
    if runner.len() >= 3 {
        while let Some(result) = runner.next().await {
            results_buf.push(result);
        }
    }
}
// Drain whatever is left after the final push.
while let Some(result) = runner.next().await {
    results_buf.push(result);
}
/// 0
/// 2
/// 1
/// 5
/// 3
/// 4
/// 8
/// 6
/// 7
/// 9
/// Elapsed: 408ms
While this works, it’s somewhat verbose and brittle for reuse.
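The 408ms figure follows from how the loop above behaves: once three futures are in flight, the inner while loop drains all of them before anything new is pushed, so the work proceeds in batches of three. The following is a minimal timing model, not a benchmark. The function names are hypothetical and real scheduling adds some overhead. It compares that batching with a sliding window that refills a slot as soon as any future finishes:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Total time when futures start in groups of `limit` and each group is
/// fully drained before the next one starts (the shape of the loop above).
pub fn batched_ms(durations: &[u64], limit: usize) -> u64 {
    assert!(limit > 0, "limit must be at least 1");
    durations
        .chunks(limit)
        .map(|batch| batch.iter().copied().max().unwrap_or(0))
        .sum()
}

/// Total time when a new future starts as soon as any slot frees up.
pub fn sliding_ms(durations: &[u64], limit: usize) -> u64 {
    assert!(limit > 0, "limit must be at least 1");
    // Min-heap of the times at which each busy slot becomes free.
    let mut slots: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
    let mut total: u64 = 0;
    for &d in durations {
        // Start at time 0 while slots are unused, otherwise when the earliest one frees.
        let start = if slots.len() < limit {
            0
        } else {
            slots.pop().map(|Reverse(t)| t).unwrap_or(0)
        };
        let finish = start + d;
        total = total.max(finish);
        slots.push(Reverse(finish));
    }
    total
}
```

With ten uniform 100ms tasks and a limit of 3, both functions return 400, in line with the output above. With uneven durations such as [300, 100, 100, 100, 100, 100], the batched model returns 400 while the sliding window returns 300, which is one reason to prefer a permit-based design.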
Let’s create a wrapping ThrottledFuturesUnordered type to abstract the behaviour.
We’ll:
- Internalise the underlying FuturesUnordered
- Set up a Semaphore with the configured max_concurrency instead of using the length of FuturesUnordered. Note this semaphore comes from tokio, but is under the sync feature that is wasm-compatible.
- Configure the push() method to acquire a semaphore permit and release one when a future is completed
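The permit idea in the list above can be sketched without any async runtime. This is a synchronous, standard-library-only analogue, not tokio's Semaphore: the Limiter and Permit names are hypothetical, and try_acquire returns immediately instead of waiting.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a semaphore: a shared count of free permits.
#[derive(Clone)]
pub struct Limiter {
    free: Arc<Mutex<usize>>,
}

/// Holding a `Permit` means one slot is in use; dropping it frees the slot.
pub struct Permit {
    free: Arc<Mutex<usize>>,
}

impl Limiter {
    pub fn new(max_concurrency: usize) -> Self {
        Self { free: Arc::new(Mutex::new(max_concurrency)) }
    }

    /// Non-blocking acquire: `None` means the limit has been reached.
    pub fn try_acquire(&self) -> Option<Permit> {
        let mut free = self.free.lock().unwrap();
        if *free == 0 {
            return None;
        }
        *free -= 1;
        Some(Permit { free: Arc::clone(&self.free) })
    }

    /// Number of permits currently available.
    pub fn available(&self) -> usize {
        *self.free.lock().unwrap()
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        // Releasing is automatic: the slot is returned when the permit goes away.
        *self.free.lock().unwrap() += 1;
    }
}
```

Tying the release to Drop is the same reason the wrapper below stores owned permits in a Vec and pops one whenever a future completes.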
/// throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct ThrottledFuturesUnordered<Fut, R> {
    stream: FuturesUnordered<Fut>,
    semaphore: Arc<Semaphore>,
    taken_permits: Vec<OwnedSemaphorePermit>,
    _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            stream: FuturesUnordered::new(),
            semaphore: Arc::new(Semaphore::new(max_concurrency)),
            taken_permits: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }

    pub async fn push(&mut self, fut: Fut) {
        let permit = self
            .semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("Sem closed");
        self.taken_permits.push(permit);
        self.stream.push(fut);
    }

    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for fut in iter {
            self.push(fut).await;
        }
    }

    pub async fn next(&mut self) -> Option<R> {
        let result = self.stream.next().await;
        let _ = self.taken_permits.pop();
        result
    }
}
Let’s try running it.
let mut runner = ThrottledFuturesUnordered::new(3);
runner.extend((0..10).map(task)).await;
while runner.next().await.is_some() {}
/// deadlock
We encounter our first footgun: FuturesUnordered is single-threaded and does no work on its own. It must be manually driven (polled, for example via next()) for it to keep processing the futures currently held in the stream. If we try to push() more than the maximum before calling next(), we’ll encounter a deadlock.
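The underlying reason is that Rust futures are lazy. This is a small standard-library-only sketch that polls a future by hand; NoopWake and the function name are hypothetical helpers, and no runtime is involved. It shows that a future's body does not run until something polls it:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the future's body had run (before polling, after polling).
pub fn ran_before_and_after_poll() -> (bool, bool) {
    let ran = Rc::new(Cell::new(false));
    let flag = ran.clone();

    // Creating the future runs none of its body.
    let mut fut = pin!(async move { flag.set(true) });
    let before = ran.get();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Only an explicit poll makes progress.
    let _ = fut.as_mut().poll(&mut cx);
    (before, ran.get())
}
```

The futures held inside the wrapper are in the same position: while push() is suspended waiting for a permit, nothing polls the stream, so no held future can finish and release one.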
To fix the issue, we need to drive the stream whilst waiting for a permit, allowing permits to be released whilst waiting to acquire one. This is similar to throttling FuturesUnordered directly by “driving” with next() and adding results to a results_buf upon reaching the limit before pushing any more futures.
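The idea of polling two things side by side can be modelled with the standard library alone. In this sketch, which is an illustration rather than the article's implementation, a plain counter stands in for the semaphore, each poll of the hypothetical drive future pretends one in-flight future completed and released its permit, and a hand-written loop plays the role of select:

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns how many rounds of polling were needed before a permit was acquired.
pub fn polls_until_permit() -> usize {
    // Zero free permits: everything is held by in-flight futures.
    let permits = Rc::new(Cell::new(0usize));

    // "acquire": ready only once a permit is free.
    let p = permits.clone();
    let mut acquire = pin!(poll_fn(move |_cx| {
        if p.get() > 0 {
            p.set(p.get() - 1);
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }));

    // "drive": each poll releases one permit (a completed future); never finishes.
    let p = permits.clone();
    let mut drive = pin!(poll_fn(move |_cx| {
        p.set(p.get() + 1);
        Poll::<()>::Pending
    }));

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        // Poll both sides each round, as a select would.
        if acquire.as_mut().poll(&mut cx).is_ready() {
            return polls;
        }
        let _ = drive.as_mut().poll(&mut cx);
    }
}
```

In this model the permit is acquired on the second round. If the loop polled only the acquire side, it would never return, which mirrors the deadlock described earlier.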
/// throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct ThrottledFuturesUnordered<Fut, R> {
    pub(super) stream: FuturesUnordered<Fut>,
    pub(super) semaphore: Arc<Semaphore>,
    pub(super) taken_permits: Vec<OwnedSemaphorePermit>,
    pub(super) result_buf: Vec<R>,
    pub(super) _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            stream: FuturesUnordered::new(),
            semaphore: Arc::new(Semaphore::new(max_concurrency)),
            taken_permits: Vec::new(),
            result_buf: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }

    pub async fn push(&mut self, fut: Fut) {
        let sem = self.semaphore.clone();
        let permit = futures::select!(
            permit = sem.acquire_owned().fuse() => permit,
            _ = self.drive().fuse() => unreachable!()
        )
        .expect("Sem closed");
        self.taken_permits.push(permit);
        self.stream.push(fut);
    }

    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for fut in iter {
            self.push(fut).await;
        }
    }

    pub async fn next(&mut self) -> Option<R> {
        if let Some(result) = self.result_buf.pop() {
            Some(result)
        } else {
            let result = self.stream.next().await;
            let _ = self.taken_permits.pop();
            result
        }
    }

    async fn drive(&mut self) {
        while let Some(result) = self.stream.next().await {
            self.result_buf.push(result);
            let _ = self.taken_permits.pop();
        }
        // The name is somewhat confusing, but this call will sleep forever:
        futures::future::pending().await
    }
}
/// 0
/// 1
/// 2
/// 4
/// 3
/// 5
/// 6
/// 7
/// 8
/// 9
/// Elapsed: 408ms
We now have a simple replacement for FuturesUnordered that internally restricts concurrency.
This abstraction allows us to think about and extend the throttler with more complex processing rules such as:
- Throughput throttling. E.g. initiating no more than one future every 30ms
- Prioritization, where higher priority futures are processed before lower priority ones
- Optional output ordering
- Multiple different futures using multiple internal FuturesUnordered instances to share limits across an entire application
We’ll focus on sharing limits across different futures and throughout an application.
The other potential constraints to add are left as an exercise for the reader. Be warned, additional constraints such as prioritization introduce new deadlock opportunities that will need consideration.
To allow cloning our throttler, we need a higher-order throttler that contains our Semaphore limiter and is agnostic to the specific Future being run.
/// shared_throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::stream::FuturesUnordered;
use tokio::sync::Semaphore;

use crate::throttled_futures_unordered::ThrottledFuturesUnordered;

#[derive(Clone)]
pub struct SharedThrottledFuturesUnordered {
    semaphore: Arc<Semaphore>,
}

impl SharedThrottledFuturesUnordered {
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrency)),
        }
    }

    pub async fn run<R, Fut: Future<Output = R>>(&self, fut: Fut) -> R {
        let mut runner = self.throttler();
        runner.push(fut).await;
        runner.next().await.unwrap()
    }

    pub fn throttler<R, Fut: Future<Output = R>>(&self) -> ThrottledFuturesUnordered<Fut, R> {
        ThrottledFuturesUnordered {
            stream: FuturesUnordered::new(),
            semaphore: self.semaphore.clone(),
            taken_permits: Vec::new(),
            result_buf: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }
}

let shared = SharedThrottledFuturesUnordered::new(3);
let _ = futures::future::join_all((0..10).map(|i| {
    let mut throttler = shared.throttler();
    async move {
        throttler.push(task(i)).await;
        throttler.next().await.unwrap();
    }
}))
.await;
/// 0
/// 1
/// 2
/// 3
/// 4
/// 5
/// 6
/// 7
/// 8
/// 9
/// Elapsed: 407ms
The example is contrived, but shows that this new abstraction allows one throttler limit to be cloned and used app-wide, with varying Future types.
However, with the new abstraction we’ve introduced a new footgun: nested usage of the throttler. E.g. a throttled API function might call other throttled API functions internally.
let shared = SharedThrottledFuturesUnordered::new(3);
let _ = futures::future::join_all((0..5).map({
    move |i1| {
        let shared = shared.clone();
        async move {
            shared
                .run({
                    let shared = shared.clone();
                    async move {
                        for i2 in 0..2 {
                            shared.run(task(i1 * 2 + i2)).await;
                        }
                    }
                })
                .await
        }
    }
}))
.await;
/// 0
/// 2
/// deadlock
This scenario can deadlock because the outer futures end up holding every permit, while the inner calls they are waiting on cannot continue without obtaining a permit of their own.
To solve this we want to allow inner usage to “steal” their calling future’s permit. The parent’s permit should be treated as one “pre-acquired” permit for the duration of the future, preventing the need to take a second permit unnecessarily.
We can do this with tokio::task_local!() and a custom future type that wraps the future being run.
The code becomes a bit more verbose but provides a robust solution:
- Create a task local that provides access to an Arc containing the parent’s ParentPermitScope, which contains a Mutex<Option<PermitSpecialDrop>> holding the parent’s permit.
- Create a custom FutWrapper<Fut> that also contains this ParentPermitScope. Whenever this future is polled, it will inject its ParentPermitScope into the task local for the duration of the poll.
- If the child future needs to acquire a permit, it first checks the task local and tries to steal the permit before attempting to get one from the general pool.
- The PermitSpecialDrop is a custom type that wraps the semaphore permit, but implements a custom drop that will “replenish” the ParentPermitScope if it currently contains None, therefore returning the permit to the parent once the future is complete and this PermitSpecialDrop is dropped.
- Because a sibling child future may already be awaiting a permit when a child future returns the permit to the parent, a waiter channel is set up to instantly notify sibling waiters that the parent permit is available again, without the need for manual sleeps or polling.
- Because we might have multiple distinct shared throttlers and only one task-local variable, we’ll assign a unique id to each shared throttler, and keep a hashmap in the task local from id to ParentPermitScope, to prevent cross-contamination.
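Before the full code, the steal-and-replenish cycle from the list above can be seen in miniature. The sketch below is a synchronous analogue under stated assumptions: a std thread_local! slot stands in for the tokio task local, a plain counter stands in for the semaphore, and Pool, Permit, StolenPermit and nested_demo are hypothetical names. It leaves out the waiter channel and the per-throttler id map.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// Toy permit pool (hypothetical): a shared count of free permits.
#[derive(Clone)]
pub struct Pool {
    free: Rc<Cell<usize>>,
}

/// A permit taken from the pool; dropping it returns it to the pool.
pub struct Permit {
    pool: Pool,
}

impl Pool {
    pub fn new(permits: usize) -> Self {
        Self { free: Rc::new(Cell::new(permits)) }
    }

    pub fn try_acquire(&self) -> Option<Permit> {
        let free = self.free.get();
        if free == 0 {
            return None;
        }
        self.free.set(free - 1);
        Some(Permit { pool: self.clone() })
    }

    pub fn free(&self) -> usize {
        self.free.get()
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.pool.free.set(self.pool.free.get() + 1);
    }
}

thread_local! {
    // Stand-in for the task local: the slot holding the parent's permit.
    static PARENT_PERMIT: RefCell<Option<Permit>> = RefCell::new(None);
}

/// A permit stolen from the parent slot; dropping it puts it back in the slot.
pub struct StolenPermit(Option<Permit>);

impl Drop for StolenPermit {
    fn drop(&mut self) {
        if let Some(permit) = self.0.take() {
            PARENT_PERMIT.with(|slot| *slot.borrow_mut() = Some(permit));
        }
    }
}

/// Child-side first step: try to steal the parent's permit from the slot.
pub fn steal_parent_permit() -> Option<StolenPermit> {
    PARENT_PERMIT
        .with(|slot| slot.borrow_mut().take())
        .map(|permit| StolenPermit(Some(permit)))
}

/// A parent holds the only permit while a child needs one. Returns
/// (pool acquire succeeded, steal succeeded, permit back with parent, free permits at end).
pub fn nested_demo() -> (bool, bool, bool, usize) {
    let pool = Pool::new(1);
    let parent = pool.try_acquire().expect("one permit is free");
    PARENT_PERMIT.with(|slot| *slot.borrow_mut() = Some(parent));

    // Going to the pool fails: the parent holds the only permit.
    let from_pool = pool.try_acquire().is_some();

    // Stealing succeeds, and dropping the stolen permit replenishes the parent.
    let stolen = steal_parent_permit();
    let stole = stolen.is_some();
    drop(stolen);
    let replenished = PARENT_PERMIT.with(|slot| slot.borrow().is_some());

    // Parent finishes: its permit leaves the slot and returns to the pool.
    drop(PARENT_PERMIT.with(|slot| slot.borrow_mut().take()));
    (from_pool, stole, replenished, pool.free())
}
```

The async version needs the extra machinery because several children of one parent can be waiting at once, and because the slot must be visible only while the parent's own future is being polled.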
After implementing this system, our previous nested example that deadlocked now successfully steals and replenishes the permit from its parent!
The full implementation follows, one file per section.
permit_scope.rs
/// permit_scope.rs
use std::sync::Arc;
use std::collections::{HashMap, VecDeque};

use tokio::sync::{oneshot, OwnedSemaphorePermit};

tokio::task_local! {
    pub(super) static PARENT_PERMIT_SCOPE_LOOKUP: Arc<HashMap<u64, Arc<ParentPermitScope>>>;
}

pub(super) fn with_parent_permit_scope<T>(
    runner_id: &u64,
    f: impl FnOnce(&Arc<ParentPermitScope>) -> T,
) -> Option<T> {
    PARENT_PERMIT_SCOPE_LOOKUP
        .try_with(|parent_scope_lookup| parent_scope_lookup.get(runner_id).map(f))
        .ok()
        .flatten()
}

#[derive(Debug)]
pub(super) struct ParentPermitScope {
    maybe_stored_parent_permit: parking_lot::Mutex<Option<PermitSpecialDrop>>,
    permit_waiters: parking_lot::Mutex<VecDeque<oneshot::Sender<PermitSpecialDrop>>>,
}

impl ParentPermitScope {
    pub fn new(maybe_stored_parent_permit: parking_lot::Mutex<Option<PermitSpecialDrop>>) -> Self {
        Self {
            maybe_stored_parent_permit,
            permit_waiters: parking_lot::Mutex::new(VecDeque::new()),
        }
    }

    pub fn try_get_parent_permit(&self) -> Option<PermitSpecialDrop> {
        self.maybe_stored_parent_permit.lock().take()
    }

    pub async fn wait_for_parent_permit(&self) -> PermitSpecialDrop {
        let (tx, rx) = oneshot::channel();
        self.permit_waiters.lock().push_back(tx);
        rx.await
            .expect("Sender dropped before parent permit was sent")
    }
}

#[derive(Debug)]
pub(super) struct PermitSpecialDrop {
    permit: Option<OwnedSemaphorePermit>,
    runner_id: u64,
}

impl PermitSpecialDrop {
    pub fn new(runner_id: u64, permit: OwnedSemaphorePermit) -> Self {
        Self {
            permit: Some(permit),
            runner_id,
        }
    }
}

impl Drop for PermitSpecialDrop {
    fn drop(&mut self) {
        if let Some(permit) = self.permit.take() {
            with_parent_permit_scope(&self.runner_id, |parent_scope| {
                // Place the permit back in the parent if it was missing, otherwise drop.
                let mut permit = PermitSpecialDrop::new(self.runner_id, permit);
                // First see if we can send to a waiter instead of storing or dropping:
                let permit_waiters = &mut parent_scope.permit_waiters.lock();
                while let Some(waiter) = permit_waiters.pop_front() {
                    match waiter.send(permit) {
                        Ok(()) => return,
                        Err(returned_permit) => permit = returned_permit,
                    }
                }
                // Otherwise, store it for later.
                let mut guard = parent_scope.maybe_stored_parent_permit.lock();
                if guard.is_none() {
                    *guard = Some(permit);
                    return;
                }
                // If we couldn't store it, drop it.
                // We have to first take the inner semaphore permit out, so our drop implementation doesn't recurse.
                permit.permit.take();
            });
        }
    }
}
fut_wrapper.rs
/// fut_wrapper.rs
use std::{collections::HashMap, future::Future, sync::Arc};

use crate::permit_scope::{ParentPermitScope, PARENT_PERMIT_SCOPE_LOOKUP};

pin_project_lite::pin_project! {
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[project = FutWrapperProj]
    pub enum FutWrapper<T> {
        InProgress {
            #[pin]
            future: T,
            runner_id: u64,
            permit_scope_lookup: Option<Arc<HashMap<u64, Arc<ParentPermitScope>>>>,
        },
        Finished {
            output: T
        }
    }
}

impl<T> Future for FutWrapper<T>
where
    T: Future,
{
    type Output = FutWrapper<T::Output>;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut futures::task::Context<'_>,
    ) -> futures::task::Poll<Self::Output> {
        match self.project() {
            FutWrapperProj::InProgress {
                future,
                permit_scope_lookup,
                runner_id: _runner_id,
                ..
            } => {
                let result = if let Some(scope_lookup) = &permit_scope_lookup {
                    // Provide the scope to the future for the duration of the poll:
                    PARENT_PERMIT_SCOPE_LOOKUP.sync_scope(scope_lookup.clone(), || future.poll(cx))
                } else {
                    future.poll(cx)
                };
                match result {
                    std::task::Poll::Ready(output) => {
                        std::task::Poll::Ready(FutWrapper::Finished { output })
                    }
                    std::task::Poll::Pending => std::task::Poll::Pending,
                }
            }
            FutWrapperProj::Finished { .. } => unreachable!(),
        }
    }
}
throttled_futures_unordered.rs
/// throttled_futures_unordered.rs
use std::{collections::HashMap, future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::Semaphore;

use crate::{
    fut_wrapper::FutWrapper,
    permit_scope::{
        with_parent_permit_scope, ParentPermitScope, PermitSpecialDrop, PARENT_PERMIT_SCOPE_LOOKUP,
    },
};

pub struct ThrottledFuturesUnordered<Fut, R> {
    pub(super) stream: FuturesUnordered<FutWrapper<Fut>>,
    pub(super) semaphore: Arc<Semaphore>,
    pub(super) result_buf: Vec<R>,
    pub(super) runner_id: u64,
    pub(super) _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    pub async fn push(&mut self, future: Fut) {
        let sem = self.semaphore.clone();

        enum ParentPermitOrFut<Fut: Future<Output = PermitSpecialDrop>> {
            Permit(PermitSpecialDrop),
            Fut(Fut),
        }

        let parent_permit_fut = match with_parent_permit_scope(&self.runner_id, |scope| {
            let scope = scope.clone();
            scope
                .try_get_parent_permit()
                .map(ParentPermitOrFut::Permit)
                .unwrap_or_else(move || {
                    ParentPermitOrFut::Fut(async move { scope.wait_for_parent_permit().await })
                })
        }) {
            Some(ParentPermitOrFut::Permit(permit)) => Some(async move { permit }.left_future()),
            Some(ParentPermitOrFut::Fut(parent_permit_fut)) => {
                Some(parent_permit_fut.right_future())
            }
            None => None,
        };

        let permit = futures::select_biased!(
            parent_permit = async move {
                if let Some(fut) = parent_permit_fut {
                    fut.await
                } else {
                    futures::future::pending().await
                }
            }.fuse() => parent_permit,
            permit = sem.acquire_owned().fuse() => PermitSpecialDrop::new(self.runner_id, permit.expect("Sem closed")),
            _ = self.drive().fuse() => unreachable!()
        );

        self.stream.push(FutWrapper::InProgress {
            runner_id: self.runner_id,
            future,
            permit_scope_lookup: Some(Arc::new({
                let mut map = PARENT_PERMIT_SCOPE_LOOKUP
                    .try_with(|lookup| {
                        lookup
                            .iter()
                            .map(|(k, v)| (*k, v.clone()))
                            .collect::<HashMap<_, _>>()
                    })
                    .unwrap_or_default();
                map.insert(
                    self.runner_id,
                    Arc::new(ParentPermitScope::new(parking_lot::Mutex::new(Some(
                        permit,
                    )))),
                );
                map
            })),
        });
    }

    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for fut in iter {
            self.push(fut).await;
        }
    }

    pub async fn next(&mut self) -> Option<R> {
        if let Some(result) = self.result_buf.pop() {
            Some(result)
        } else {
            let result = self.stream.next().await;
            if let Some(FutWrapper::Finished { output }) = result {
                Some(output)
            } else {
                None
            }
        }
    }

    async fn drive(&mut self) {
        while let Some(FutWrapper::Finished { output }) = self.stream.next().await {
            self.result_buf.push(output);
        }
        // This will sleep forever:
        futures::future::pending().await
    }
}
shared_throttled_futures_unordered.rs
/// shared_throttled_futures_unordered.rs
use std::{
    future::Future,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use futures::stream::FuturesUnordered;
use tokio::sync::Semaphore;

use crate::throttled_futures_unordered::ThrottledFuturesUnordered;

#[derive(Clone)]
pub struct SharedThrottledFuturesUnordered {
    semaphore: Arc<Semaphore>,
    runner_id: u64,
}

impl SharedThrottledFuturesUnordered {
    pub fn new(max_concurrency: usize) -> Self {
        static RUNNER_ID_GENERATOR: AtomicU64 = AtomicU64::new(0);
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrency)),
            runner_id: RUNNER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
        }
    }

    pub async fn run<R, Fut: Future<Output = R>>(&self, fut: Fut) -> R {
        let mut runner = self.throttler();
        runner.push(fut).await;
        runner.next().await.unwrap()
    }

    pub fn throttler<R, Fut: Future<Output = R>>(&self) -> ThrottledFuturesUnordered<Fut, R> {
        ThrottledFuturesUnordered {
            stream: FuturesUnordered::new(),
            semaphore: self.semaphore.clone(),
            result_buf: Vec::new(),
            runner_id: self.runner_id,
            _r: std::marker::PhantomData,
        }
    }
}
Cargo dependencies
[dependencies]
pin-project-lite = "0.2"
parking_lot = "0.12"
futures = "0.3"
tokio = { version = "1", features = ["sync", "rt"] } # both these features are wasm-compatible
This implementation provides a basic throttled, shareable runner that covers the most significant footguns introduced when abstracting FuturesUnordered, and can be extended with further configuration and constraints whilst providing valuable insight into async Rust.