Concurrency With Async

In this section, we will apply async to some of the same concurrency challenges we tackled with threads in Chapter 16. Because we already covered a lot of the key ideas there, we will focus on what is different between threads and futures.

In many cases, the APIs for working with concurrency using async are very similar to those for using threads. In other cases, they end up being shaped fairly differently. Even when the APIs look similar, they often have different behavior and they nearly always have different performance characteristics.

Counting

The first task we tackled in Chapter 16 was counting up on two separate threads. Let’s do the same using async. The trpl crate supplies a spawn_task function which looks very similar to the thread::spawn API, and a sleep function which is an async version of the thread::sleep API. We can use these together to implement the same counting example as with threads.

To start, we will set up our main function with trpl::block_on:

fn main() {
    trpl::block_on(async {
        // We will fill in the body of this async block next.
    });
}

Note: From this point forward in the chapter, every example will include this exact same code, so we will often skip it just like we do with main. Don’t forget to include it in your own code!

Then we can write two loops within that block, each with a trpl::sleep call in it. Similar to the threading example, we put one loop in the body of a trpl::spawn_task, the same way we did with thread::spawn, and the other in a top-level for loop. Notice that we also need to add a .await after the sleep calls.

Putting that all together, we end up with the code in Listing 17-TODO:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }
    });
}

This does something very similar to what the thread-based implementation did, as we can see from the output when we run it. (As with the threading example, you may see a different order in your own terminal output when you run this.)

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

This stops as soon as the for loop in the body of the main async block finishes, because the task spawned by spawn_task is shut down when the main function ends—just like threads are. Thus, if you want to run all the way to the completion of the task, you will need to use a join handle to wait for the first task to complete. With threads, we used the join method to “block” until the thread was done running. Here, we can use await to do the same thing:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }

        handle.await;
    });
}

Now the output again looks like what we saw in the threading example.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

So far, it looks like async and threads give us the same basic outcomes, just with different syntax: using .await instead of calling join on the join handle, and awaiting both sleep calls. The bigger difference is that we did not need to spawn another operating system thread to do this. We were able to get concurrency for just the cost of a task, which starts up much faster and uses much less memory than an OS thread.
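
For comparison, the thread-based version of this counting example from Chapter 16 looked roughly like this (a sketch from memory rather than the exact listing):

use std::thread;
use std::time::Duration;

fn main() {
    // Spawn an OS thread for the first counter and keep its join handle.
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    // Run the second counter on the main thread.
    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }

    // Block the main thread until the spawned thread finishes.
    handle.join().unwrap();
}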

What is more, we actually do not need the spawn_task call at all to get concurrency here. Remember that each async block compiles to an anonymous future. That means we can put each of these two loops in an async block and then ask the runtime to run them both to completion using trpl::join:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

When we run this, we see both futures run to completion:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Here, you will see the exact same order every time, which is very different from what we saw with threads. That is because the trpl::join function is fair, meaning it checks each future equally often, alternating between them, and never lets one race ahead if the other is ready. With threads, the operating system decides which thread to check and how long to let it run, and that is ultimately out of our control. With an async runtime, the runtime itself decides which future to check, so it has the final say. In practice, the details get complicated, because an async runtime might use operating system threads under the hood as part of how it manages concurrency, but a runtime can still choose to guarantee fairness even so. However, runtimes are not required to guarantee fairness for any given operation, and they often offer different APIs to let you choose whether or not you want fairness as a caller.

Try some of these different variations on awaiting the futures and see what they do:

  • Remove the async block from around either or both of the loops.
  • Await each async block immediately after defining it.
  • Wrap only the first loop in an async block, and await the resulting future after the body of the second loop.

For an extra challenge, see if you can figure out what the output will be in each case before running the code!

Message Passing

Sharing data between futures will look familiar. We can again use message passing, but this time with async versions of the types and functions. Instead of std::sync::mpsc::channel, for example, we will use trpl::channel.

The Receiver::recv() method in the std channel blocks until it receives a message. The trpl::Receiver::recv() method, by contrast, is an async function. Instead of blocking, it sleeps until a message is received or the send side of the channel closes. One other difference with this particular recv() implementation is that it returns an Option of the type sent over the channel instead of a Result.
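
For contrast, here is roughly what the blocking std version from Chapter 16 looks like (a sketch rather than the exact listing): recv blocks the whole thread and returns a Result, which becomes an Err once every sender has been dropped.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Move the sender into a separate thread and send one message.
    thread::spawn(move || {
        tx.send(String::from("hi")).unwrap();
    });

    // recv() blocks the current thread; it returns Err once tx has been
    // dropped and no messages remain, which ends the loop.
    while let Ok(value) = rx.recv() {
        println!("received '{value}'");
    }
}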

We can start by introducing an async version of the channel:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

If we run this, though, it never stops! You will need to shut it down using ctrl-c. We can see that tx sends all the messages and rx receives and prints them, but the program never stops running. That is because of the combination of the while let loop and the trpl::join call.

Let’s consider the way this loop works:

  • The trpl::join future only completes once both futures passed to it have completed.
  • The tx future completes once it finishes sleeping after sending the last message in vals.
  • The rx future will not complete until the while let loop ends, though.
  • The while let loop will not end until rx.recv().await produces None.
  • The rx.recv().await will only return None once the other end of the channel is closed.
  • The channel will only close if we call rx.close() or when the sender side, tx, is dropped.
  • We do not call rx.close() anywhere, and tx will not be dropped until the function exits.
  • The function cannot exit because it is blocked on trpl::join completing, which takes us back to the top of the list!

To solve this, then, we need to make sure the channel gets closed so that trpl::join will complete. We could manually close rx somewhere by calling rx.close(), but that does not make much sense in this case. The idea is that rx should keep listening until tx is done sending. Stopping after handling some arbitrary number of messages would make the program shut down, but it would mean we could miss messages if the sending side changed. Given that we cannot use rx.close(), we need to make sure that tx gets dropped before the end of the function.
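
For illustration only, here is a sketch of that rejected alternative, written as a replacement for rx_fut in the listing above and assuming rx.close() behaves as just described: the receiver stops after an arbitrary number of messages and closes the channel, which is exactly the kind of cutoff we want to avoid.

        // Not a good fix: stop after an arbitrary number of messages.
        let rx_fut = async {
            for _ in 0..4 {
                if let Some(value) = rx.recv().await {
                    println!("received '{value}'");
                }
            }
            // Closing the receiver lets trpl::join complete, but any message
            // sent after this point would be silently lost.
            rx.close();
        };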

Right now, the async block only borrows tx. We can confirm this by adding another async block which uses tx, and using trpl::join3 to wait for all three futures to complete:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(received) = rx.recv().await {
                println!("Got: {received}");
            }
        };

        let tx_fut2 = async {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx_fut, tx_fut2, rx_fut).await;
    });
}

Now both blocks borrow tx, so they are both able to use it to send messages, which rx can then receive. When we run that code, we see the extra output from the new async block, and the messages it sends being received by rx.recv():

Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: future
Got: you

As before, we also see that the program does not shut down on its own and requires a ctrl-c. Now that we have seen how async blocks borrow the items they reference from their outer scope, we can go ahead and remove the extra block we just added, and switch back to using trpl::join instead of trpl::join3.

This little exploration makes the original issue much clearer: it is ultimately about ownership. We need to move tx into the async block so that once that block ends, tx will be dropped.

In Chapter 13, we learned how to use the move keyword with closures, and in Chapter 16, we saw that we often need to use closures marked with move when working with threads. As we have discovered, the same dynamics apply to async blocks—so the move keyword also works with async blocks, allowing them to take ownership of the data they reference.

Remember, any time you write a future, a runtime is ultimately responsible for executing it. That means that an async block might outlive the function where you write it, the same way a closure can.
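
As a minimal sketch of that parallel (not a listing from Chapter 13 or 16), a move closure and an async move block take ownership of what they capture in the same way:

fn main() {
    // A `move` closure takes ownership of the data it captures.
    let name = String::from("world");
    let greet = move || println!("hello, {name} (from a closure)");
    greet();

    trpl::block_on(async {
        // An `async move` block takes ownership in the same way, so the
        // captured value lives inside the future the block produces.
        let name = String::from("world");
        let fut = async move { println!("hello, {name} (from a future)") };
        fut.await;
    });
}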

We can do that by making the first async block an async move block.

The result is Listing 17-TODO, and when we run this version of the code, it shuts down gracefully after the last message is sent.

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                eprintln!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Multiple Producers with Async

This async channel is also a multiple-producer channel, so we can call clone on tx if we want to send messages from multiple futures. For example, we can make the code from Listing 17-TODO work by cloning tx before moving it into the first async block, moving the original tx into the second async block, and switching back to join3.

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

Both of these blocks need to be async move blocks, or else we will end up back in the same infinite loop we started out in.

The async keyword does not yet work with closures directly. That is, there is no direct equivalent to async fn for anonymous functions. As a result, you cannot write code like these function calls:

example_1(async || { ... });
example_2(async move || { ... });

However, since async blocks themselves can be marked with move, this ends up not being a problem. Remember that async blocks compile to anonymous futures. That means you can write calls like this instead:

example_1(|| async { ... });
example_2(|| async move { ... });

These closures now return anonymous futures, meaning they work basically the same way that an async function does.
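
For instance, here is a minimal sketch of a function that accepts such a closure; the run_twice helper is made up for illustration and is not part of trpl:

use std::future::Future;

// A hypothetical helper that calls the closure twice and awaits each
// future it produces.
async fn run_twice<F, Fut>(f: F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = ()>,
{
    f().await;
    f().await;
}

fn main() {
    trpl::block_on(async {
        let message = String::from("hello from a closure-made future");

        // The closure itself is ordinary; the `async move` block it returns
        // is what compiles to an anonymous future.
        run_twice(move || {
            let message = message.clone();
            async move { println!("{message}") }
        })
        .await;
    });
}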