异步编程是一个并发编程模型。async是Rust选择的异步编程模型。
1. async简单入门
async/.await
是 Rust 内置的语言特性,可以让我们用同步的方式去编写异步的代码。
通过 async 标记的语法块会被转换成实现了Future特征的状态机。 与同步调用阻塞当前线程不同,当Future执行并遇到阻塞时,它会让出当前线程的控制权,这样其它的Future就可以在该线程中运行,这种方式完全不会导致当前线程的阻塞。
使用async
使用async fn语法创建一个异步函数,该函数的返回值是一个Future,如果直接调用这个函数,不会有任何的输出。因为Future还未被执行。执行一个异步函数需要使用一个执行器(executor)
// `block_on`会阻塞当前线程直到指定的`Future`执行完成,这种阻塞当前线程以等待任务完成的方式较为简单、粗暴,
// 好在其它运行时的执行器(executor)会提供更加复杂的行为,例如将多个`future`调度到同一个线程上执行。
use futures::executor::block_on;
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world(); // 返回一个Future, 因此不会打印任何输出
block_on(future); // 执行`Future`并等待其运行完成,此时"hello, world!"会被打印输出
}
使用.await
在上述代码的main函数中,我们使用block_on这个执行器等待Future的完成,让代码看上去非常像是同步代码,但是如果你要在一个async fn函数中去调用另一个async fn并等待其完成后再执行后续的代码,该如何做?例如:
use futures::executor::block_on;
async fn hello_world() {
hello_cat();
println!("hello, world!");
}
async fn hello_cat() {
println!("hello, kitty!");
}
fn main() {
let future = hello_world();
block_on(future);
}
上面的代码会报错,因为hello_cat返回的Future没有被运行。使用await可以解决这个问题
use futures::executor::block_on;
async fn hello_world() {
hello_cat().await;
println!("hello, world!");
}
async fn hello_cat() {
println!("hello, kitty!");
}
fn main() {
let future = hello_world();
block_on(future);
}
输出结果:
hello, kitty!
hello, world!
输出的顺序跟代码定义的顺序完全符合,因此,我们在上面代码中使用同步的代码顺序实现了异步的执行效果,非常简单、高效,而且很好理解,未来也绝对不会有回调地狱的发生。
总之,在async fn函数中使用.await可以等待另一个异步调用的完成。但是与block_on不同,.await并不会阻塞当前的线程,而是异步的等待Future A的完成,在等待的过程中,该线程还可以继续执行其它的Future B,最终实现了并发处理的效果。
2. 定海神针pin与unpin
在 Rust 中,所有的类型可以分为两类:
- 类型的值可以在内存中安全地被移动,例如数值、字符串、布尔值、结构体、枚举,总之你能想到的几乎所有类型都可以落入到此范畴内
- 自引用类型
Pin可以防止一个类型在内存中被移动。Unpin表示类型可以在内存中安全地移动。
3. async/await 和 Stream流处理
async的生命周期
async fn函数如果拥有引用类型的参数,那它返回Future的生命周期就会被这些参数的生命周期所限制,也就是说参数要比Future的生命周期更久。
async move
async允许我们使用move关键字来将环境中变量的所有权转移到语句块内,就像闭包那样。
当.await遇见多线程执行器
需要注意的是,当使用多线程 Future 执行器( executor )时, Future 可能会在线程间被移动,因此 async 语句块中的变量必须要能在线程间传递。 至于 Future 会在线程间移动的原因是:它内部的任何.await
都可能导致它被切换到一个新线程上去执行。
由于需要在多线程环境使用,意味着 Rc、 RefCell 、没有实现 Send 的所有权类型、没有实现 Sync 的引用类型,它们都是不安全的,因此无法被使用
需要注意!实际上它们还是有可能被使用的,只要在 .await 调用期间,它们没有在作用域范围内。
类似的原因,在 .await
时使用普通的锁也不安全,例如 Mutex 。原因是,它可能会导致线程池被锁:当一个任务获取锁 A 后,若它将线程的控制权还给执行器,然后执行器又调度运行另一个任务,该任务也去尝试获取了锁 A ,结果当前线程会直接卡死,最终陷入死锁中。
因此,为了避免这种情况的发生,我们需要使用 futures 包下的锁 futures::lock 来替代 Mutex 完成任务。
Stream流处理
Stream 特征类似于 Future 特征,但是前者在完成前可以生成多个值,这种行为跟标准库中的 Iterator 特征倒是颇为相似。
4. 同时运行多个Future
join!
futures 包中提供了很多实用的工具,其中一个就是 join! 宏, 它允许我们同时等待多个不同 Future 的完成,且可以并发地运行这些 Future。
use futures::join;
async fn enjoy_book_and_music() -> (Book, Music) {
let book_fut = enjoy_book();
let music_fut = enjoy_music();
join!(book_fut, music_fut)
}
同时 join! 会返回一个元组,里面的值是对应的 Future 执行结束后输出的值。
如果希望同时运行一个数组里的多个异步任务,可以使用 futures::future::join_all 方法
try_join!
由于 join! 必须等待它管理的所有 Future 完成后才能完成,如果你希望在某一个 Future 报错后就立即停止所有 Future 的执行,可以使用 try_join!,特别是当 Future 返回 Result 时:
use futures::try_join;
async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
有一点需要注意,传给 try_join! 的所有 Future 都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自 futures::future::TryFutureExt 模块的 map_err 和 err_info 方法将错误进行转换:
use futures::{
future::TryFutureExt,
try_join,
};
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
select!
join! 只有等所有 Future 结束后,才能集中处理结果,如果你想同时等待多个 Future ,且任何一个 Future 结束后,都可以立即被处理,可以考虑使用 futures::select!:
评论 (0)