Evan Schwartz

Pinning Down "Future Is Not Send" Errors

If you use async Rust and Tokio, you are likely to run into some variant of the "future is not Send" compiler error. While transitioning some sequential async code to use streams, a friend suggested a small technique for pinning down the source of the non-Send errors. It helped a lot, so I thought it would be worth writing up in case it saves others some annoying debugging time.

I'll give a bit of background on Futures and Send bounds first, but if you want to skip past that you can jump to The DX Problem with Non-Send Futures or Pinning Down the Source of Non-Send Errors.

Why Futures Must Be Send

I wrote another blog post about the relationship between async Rust and Send + Sync + 'static so we won't go into detail about that here. The main thing we'll focus on here is that if you're using Tokio, you're probably going to be spawning some Futures, and if you spawn a Future it must be Send + Sync + 'static.

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,

How Futures Lose Their Send Markers

Most types are automatically marked as Send, meaning they can be safely moved between threads. As the The Rustonomicon says:

Major exceptions include:

Pointers and Rcs cannot be moved between threads and nor can anything that contains them.

Futures are structs that represent the state machine for each step of the asynchronous operation. When a value is used across an await point, that value must be stored in the Future. As a result, using a non-Send value across an await point makes the whole Future non-Send.

The DX Problem with Non-Send Futures

To illustrate the problem in the simplest way possible, let's take an extremely simplified example.

Below, we have an async noop function and an async not_send function. The not_send function holds an Rc across an await point and thus loses its Send bound -- but shhh! let's pretend we don't know that yet. We then have an async_chain that calls both methods and a function that spawns that Future.

use tokio;
   
async fn noop() {}
   
async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2); // <-- this value is used across the await point
    noop().await;
    *ret
}
   
async fn async_chain() -> usize {
    noop().await;
    not_send().await
}
   
fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    }); // <-- compiler points here
}

This code doesn't compile (playground link). But where does the compiler direct our attention? If we only take a quick look at the error message, it seems like the error is coming from the tokio::spawn call:

error: future cannot be sent between threads safely
   --> src/lib.rs:17:5
    |
17  | /     tokio::spawn(async move {
18  | |         let result = async_chain().await;
19  | |         println!("{}", result);
20  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/lib.rs:17:18: 17:28}`, the trait `Send` is not implemented for `Rc<usize>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:12
    |
6   |     let ret = std::rc::Rc::new(2);
    |         --- has type `Rc<usize>` which is not `Send`
7   |     noop().await;
    |            ^^^^^ await occurs here, with `ret` maybe used later
note: required by a bound in `tokio::spawn`
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/task/spawn.rs:168:21
    |
166 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
167 |     where
168 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

In this example, it's easy to spot the mention of the Rc<usize> not being Send -- but we know what we're looking for! Also, our async chain is pretty short so that types and error messages are still somewhat readable. The longer that chain grows, the harder it is to spot the actual source of the problem.

The crux of the issue is that the compiler draws our attention first to the place where the bounds check fails. In this case, it fails when we try to spawn a non-Send Future -- rather than where the Future loses its Send bound.

Pinning Down the Source of Not-Send Errors

There are a number of different ways we could pin down the source of these errors, but here are two:

Replacing async fn with an impl Future Return Type

Instead of using an async fn, we can instead use a normal fn that returns a Future. (This is what the async keyword does under the hood, so we can just forego that bit of syntactic sugar.)

We can transform our example above into something that looks like the code below using an async block, or alternatively using Future combinators.

Neither of these will compile (playground link), but this time the compiler errors will point to the Futures returned by async_chain or combinator_chain not fulfilling the Send bound that we are specifying.

use tokio;
use std::future::Future;
use futures::FutureExt;

async fn noop() {}

async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2);
    noop().await;
    *ret
}

fn async_chain() -> impl Future<Output = usize> + Send + 'static { // note the return type
    async move {
        noop().await;
        not_send().await
    } // <-- now the compiler points here
}

fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    });
}

fn combinator_chain() -> impl Future<Output = usize> + Send + 'static { // <-- the compiler will also point here
    noop().then(|_| not_send())
}

fn spawn_combinator_chain() {
    tokio::spawn(async move {
        let result = combinator_chain().await;
        println!("{}", result);
    });
}

The idea here is that we are foregoing the async fn syntax to explicitly state that the Future our functions return must be Send + 'static.

