Pinning Down "Future Is Not Send" Errors
If you use async Rust and Tokio, you are likely to run into some variant of the "future is not Send" compiler error. While I was transitioning some sequential async code to use streams, a friend suggested a small technique for pinning down the source of the non-Send errors. It helped a lot, so I thought it would be worth writing up in case it saves others some annoying debugging time.
I'll give a bit of background on Futures and Send bounds first, but if you want to skip past that you can jump to The DX Problem with Non-Send Futures or Pinning Down the Source of Non-Send Errors.
Why Futures Must Be Send
I wrote another blog post about the relationship between async Rust and Send + Sync + 'static, so we won't go into detail about that here. The main point is that if you're using Tokio, you're probably going to be spawning some Futures, and any Future you spawn must be Send + 'static (note that Sync is not required), as the signature of tokio::spawn shows:
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
How Futures Lose Their Send Markers
Most types are automatically marked as Send, meaning they can be safely moved between threads. As The Rustonomicon says:
Major exceptions include:
- raw pointers are neither Send nor Sync (because they have no safety guards).
- UnsafeCell isn't Sync (and therefore Cell and RefCell aren't).
- Rc isn't Send or Sync (because the refcount is shared and unsynchronized).
Raw pointers and Rcs cannot be moved between threads, and neither can anything that contains them.
Futures are structs that represent the state machine for each step of the asynchronous operation. When a value is used across an await point, that value must be stored in the Future. As a result, using a non-Send value across an await point makes the whole Future non-Send.
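To make that rule concrete, here is a minimal sketch that uses only the standard library. The helper require_send and both function names are hypothetical, introduced just for illustration. The two async functions differ only in whether the Rc is still alive when the await happens:

```rust
use std::rc::Rc;

/// Hypothetical helper: only compiles if `T` is `Send`.
fn require_send<T: Send>(t: T) -> T {
    t
}

async fn noop() {}

/// The `Rc` lives in an inner block, so it is dropped before the await
/// and never has to be stored in the future's state machine.
async fn rc_dropped_before_await() -> usize {
    let value = {
        let rc = Rc::new(2);
        *rc
    };
    noop().await;
    value
}

/// The `Rc` is read after the await, so it must be stored in the future,
/// which makes the whole future non-`Send`.
#[allow(dead_code)]
async fn rc_held_across_await() -> usize {
    let rc = Rc::new(2);
    noop().await;
    *rc
}

/// Compiles because the future below never holds the `Rc` across an await.
#[allow(dead_code)]
fn build_send_future() -> impl std::future::Future<Output = usize> + Send {
    // Swapping in `rc_held_across_await()` here would be rejected with
    // a "future cannot be sent between threads safely" error.
    require_send(rc_dropped_before_await())
}
```

Both functions create an Rc, but only the second one keeps it alive across the await, and that alone decides whether the resulting Future is Send.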
The DX Problem with Non-Send Futures
To illustrate the problem, let's take an extremely simplified example.
Below, we have an async noop function and an async not_send function. The not_send function holds an Rc across an await point, so the Future it returns is not Send -- but shhh! let's pretend we don't know that yet. We then have an async_chain function that calls both functions and a function that spawns that Future.
use tokio;

async fn noop() {}

async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2); // <-- this value is used across the await point
    noop().await;
    *ret
}

async fn async_chain() -> usize {
    noop().await;
    not_send().await
}

fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    }); // <-- compiler points here
}
This code doesn't compile (playground link). But where does the compiler direct our attention? If we only take a quick look at the error message, it seems like the error is coming from the tokio::spawn call:
error: future cannot be sent between threads safely
   --> src/lib.rs:17:5
    |
17  | /     tokio::spawn(async move {
18  | |         let result = async_chain().await;
19  | |         println!("{}", result);
20  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/lib.rs:17:18: 17:28}`, the trait `Send` is not implemented for `Rc<usize>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:12
    |
6   |     let ret = std::rc::Rc::new(2);
    |         --- has type `Rc<usize>` which is not `Send`
7   |     noop().await;
    |            ^^^^^ await occurs here, with `ret` maybe used later
note: required by a bound in `tokio::spawn`
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/task/spawn.rs:168:21
    |
166 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
167 |     where
168 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
In this example, it's easy to spot the mention of the Rc<usize> not being Send -- but we know what we're looking for! Also, our async chain is pretty short, so the types and error messages are still somewhat readable. The longer that chain grows, the harder it is to spot the actual source of the problem.
The crux of the issue is that the compiler draws our attention first to the place where the bounds check fails. In this case, it fails when we try to spawn a non-Send Future -- rather than where the Future loses its Send bound.
Pinning Down the Source of Non-Send Errors
There are a number of different ways we could pin down the source of these errors, but here are two:
Replacing async fn with an impl Future Return Type
Instead of using an async fn, we can use a normal fn that returns a Future. (This is roughly what the async keyword does under the hood, so we can just forgo that bit of syntactic sugar.)
We can transform our example above into something that looks like the code below using an async block, or alternatively using Future combinators.
Neither of these will compile (playground link), but this time the compiler errors will point to the Futures returned by async_chain or combinator_chain not fulfilling the Send bound that we are specifying.
use tokio;
use std::future::Future;
use futures::FutureExt;

async fn noop() {}

async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2);
    noop().await;
    *ret
}

fn async_chain() -> impl Future<Output = usize> + Send + 'static { // note the return type
    async move {
        noop().await;
        not_send().await
    } // <-- now the compiler points here
}

fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    });
}

fn combinator_chain() -> impl Future<Output = usize> + Send + 'static { // <-- the compiler will also point here
    noop().then(|_| not_send())
}

fn spawn_combinator_chain() {
    tokio::spawn(async move {
        let result = combinator_chain().await;
        println!("{}", result);
    });
}
The idea here is that we are forgoing the async fn syntax to explicitly state that the Future our functions return must be Send + 'static.
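Once the bound has pointed at the offending function, the same signature also confirms the fix. The sketch below is one possible fix, assuming the value can be shared through an Arc instead of an Rc. The name now_send is hypothetical, and Tokio is left out so the snippet depends only on the standard library:

```rust
use std::future::Future;
use std::sync::Arc;

async fn noop() {}

/// Assumption for illustration: swapping `Rc` for `Arc` is acceptable here.
/// `Arc<usize>` is `Send`, so holding it across the await is fine.
async fn now_send() -> usize {
    let ret = Arc::new(2);
    noop().await;
    *ret
}

/// Same explicit return type as before. This version compiles, which tells
/// us every future in the chain satisfies `Send + 'static`.
fn async_chain() -> impl Future<Output = usize> + Send + 'static {
    async move {
        noop().await;
        now_send().await
    }
}
```

Keeping the explicit return type in place after the fix also acts as a guard: if a later change reintroduces a non-Send value, the error appears at this function rather than at a distant spawn call.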
Helper Function to Enforce Send + 'static
In the code below (playground link), we'll keep our original async fns but this time we'll use a helper function send_static_future to ensure that the value we pass to it implements Future + Send + 'static. Here, the compiler will also point us to the right place.
use tokio;
use std::future::Future;
use futures::FutureExt;

/// This function doesn't do anything except ensure that
/// the value passed to it implements Future + Send + 'static
fn send_static_future<T: Future + Send + 'static>(t: T) -> T {
    t
}

async fn noop() {}

async fn not_send() -> usize {
    let ret = std::rc::Rc::new(2);
    noop().await;
    *ret
}

async fn async_chain() -> usize {
    send_static_future(async move {
        noop().await;
        not_send().await
    }).await // <-- the compiler error points here
}

fn spawn_async_chain() {
    tokio::spawn(async move {
        let result = async_chain().await;
        println!("{}", result);
    });
}

async fn combinator_chain() -> usize {
    send_static_future(noop().then(|_| not_send())).await // <-- and here
}

fn spawn_combinator_chain() {
    tokio::spawn(async move {
        let result = combinator_chain().await;
        println!("{}", result);
    });
}
While debugging, you could wrap any part of the async chain with the send_static_future function call until you've pinpointed the non-Send part.
(This is similar to what the static_assertions::assert_impl_all macro creates under the hood -- and using that crate is another option.)
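For a rough idea of how a type-level assertion like that can work, here is a std-only sketch. The macro assert_all_send and the function assert_send_ref are hypothetical names, and the macro is only loosely modeled on the general pattern, not copied from the static_assertions crate:

```rust
use std::sync::Arc;

/// Hypothetical macro: fails to compile unless every listed type is `Send`.
macro_rules! assert_all_send {
    ($($ty:ty),+ $(,)?) => {
        // The closure is never called; it only needs to type-check.
        const _: fn() = || {
            // Only callable for types that implement `Send`.
            fn requires_send<T: ?Sized + Send>() {}
            $( requires_send::<$ty>(); )+
        };
    };
}

// Checked at compile time; nothing happens at runtime.
assert_all_send!(Arc<usize>, String);
// assert_all_send!(std::rc::Rc<usize>); // would not compile: Rc is not Send

/// Borrowing variant of the value-level helper, for types that cannot be
/// named, such as the anonymous future returned by an `async fn`.
#[allow(dead_code)]
fn assert_send_ref<T: ?Sized + Send>(_value: &T) {}
```

A type-level check like this needs a type you can write down, so it suits structs and named Future types. For the anonymous Futures produced by async fns and async blocks, a value-level helper such as send_static_future (or the borrowing assert_send_ref sketched here) is the more practical tool.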
Identifying Non-Send Stream Combinators
Since the introduction of async/await, I have mostly stopped using Future combinators. However, combinators still seem like the way to go when working with Streams.
Streams present the same DX problems we've seen above when you have a combinator that produces a non-Send result.
Here's a simple example (playground link) that demonstrates the same issue we had with Futures above:
use futures::{pin_mut, stream, Stream, StreamExt};
use std::sync::{Arc, Mutex};

async fn noop() {}

fn stream_processing() -> impl Stream<Item = usize> {
    let state = Arc::new(Mutex::new(0));
    stream::iter(0..100).filter_map(move |i| {
        let state = state.clone();
        async move {
            // This is contrived but we're intentionally keeping the
            // MutexGuard across the await to make the Future non-Send
            let mut state = state.lock().unwrap();
            noop().await;
            *state += i;
            if *state % 2 == 0 {
                Some(*state)
            } else {
                None
            }
        }
    })
    // (Imagine we had a more complicated stream processing pipeline)
}

fn spawn_stream_processing() {
    tokio::spawn(async move {
        let stream = stream_processing();
        pin_mut!(stream);
        while let Some(number) = stream.next().await {
            println!("{number}");
        }
    }); // <-- the compiler error points us here
}
As with the Future examples above, we can use the same type of helper function to identify which of our closures is returning a non-Send Future (playground link):
use futures::{pin_mut, stream, Future, Stream, StreamExt};
use std::sync::{Arc, Mutex};

async fn noop() {}

fn send_static_future<T: Future + Send + 'static>(t: T) -> T {
    t
}

fn stream_processing() -> impl Stream<Item = usize> {
    let state = Arc::new(Mutex::new(0));
    stream::iter(0..100).filter_map(move |i| {
        send_static_future({
            let state = state.clone();
            async move {
                let mut state = state.lock().unwrap();
                noop().await;
                *state += i;
                if *state % 2 == 0 {
                    Some(*state)
                } else {
                    None
                }
            }
        }) // <-- now the compiler points us here
    })
    // (Imagine we had a more complicated stream processing pipeline)
}

fn spawn_stream_processing() {
    tokio::spawn(async move {
        let stream = stream_processing();
        pin_mut!(stream);
        while let Some(number) = stream.next().await {
            println!("{number}");
        }
    });
}
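Once the helper has singled out the closure, one possible fix for this particular example is to confine the MutexGuard to an inner block so it is dropped before the await. The sketch below pulls the closure body into a hypothetical process_item function and uses only the standard library. Note the assumption it makes: it is acceptable that the lock is no longer held while awaiting, which changes the behavior of the contrived original.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};

async fn noop() {}

fn send_static_future<T: Future + Send + 'static>(t: T) -> T {
    t
}

/// Hypothetical stand-in for the body of the `filter_map` closure.
fn process_item(
    state: Arc<Mutex<usize>>,
    i: usize,
) -> impl Future<Output = Option<usize>> + Send + 'static {
    send_static_future(async move {
        // The guard only exists inside this block, so it is dropped
        // before the await below and is never stored in the future.
        let total = {
            let mut guard = state.lock().unwrap();
            *guard += i;
            *guard
        };
        noop().await;
        if total % 2 == 0 {
            Some(total)
        } else {
            None
        }
    })
}
```

With the guard scoped this way, the send_static_future call compiles, and a closure like `move |i| process_item(state.clone(), i)` could be passed to filter_map in its place.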
Conclusion
Async Rust is powerful, but it sometimes comes with the frustrating experience of hunting down the source of trait implementation errors.
I ran into this while working on Scour, a personalized content feed. The MVP used a set of sequential async steps to scrape and process feeds. However, that became too slow when the number of feeds grew to the thousands.
Transitioning to using Streams allowed me to take advantage of combinators like flat_map_unordered, which polls nested streams with a configurable level of concurrency. This works well for my use case, but writing the code initially involved plenty of non-Send-Future hunting.
The techniques described above helped me narrow down why my Stream combinator chains were becoming non-Send. I hope you find them useful as well!
Thanks to Alex Kesling for mentioning this technique and saving me a couple of hours of fighting with rustc.
See Also
If you're working with Rust streams, you might also want to check out:
- async-fn-stream is a nice crate for creating streams using a simpler closure syntax.
- pumps is an interesting and different take on Rust streams.
- argus is an experimental VS Code extension that uses Rust's New Trait Solver to help you identify why some struct, Future, or Stream does not implement the traits it should.