总结
最后这一阶段,鄙人主要围绕rust的异步编程async/await及经典运行时库tokio展开阅读研究。但并没有深入理解其中原理及实现,更多的是比较浅显地了解,像tokio的有些模块并没有看,还又比较多的疑问在里面。如果后续有时间的话,希望能继续阅读并深入。代码实现上,仅在rcore上实现了一个极为简单的用户态运行时,跑了几个异步任务,有点协程的意思了(。但是并没有实现waker机制,后续考虑进一步实现完善
协程
有栈协程
函数运行在调用栈上,把函数作为一个协程,那么协程的上下文就是这个函数及其嵌套函数的栈帧存储的值,以及此时寄存器存储的值。如果我们调度协程,也就是保存当前正在运行的协程上下文,然后恢复下个将要运行的协程的上下文。这样我们就轻松的完成了协程调度。并且因为保存的上下文和普通函数执行的上下文是一样的,所以有栈协程可以在任意嵌套函数中挂起(无栈协程不行)。
有栈协程的优点在易用性上,通常只需要调用对应的方法,就可以切换上下文挂起协程。在有栈协程调度时,需要频繁的切换上下文,开销较大。单从实现上看,有栈协程更接近于内核级线程,都需要为每个线程保存单独的上下文(寄存器、栈等),区别在于有栈协程的调度由应用程序自行实现,对内核是透明的,而内核级线程的调度由系统内核完成,是抢占式的。
无栈协程
相比于有栈协程直接切换栈帧的思路,无栈协程在不改变函数调用栈的情况下,采用类似生成器的思路实现了上下文切换。通过编译器将生成器改写为对应的迭代器类型(内部实现是一个状态机)。
而无栈协程需要在编译器将代码编译为对应的状态机代码,挂起的位置在编译器确定。无栈协程的优点在性能上,不需要保存单独的上下文,内存占用低,切换成本低,性能高。缺点是需要编译器提供语义支持,无栈协程的实现是通过编译器对语法糖做支持,rust的aysnc\await就是语法糖,编译器将带有这些关键字的方法编译为生成器,以及对应的类型作为状态机。
只有状态机的支持才能进行协程调度,例如Rust中的tokio,基于Future的用户态线程,根据poll方法获取Future状态,它不可以在任意嵌套函数中挂起(同步代码未实现状态机)。
tokio
我主要从tokio::main宏出发,一步步分析其中调用关系。主要围绕runtime库的build及Runtime结构体的方法及函数
使用#[tokio::main]
宏生成的代码
1 | fn main() { |
结构体Builder
用作运行时runtime
配置,通过new_current_thread()
使用单线程运行时,new_multi_thread()
使用多线程运行时,并会返回Builder
实例
new_multi_thread
1 | /// Returns a new builder with the multi thread scheduler selected. |
调用new
方法返回一个Kind
为MultiThread
的Builder
,对于定时器和I/O事件释放CPU之前需要61ticks
enable_all
1 | /// Enables both I/O and time drivers. |
操作成员变量,使能运行时I/O和定时器
build
1 | /// Creates the configured `Runtime`. |
运行时创建的核心步骤,创建已经做配置的runtime
,并返回Runtime
实例准备创建异步任务
1 | pub struct Runtime { |
scheduler
:异步任务调度器
handle
:运行时句柄
blocking_pool
:阻塞池句柄,用于发出关闭信号
build -> build_threaded_runtime
1 | fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { |
- 引入
num_cpus
:用于获取系统的 CPU 核心数量。
Config
:运行时的配置项。
MultiThread
:多线程调度器,负责在多个线程间调度异步任务。
- 确定核心线程数
1 | let core_threads = self.worker_threads.unwrap_or_else(num_cpus); |
self.worker_threads
:用户在Builder
配置的核心线程数。如果未指定,使用系统的 CPU 核心数作为默认值(num_cpus()
)。core_threads
是最终确定的核心线程数,表示运行时的调度线程数量。
- 初始化驱动
1 | let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; |
driver
:负责管理底层 IO 和定时器的核心组件。driver_handle
:运行时与驱动交互的句柄,用于调度异步任务和管理定时器。get_cfg(core_threads)
:返回结构体driver::Cfg
,包含驱动的配置项,包含workers等配置。
- 创建阻塞任务池
1 | let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); |
阻塞任务池:用于执行需要阻塞运行的任务(例如文件 IO 或计算密集型任务),避免阻塞异步任务调度线程。
- 线程数 = 用户指定的
max_blocking_threads
+ 核心线程数。
- 线程数 = 用户指定的
blocking_spawner
:用于将任务提交到阻塞任务池的对象。
- 生成随机数种子
1 | let seed_generator_1 = self.seed_generator.next_generator(); |
- Tokio 的调度器可能需要随机化任务的分配,例如在多线程调度器中均衡负载。
seed_generator_1
和seed_generator_2
是生成的两个独立随机数种子,分别用于不同的组件。
- 初始化调度器
1 | let (scheduler, handle, launch) = MultiThread::new( |
调用
MultiThread::new
创建一个多线程调度器,返回:scheduler
:核心调度器,管理线程间任务的分配和调度。handle
:调度器的句柄,用于外部与调度器交互。launch
:启动调度器工作线程的接口。
传递的参数:
core_threads
:核心线程数。driver
和driver_handle
:用于与 IO 和定时器交互的驱动组件。blocking_spawner
:用于执行阻塞任务的接口。seed_generator_2
:随机数种子,用于内部调度逻辑。Config
:调度器的配置,包括以下内容:before_park
和after_unpark
:线程挂起与唤醒时的回调函数。before_spawn
和after_termination
:任务生成和结束时的回调函数。
global_queue_interval
和event_interval
:全局队列检查和事件处理的时间间隔。local_queue_capacity
:本地任务队列容量。
其他运行时的定制项。
- 创建运行时句柄
1 | let handle = Handle { inner: scheduler::Handle::MultiThread(handle) }; |
Handle
是对调度器的抽象封装,用于外部访问运行时和调度器的功能。
- 启动工作线程
1 | let _enter = handle.enter(); |
handle.enter()
:将当前线程设置为调度器的上下文,用于初始化调度环境。launch.launch()
:启动调度器的所有工作线程,使其开始运行。
- 返回运行时
1 | Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) |
- 创建并返回完整的
Runtime
实例,包括:Scheduler::MultiThread(scheduler)
:多线程调度器。handle
:运行时句柄。blocking_pool
:阻塞任务池。
driver::Driver::new方法
1 | pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { |
- 创建 IO 栈
1 | let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?; |
调用
create_io_stack
创建 IO 栈(底层 IO 相关的资源),返回:io_stack
:IO 栈的核心组件,用于管理底层 IO 操作。io_handle
:用于和 IO 栈交互的句柄。signal_handle
:用于处理信号的句柄(如处理Ctrl+C
)。
参数解释:
cfg.enable_io
:如果为true
,启用 IO 支持;否则,不创建 IO 栈。cfg.nevents
:指定事件队列的大小,用于多路复用器。
- 创建时钟
1 | let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); |
- 调用
create_clock
创建时钟组件,用于时间相关功能的管理,返回一个Clock
对象。 - 参数解释:
cfg.enable_pause_time
:是否支持暂停时间。cfg.start_paused
:是否让时钟在启动时进入暂停状态。
时钟的主要用途:
- 为定时器、延迟任务提供精确的时间点。
- 支持测试或调试时暂停时间的功能。
- 创建时间驱动器
1 | let (time_driver, time_handle) = |
调用
create_time_driver
创建时间驱动器,返回:
time_driver
:管理时间相关任务的核心组件(例如定时器、延迟任务)。time_handle
:与时间驱动器交互的句柄。
参数解释:
cfg.enable_time
:是否启用时间功能。io_stack
:IO 栈,与时间驱动器共享底层的事件循环。&clock
:前面创建的时钟,提供时间相关支持。cfg.workers
:用于分配时间任务的工作线程数。
时间驱动器的主要功能:
- 实现异步延迟(如
tokio::time::sleep
)。 - 管理基于时间的任务调度。
- 返回结果
1 | Ok(( |
- 返回一个包含驱动器和其句柄的元组:
Self { inner: time_driver }
- 驱动器的核心部分是时间驱动器,封装到
Self
结构体中。
- 驱动器的核心部分是时间驱动器,封装到
Handle
:- 包含多个句柄,用于驱动器与外部的交互:
io_handle
:处理 IO 事件的句柄。signal_handle
:处理信号的句柄。time_handle
:管理时间任务的句柄。clock
:提供时间支持的时钟实例。
- 包含多个句柄,用于驱动器与外部的交互:
tokio启动
通过launch.launch();
启动所有的 worker
线程:
1 | pub(crate) fn launch(mut self) { |
runtime::spawn_blocking
调用时, || run(worker)
匿名函数会被传进去,这其实就是 worker 线程要执行的逻辑。
如下,匿名函数会被包装为 BlockingTask
,并被放在 blocking thread 的 run queue 中,这样当它运行时就会执行这个匿名函数。因为这时没有足够的线程,就会初始化一个新的 OS 线程(如果有 idle 的线程,就会通过 condvar 通知),并开始执行 blocking 线程的逻辑。每个 worker 都占用一个 blocking 线程,并在 blocking 线程中运行直到最后。
1 | // runtime::spawn_blocking: |