ForgeStream

Taming concurrency in Rust whilst keeping footguns at bay


Running multiple futures concurrently is a task async Rust was quite literally built to excel at.

FuturesUnordered is a great runtime-agnostic primitive for this purpose.

/// main.rs
use futures::{stream::FuturesUnordered, StreamExt};

/// Runs ten `task` futures concurrently on one `FuturesUnordered` and prints
/// how long the whole batch took, matching the `Elapsed: …ms` line shown in
/// the sample output.
#[tokio::main]
async fn main() {
    // Taken before any future is created so the figure covers the whole run.
    let started = std::time::Instant::now();

    let mut runner = FuturesUnordered::new();
    runner.extend((0..10).map(task));
    // The stream only makes progress while it is polled, so drain it fully.
    while runner.next().await.is_some() {}

    println!("Elapsed: {}ms", started.elapsed().as_millis());
}

/// Sleeps for 100 ms and then prints `i` on its own line.
async fn task(i: usize) {
    let pause = std::time::Duration::from_millis(100);
    tokio::time::sleep(pause).await;
    println!("{}", i);
}

/// 7
/// 0
/// 4
/// 2
/// 6
/// 8
/// 9
/// 1
/// 3
/// 5
/// Elapsed: 101ms

A common use case arises where the resource being used in parallel has limited availability, such as external APIs, database connections and event buses. In these cases, it’s beneficial to provide an upper limit to concurrency.

This article iteratively builds out the basics of a reusable concurrency throttler that can share limits app-wide. Along the way, it provides useful intuition and deeper understanding of async Rust and some common footguns that arise when configuring throttlers.

To start, we can implement a simple concurrency limit of 3 using FuturesUnordered directly.

let mut runner = FuturesUnordered::new();
let mut results_buf = Vec::new();
for i in 0..10 {
    runner.push(task(i));
    // Keep queueing until three futures are in flight.
    if runner.len() < 3 {
        continue;
    }
    // Limit reached: run the whole batch to completion before queueing more.
    while let Some(done) = runner.next().await {
        results_buf.push(done);
    }
}
// Finish whatever is left over from the last, possibly partial, batch.
while let Some(done) = runner.next().await {
    results_buf.push(done);
}

/// 0
/// 2
/// 1
/// 5
/// 3
/// 4
/// 8
/// 6
/// 7
/// 9
/// Elapsed: 408ms

While this works, it’s somewhat verbose and brittle for reuse.

Let’s create a wrapping ThrottledFuturesUnordered type to abstract the behaviour.

We’ll:

/// throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// A `FuturesUnordered` wrapper that takes one semaphore permit per pushed future.
pub struct ThrottledFuturesUnordered<Fut, R> {
    /// The futures currently held by the throttler.
    stream: FuturesUnordered<Fut>,
    /// Created in `new` with `max_concurrency` permits.
    semaphore: Arc<Semaphore>,
    /// Permits acquired by `push`; `next` pops one each time it is called.
    taken_permits: Vec<OwnedSemaphorePermit>,
    /// Ties the `R` type parameter to the struct without storing a value.
    _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            stream: FuturesUnordered::new(),
            semaphore: Arc::new(Semaphore::new(max_concurrency)),
            taken_permits: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }

    pub async fn push(&mut self, fut: Fut) {
        let permit = self
            .semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("Sem closed");

        self.taken_permits.push(permit);
        self.stream.push(fut);
    }

    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for fut in iter {
            self.push(fut).await;
        }
    }

    pub async fn next(&mut self) -> Option<R> {
        let result = self.stream.next().await;
        let _ = self.taken_permits.pop();
        result
    }
}

Let’s try running it.

// Three permits, ten futures: `extend` awaits a permit for every future it
// pushes and nothing polls the stream meanwhile, so the fourth push never
// completes and the `while` loop below is never reached.
let mut runner = ThrottledFuturesUnordered::new(3);
runner.extend((0..10).map(task)).await;
while runner.next().await.is_some() {}

/// deadlock

We encounter our first footgun: FuturesUnordered is single-threaded. It must be manually driven for it to keep processing the futures currently held in the stream. If we try to push() more than the maximum before calling next(), we’ll encounter a deadlock.

To fix the issue, we need to drive the stream whilst waiting for a permit, allowing permits to be released whilst waiting to acquire one. This is similar to throttling FuturesUnordered directly by “driving” with next() and adding results to a results_buf upon reaching the limit before pushing any more futures.

/// throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// A `FuturesUnordered` wrapper that limits concurrency with a semaphore and
/// keeps driving its queued futures while `push` waits for a permit.
pub struct ThrottledFuturesUnordered<Fut, R> {
    /// The futures currently held by the throttler.
    pub(super) stream: FuturesUnordered<Fut>,
    /// Source of permits; one is acquired for every pushed future.
    pub(super) semaphore: Arc<Semaphore>,
    /// Permits acquired by `push`; one is popped for every completed future.
    pub(super) taken_permits: Vec<OwnedSemaphorePermit>,
    /// Outputs that completed while `push` was driving the stream; handed out by `next`.
    pub(super) result_buf: Vec<R>,
    /// Marker for the `R` type parameter.
    pub(super) _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    /// Creates an empty throttler whose semaphore starts with `max_concurrency` permits.
    pub fn new(max_concurrency: usize) -> Self {
        let semaphore = Arc::new(Semaphore::new(max_concurrency));
        Self {
            stream: FuturesUnordered::new(),
            semaphore,
            taken_permits: Vec::new(),
            result_buf: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }

    /// Waits for a permit, then queues `fut`.
    ///
    /// While waiting, the already-queued futures keep being polled through
    /// `drive`, so they can finish and hand their permits back.
    ///
    /// # Panics
    /// Panics with "Sem closed" if the semaphore has been closed.
    pub async fn push(&mut self, fut: Fut) {
        let semaphore = Arc::clone(&self.semaphore);

        // `drive` never resolves, so only the acquire branch can win the race.
        let acquired = futures::select!(
            granted = semaphore.acquire_owned().fuse() => granted,
            _ = self.drive().fuse() => unreachable!()
        );
        let permit = acquired.expect("Sem closed");

        self.taken_permits.push(permit);
        self.stream.push(fut);
    }

    /// Pushes every future from `iter`, one after another, via `push`.
    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for queued in iter {
            self.push(queued).await;
        }
    }

    /// Returns a buffered output if one exists, otherwise awaits the stream.
    ///
    /// Yields `None` once both the buffer and the stream are empty.
    pub async fn next(&mut self) -> Option<R> {
        if let Some(buffered) = self.result_buf.pop() {
            return Some(buffered);
        }

        let finished = self.stream.next().await;
        // Give back one permit for the future that just completed.
        drop(self.taken_permits.pop());
        finished
    }

    /// Polls the stream, buffering each output and dropping one permit per
    /// completion; once the stream is empty it waits forever.
    async fn drive(&mut self) {
        loop {
            match self.stream.next().await {
                Some(output) => {
                    self.result_buf.push(output);
                    drop(self.taken_permits.pop());
                }
                None => break,
            }
        }
        // Despite the name, awaiting `pending()` never completes:
        futures::future::pending().await
    }
}

/// 0
/// 1
/// 2
/// 4
/// 3
/// 5
/// 6
/// 7
/// 8
/// 9
/// Elapsed: 408ms

We now have a simple replacement for FuturesUnordered that internally restricts concurrency.

This abstraction allows us to think about and extend the throttler with more complex processing rules such as:

We’ll focus on sharing limits across different futures and throughout an application.

The other potential constraints to add are left as an exercise for the reader. Be warned, additional constraints such as prioritization introduce new deadlock opportunities that will need consideration.

To allow cloning our throttler, we need a higher order throttler that contains our Semaphore limiter and is agnostic to the specific Future being run.

/// shared_throttled_futures_unordered.rs
use std::{future::Future, sync::Arc};

use futures::stream::FuturesUnordered;
use tokio::sync::Semaphore;

use crate::throttled_futures_unordered::ThrottledFuturesUnordered;

/// A cloneable handle to one concurrency limit that is independent of the
/// future type being run; clones share the same semaphore.
#[derive(Clone)]
pub struct SharedThrottledFuturesUnordered {
    /// The shared limiter handed to every throttler created from this handle.
    semaphore: Arc<Semaphore>,
}

impl SharedThrottledFuturesUnordered {
    /// Creates a handle whose semaphore starts with `max_concurrency` permits.
    pub fn new(max_concurrency: usize) -> Self {
        let semaphore = Arc::new(Semaphore::new(max_concurrency));
        Self { semaphore }
    }

    /// Runs a single future under the shared limit and returns its output.
    pub async fn run<R, Fut: Future<Output = R>>(&self, fut: Fut) -> R {
        let mut single = self.throttler();
        single.push(fut).await;
        // Exactly one future was pushed, so exactly one output comes back.
        single.next().await.unwrap()
    }

    /// Builds an empty throttler for one future type, backed by the shared semaphore.
    pub fn throttler<R, Fut: Future<Output = R>>(&self) -> ThrottledFuturesUnordered<Fut, R> {
        let semaphore = Arc::clone(&self.semaphore);
        ThrottledFuturesUnordered {
            stream: FuturesUnordered::new(),
            semaphore,
            taken_permits: Vec::new(),
            result_buf: Vec::new(),
            _r: std::marker::PhantomData,
        }
    }
}
// One shared limit of three, used by ten independently created throttlers.
let shared = SharedThrottledFuturesUnordered::new(3);
let _ = futures::future::join_all((0..10).map(|i| {
    // Each future owns its own throttler, all backed by the same semaphore.
    let mut throttler = shared.throttler();
    async move {
        throttler.push(task(i)).await;
        throttler.next().await.unwrap();
    }
}))
.await;

/// 0
/// 1
/// 2
/// 3
/// 4
/// 5
/// 6
/// 7
/// 8
/// 9
/// Elapsed: 407ms

The example is contrived, but shows this new abstraction allows one throttler limit to be cloneable and used app-wide, with variable Future types.

However, with the new abstraction we’ve introduced a new footgun: nested usage of the throttler. E.g. a throttled API function might call other throttled API functions internally.

// Five outer throttled futures compete for three permits, and each one runs
// two more throttled futures on the same limit from inside itself.
let shared = SharedThrottledFuturesUnordered::new(3);
let _ = futures::future::join_all((0..5).map({
    move |i1| {
        let shared = shared.clone();
        async move {
            shared
                // Outer throttled call.
                .run({
                    let shared = shared.clone();
                    async move {
                        for i2 in 0..2 {
                            // Inner throttled call on the same shared limit.
                            shared.run(task(i1 * 2 + i2)).await;
                        }
                    }
                })
                .await
        }
    }
}))
.await;

/// 0
/// 2
/// deadlock

This scenario can deadlock because all outer futures start, but the inner usage cannot continue without obtaining permits held by the outer calls.

To solve this we want to allow inner usage to “steal” their calling future’s permit. The parent’s permit should be treated as one “pre-acquired” permit for the duration of the future, preventing the need to take a second permit unnecessarily.

We can do this with tokio::task_local!() and a custom future type that wraps the future being run.

The code becomes a bit more verbose but provides a robust solution:

After implementing this system, our previous nested example that deadlocked now successfully steals and replenishes the permit from its parent!

To see the full implementation, expand the sections below.

permit_scope.rs
/// permit_scope.rs
use std::sync::Arc;

use std::collections::{HashMap, VecDeque};

use tokio::sync::{oneshot, OwnedSemaphorePermit};

tokio::task_local! {
    // Maps a runner id to the permit scope of the throttled future currently
    // being polled. `FutWrapper::poll` sets this for the duration of each poll.
    pub(super) static PARENT_PERMIT_SCOPE_LOOKUP: Arc<HashMap<u64, Arc<ParentPermitScope>>>;
}

/// Calls `f` with the parent permit scope registered for `runner_id`.
///
/// Returns `None`, without calling `f`, when the task-local lookup is not set
/// or holds no entry for `runner_id`.
pub(super) fn with_parent_permit_scope<T>(
    runner_id: &u64,
    f: impl FnOnce(&Arc<ParentPermitScope>) -> T,
) -> Option<T> {
    match PARENT_PERMIT_SCOPE_LOOKUP.try_with(|lookup| lookup.get(runner_id).map(f)) {
        Ok(outcome) => outcome,
        Err(_) => None,
    }
}

/// Holds the permit of a running throttled future so that nested pushes on
/// the same runner can borrow it instead of acquiring a second one.
#[derive(Debug)]
pub(super) struct ParentPermitScope {
    /// The parent's permit while nobody has taken it; `None` while it is taken.
    maybe_stored_parent_permit: parking_lot::Mutex<Option<PermitSpecialDrop>>,
    /// Senders for nested pushes waiting for the permit, served front to back.
    permit_waiters: parking_lot::Mutex<VecDeque<oneshot::Sender<PermitSpecialDrop>>>,
}

impl ParentPermitScope {
    /// Creates a scope around the given permit slot, with no waiters queued.
    pub fn new(maybe_stored_parent_permit: parking_lot::Mutex<Option<PermitSpecialDrop>>) -> Self {
        let permit_waiters = parking_lot::Mutex::new(VecDeque::new());
        Self {
            maybe_stored_parent_permit,
            permit_waiters,
        }
    }

    /// Takes the stored parent permit if it is present, leaving the slot empty.
    pub fn try_get_parent_permit(&self) -> Option<PermitSpecialDrop> {
        let mut slot = self.maybe_stored_parent_permit.lock();
        slot.take()
    }

    /// Queues as a waiter and resolves once a permit is sent to this waiter.
    ///
    /// # Panics
    /// Panics if the sending half is dropped without a permit being sent.
    pub async fn wait_for_parent_permit(&self) -> PermitSpecialDrop {
        let (sender, receiver) = oneshot::channel();
        {
            // Scope the guard so the lock is released before awaiting.
            let mut queue = self.permit_waiters.lock();
            queue.push_back(sender);
        }
        receiver
            .await
            .expect("Sender dropped before parent permit was sent")
    }
}

/// A semaphore permit that, when dropped, first tries to return itself to the
/// parent permit scope of its runner.
#[derive(Debug)]
pub(super) struct PermitSpecialDrop {
    /// The wrapped permit; `None` once `drop` has moved it out.
    permit: Option<OwnedSemaphorePermit>,
    /// Id used to look up the matching parent permit scope on drop.
    runner_id: u64,
}

impl PermitSpecialDrop {
    /// Wraps `permit`, remembering which runner it belongs to.
    pub fn new(runner_id: u64, permit: OwnedSemaphorePermit) -> Self {
        let permit = Some(permit);
        Self { permit, runner_id }
    }
}

/// On drop, the permit is offered back to the parent scope of its runner: to
/// the first live waiter, else into the empty parent slot, else it is released.
impl Drop for PermitSpecialDrop {
    fn drop(&mut self) {
        // Nothing to do if the inner permit has already been moved out.
        if let Some(permit) = self.permit.take() {
            // The closure only runs when a scope is registered for this runner.
            // Otherwise it is dropped unused, together with the permit it owns.
            with_parent_permit_scope(&self.runner_id, |parent_scope| {
                // Place the permit back in the parent if it was missing, otherwise drop.
                let mut permit = PermitSpecialDrop::new(self.runner_id, permit);
                // First see if we can send to a waiter instead of storing or dropping:
                let permit_waiters = &mut parent_scope.permit_waiters.lock();
                while let Some(waiter) = permit_waiters.pop_front() {
                    match waiter.send(permit) {
                        Ok(()) => return,
                        // The waiter is gone; the send hands the permit back, so try the next.
                        Err(returned_permit) => permit = returned_permit,
                    }
                }

                // Otherwise, store it for later.
                let mut guard = parent_scope.maybe_stored_parent_permit.lock();
                if guard.is_none() {
                    *guard = Some(permit);
                    return;
                }

                // If we couldn't store it, drop it.
                // We first take the inner semaphore permit out, so that dropping the
                // now-empty wrapper does not run this logic again.
                permit.permit.take();
            });
        }
    }
}
fut_wrapper.rs
/// fut_wrapper.rs
use std::{collections::HashMap, future::Future, sync::Arc};

use crate::permit_scope::{ParentPermitScope, PARENT_PERMIT_SCOPE_LOOKUP};

pin_project_lite::pin_project! {
    // Wraps a throttled future. While in progress it carries the permit scope
    // lookup that is installed around each poll; once done it carries the output.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[project = FutWrapperProj]
    pub enum FutWrapper<T> {
        // A future that has not completed yet (`T` is the future type).
        InProgress {
            #[pin]
            future: T,
            // Id of the runner this future was pushed to.
            runner_id: u64,
            // Lookup installed as the task-local scope while `future` is polled.
            permit_scope_lookup: Option<Arc<HashMap<u64, Arc<ParentPermitScope>>>>,
        },
        // A completed result (`T` is the output type).
        Finished {
            output: T
        }
    }
}

/// Polls the wrapped future with its permit scope lookup installed, and
/// resolves to `FutWrapper::Finished` carrying the inner future's output.
impl<T> Future for FutWrapper<T>
where
    T: Future,
{
    type Output = FutWrapper<T::Output>;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut futures::task::Context<'_>,
    ) -> futures::task::Poll<Self::Output> {
        // Only an in-progress wrapper is ever polled.
        let (future, permit_scope_lookup) = match self.project() {
            FutWrapperProj::InProgress {
                future,
                permit_scope_lookup,
                ..
            } => (future, permit_scope_lookup),
            FutWrapperProj::Finished { .. } => unreachable!(),
        };

        let polled = match &*permit_scope_lookup {
            // Make the scope visible to the inner future for this poll only.
            Some(lookup) => {
                PARENT_PERMIT_SCOPE_LOOKUP.sync_scope(Arc::clone(lookup), || future.poll(cx))
            }
            None => future.poll(cx),
        };

        polled.map(|output| FutWrapper::Finished { output })
    }
}
throttled_futures_unordered.rs
/// throttled_futures_unordered.rs
use std::{collections::HashMap, future::Future, sync::Arc};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::sync::Semaphore;

use crate::{
    fut_wrapper::FutWrapper,
    permit_scope::{
        with_parent_permit_scope, ParentPermitScope, PermitSpecialDrop, PARENT_PERMIT_SCOPE_LOOKUP,
    },
};

/// A concurrency-limited `FuturesUnordered` whose pushes can reuse the permit
/// of an enclosing throttled future on the same runner.
pub struct ThrottledFuturesUnordered<Fut, R> {
    /// The queued futures, each wrapped together with its permit scope lookup.
    pub(super) stream: FuturesUnordered<FutWrapper<Fut>>,
    /// Source of permits when no parent permit is available.
    pub(super) semaphore: Arc<Semaphore>,
    /// Outputs that completed while `push` was driving the stream; handed out by `next`.
    pub(super) result_buf: Vec<R>,
    /// Key under which this throttler's permits are registered in the scope lookup.
    pub(super) runner_id: u64,
    /// Marker for the `R` type parameter.
    pub(super) _r: std::marker::PhantomData<R>,
}

impl<Fut, R> ThrottledFuturesUnordered<Fut, R>
where
    Fut: Future<Output = R>,
{
    /// Obtains a permit and queues `future`, wrapped so that the permit is
    /// visible to nested pushes on the same runner while the future is polled.
    ///
    /// The permit comes from the parent scope when one is registered for this
    /// runner, otherwise from the semaphore. Already-queued futures keep being
    /// driven while waiting.
    ///
    /// # Panics
    /// Panics with "Sem closed" if the semaphore has been closed.
    pub async fn push(&mut self, future: Fut) {
        let sem = self.semaphore.clone();

        // Either the parent's permit was available right away, or a future
        // that resolves once a permit is handed back to the parent scope.
        enum ParentPermitOrFut<Fut: Future<Output = PermitSpecialDrop>> {
            Permit(PermitSpecialDrop),
            Fut(Fut),
        }

        // `None` when no parent scope is registered for this runner, i.e.
        // this push is not nested inside a throttled future of the same runner.
        let parent_permit_fut = match with_parent_permit_scope(&self.runner_id, |scope| {
            let scope = scope.clone();
            scope
                .try_get_parent_permit()
                .map(ParentPermitOrFut::Permit)
                .unwrap_or_else(move || {
                    ParentPermitOrFut::Fut(async move { scope.wait_for_parent_permit().await })
                })
        }) {
            // Unify both cases into a single future type via `Either`.
            Some(ParentPermitOrFut::Permit(permit)) => Some(async move { permit }.left_future()),
            Some(ParentPermitOrFut::Fut(parent_permit_fut)) => {
                Some(parent_permit_fut.right_future())
            }
            None => None,
        };

        // Biased selection: branches are polled in the order written, so a
        // parent permit is preferred over taking a fresh one from the semaphore.
        let permit = futures::select_biased!(
            parent_permit = async move {
                if let Some(fut) = parent_permit_fut {
                    fut.await
                } else {
                    // No parent scope: this branch must never win.
                    futures::future::pending().await
                }
            }.fuse() => parent_permit,
            permit = sem.acquire_owned().fuse() => PermitSpecialDrop::new(self.runner_id, permit.expect("Sem closed")),
            // `drive` never resolves; it only keeps the queued futures progressing.
            _ = self.drive().fuse() => unreachable!()
        );

        self.stream.push(FutWrapper::InProgress {
            runner_id: self.runner_id,
            future,
            permit_scope_lookup: Some(Arc::new({
                // Start from the lookup of the enclosing future, if any, so
                // scopes of other runners stay visible to nested code.
                let mut map = PARENT_PERMIT_SCOPE_LOOKUP
                    .try_with(|lookup| {
                        lookup
                            .iter()
                            .map(|(k, v)| (*k, v.clone()))
                            .collect::<HashMap<_, _>>()
                    })
                    .unwrap_or_default();
                // Register the new permit for this runner, replacing any
                // entry copied from the enclosing lookup.
                map.insert(
                    self.runner_id,
                    Arc::new(ParentPermitScope::new(parking_lot::Mutex::new(Some(
                        permit,
                    )))),
                );
                map
            })),
        });
    }

    /// Pushes every future from `iter`, one after another, via `push`.
    pub async fn extend(&mut self, iter: impl IntoIterator<Item = Fut>) {
        for fut in iter {
            self.push(fut).await;
        }
    }

    /// Returns a buffered output if one exists, otherwise awaits the stream.
    ///
    /// Yields `None` once both the buffer and the stream are empty.
    pub async fn next(&mut self) -> Option<R> {
        if let Some(result) = self.result_buf.pop() {
            Some(result)
        } else {
            let result = self.stream.next().await;
            // The stream yields `FutWrapper::Finished` values; unwrap the output.
            if let Some(FutWrapper::Finished { output }) = result {
                Some(output)
            } else {
                None
            }
        }
    }

    /// Polls the stream and buffers each finished output; once the stream is
    /// empty it waits forever, so it never resolves.
    async fn drive(&mut self) {
        while let Some(FutWrapper::Finished { output }) = self.stream.next().await {
            self.result_buf.push(output);
        }
        // This will sleep forever:
        futures::future::pending().await
    }
}
shared_throttled_futures_unordered.rs
/// shared_throttled_futures_unordered.rs
use std::{
    future::Future,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use futures::stream::FuturesUnordered;
use tokio::sync::Semaphore;

use crate::throttled_futures_unordered::ThrottledFuturesUnordered;

/// A cloneable handle to one concurrency limit; clones share the semaphore
/// and carry the same runner id.
#[derive(Clone)]
pub struct SharedThrottledFuturesUnordered {
    /// The shared limiter handed to every throttler created from this handle.
    semaphore: Arc<Semaphore>,
    /// Id assigned in `new`; copied into every throttler created from this handle.
    runner_id: u64,
}

impl SharedThrottledFuturesUnordered {
    /// Creates a handle with `max_concurrency` permits and a fresh runner id
    /// drawn from a process-wide counter.
    pub fn new(max_concurrency: usize) -> Self {
        static RUNNER_ID_GENERATOR: AtomicU64 = AtomicU64::new(0);

        let runner_id = RUNNER_ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
        let semaphore = Arc::new(Semaphore::new(max_concurrency));
        Self {
            semaphore,
            runner_id,
        }
    }

    /// Runs a single future under the shared limit and returns its output.
    pub async fn run<R, Fut: Future<Output = R>>(&self, fut: Fut) -> R {
        let mut single = self.throttler();
        single.push(fut).await;
        // Exactly one future was pushed, so exactly one output comes back.
        single.next().await.unwrap()
    }

    /// Builds an empty throttler for one future type that shares this
    /// handle's semaphore and runner id.
    pub fn throttler<R, Fut: Future<Output = R>>(&self) -> ThrottledFuturesUnordered<Fut, R> {
        let semaphore = Arc::clone(&self.semaphore);
        ThrottledFuturesUnordered {
            stream: FuturesUnordered::new(),
            semaphore,
            result_buf: Vec::new(),
            runner_id: self.runner_id,
            _r: std::marker::PhantomData,
        }
    }
}
Cargo dependencies
[dependencies]
pin-project-lite = "0.2"
parking_lot = "0.12"
futures = "0.3"
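tokio = { version = "1", features = ["sync", "rt"] } # both these features are wasm-compatible
# The runnable examples above additionally need "macros" and "rt-multi-thread" (for #[tokio::main]) and "time" (for tokio::time::sleep).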

This implementation provides a basic throttled, sharable runner that covers the most significant footguns introduced when abstracting FuturesUnordered. It can be extended with further configuration and constraints, and building it provides valuable insight into async Rust.