Helper Function to Enforce Send + 'static

In the code below (playground link), we'll keep our original async fns but this time we'll use a helper function send_static_future to ensure that the value we pass to it implements Send. Here, the compiler will also point us to the right place.

use tokio;
use std::future::Future;
use futures::FutureExt;

/// This function doesn't do anything except ensure that
/// the value passed to it implements Future + Send + 'static
fn send_static_future<T: Future + Send + 'static>(t: T) -> T {
    t
}

async fn noop() {}

async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2);
    noop().await;
    *ret
}

async fn async_chain() -> usize {
    send_static_future(async move {
        noop().await;
        not_send().await
    }).await // <-- the compiler error points here
}

fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    });
}

async fn combinator_chain() -> usize {
    send_static_future(noop().then(|_| not_send())).await // <-- and here
}

fn spawn_combinator_chain() {
    tokio::spawn(async move {
        let result = combinator_chain().await;
        println!("{}", result);
    });
}

While debugging, you could wrap any part of the async chain with the send_static_future function call until you've pinpointed the non-Send part.

(This is similar what the static_assertions::assert_impl_all macro creates under the hood -- and using that crate is another option.)

Identifying Non-Send Stream Combinators

Since the introduction of async/await, I have mostly stopped using Future combinators. However, combinators still seem like the way to go when working with Streams.

Streams present the same DX problems we've seen above when you have a combinator that produces a non-Send result.

Here's a simple example (playground link) that demonstrates the same issue we had with Futures above:

use futures::{pin_mut, stream, Stream, StreamExt};
use std::sync::{Arc, Mutex};

async fn noop() {}

fn stream_processing() -> impl Stream<Item = usize> {
    let state = Arc::new(Mutex::new(0));
    stream::iter(0..100).filter_map(move |i| {
        let state = state.clone();
        async move {
	    // This is contrived but we're intentionally keeping the
	    // MutexGuard across the await to make the Future non-Send
            let mut state = state.lock().unwrap();
            noop().await;
            *state += i;
            if *state % 2 == 0 {
                Some(*state)
            } else {
                None
            }
        }
    })
    // (Imagine we had a more complicated stream processing pipeline)
}

fn spawn_stream_processing() {
    tokio::spawn(async move {
        let stream = stream_processing();
        pin_mut!(stream);
        while let Some(number) = stream.next().await {
            println!("{number}");
        }
    }); // <-- the compiler error points us here
}

As with the Futures examples above, we can use the same type of helper function to identify which of our closures is returning a non-Send Future (playground link):

use futures::{pin_mut, stream, Future, Stream, StreamExt};
use std::sync::{Arc, Mutex};

async fn noop() {}

fn send_static_future<T: Future + Send + 'static>(t: T) -> T {
    t
}

fn stream_processing() -> impl Stream<Item = usize> {
    let state = Arc::new(Mutex::new(0));
    stream::iter(0..100).filter_map(move |i| {
        send_static_future({
            let state = state.clone();
            async move {
                let mut state = state.lock().unwrap();
                noop().await;
                *state += i;
                if *state % 2 == 0 {
                    Some(*state)
                } else {
                    None
                }
            }
        }) // <-- now the compiler points us here
    })
    // (Imagine we had a more complicated stream processing pipeline)
}

fn spawn_stream_processing() {
    tokio::spawn(async move {
        let stream = stream_processing();
        pin_mut!(stream);
        while let Some(number) = stream.next().await {
            println!("{number}");
        }
    });
}

Conclusion

Async Rust is powerful, but it sometimes comes with the frustrating experience of hunting down the source of trait implementation errors.

I ran into this while working on Scour, a personalized content feed. The MVP used a set of sequential async steps to scrape and process feeds. However, that became too slow when the number of feeds grew to the thousands.

Transitioning to using Streams allows me to take advantage of combinators like flat_map_unordered, which polls nested streams with a configurable level of concurrency. This works well for my use case, but writing the code initially involved plenty of non-Send-Future hunting.

The techniques described above helped me narrow down why my Stream combinator chains were becoming non-Send. I hope you find them useful as well!

Thanks to Alex Kesling for mentioning this technique and saving me a couple hours of fighting with rustc.

See Also

If you're working with Rust streams, you might also want to check out:


Discuss on r/rust, Lobsters, or Hacker News.


#rust #scour