0%

2024-秋冬季开源操作系统训练营第四阶段总结-yck

总结

最后这一阶段,鄙人主要围绕rust的异步编程async/await及经典运行时库tokio展开阅读研究。但并没有深入理解其中原理及实现,更多的是比较浅显地了解,像tokio的有些模块并没有看,还又比较多的疑问在里面。如果后续有时间的话,希望能继续阅读并深入。代码实现上,仅在rcore上实现了一个极为简单的用户态运行时,跑了几个异步任务,有点协程的意思了(。但是并没有实现waker机制,后续考虑进一步实现完善

协程

有栈协程

函数运行在调用栈上,把函数作为一个协程,那么协程的上下文就是这个函数及其嵌套函数的栈帧存储的值,以及此时寄存器存储的值。如果我们调度协程,也就是保存当前正在运行的协程上下文,然后恢复下个将要运行的协程的上下文。这样我们就轻松的完成了协程调度。并且因为保存的上下文和普通函数执行的上下文是一样的,所以有栈协程可以在任意嵌套函数中挂起(无栈协程不行)。

有栈协程的优点在易用性上,通常只需要调用对应的方法,就可以切换上下文挂起协程。在有栈协程调度时,需要频繁的切换上下文,开销较大。单从实现上看,有栈协程更接近于内核级线程,都需要为每个线程保存单独的上下文(寄存器、栈等),区别在于有栈协程的调度由应用程序自行实现,对内核是透明的,而内核级线程的调度由系统内核完成,是抢占式的。

无栈协程

相比于有栈协程直接切换栈帧的思路,无栈协程在不改变函数调用栈的情况下,采用类似生成器的思路实现了上下文切换。通过编译器将生成器改写为对应的迭代器类型(内部实现是一个状态机)。

而无栈协程需要在编译器将代码编译为对应的状态机代码,挂起的位置在编译器确定。无栈协程的优点在性能上,不需要保存单独的上下文,内存占用低,切换成本低,性能高。缺点是需要编译器提供语义支持,无栈协程的实现是通过编译器对语法糖做支持,rust的aysnc\await就是语法糖,编译器将带有这些关键字的方法编译为生成器,以及对应的类型作为状态机。

只有状态机的支持才能进行协程调度,例如Rust中的tokio,基于Future的用户态线程,根据poll方法获取Future状态,它不可以在任意嵌套函数中挂起(同步代码未实现状态机)。

tokio

我主要从tokio::main宏出发,一步步分析其中调用关系。主要围绕runtime库的build及Runtime结构体的方法及函数

使用#[tokio::main]宏生成的代码

1
2
3
4
5
6
7
8
9
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// ...
})
}

结构体Builder用作运行时runtime配置,通过new_current_thread()使用单线程运行时,new_multi_thread()使用多线程运行时,并会返回Builder实例

new_multi_thread

1
2
3
4
5
6
7
8
9
/// Returns a new builder with the multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}

调用new方法返回一个KindMultiThreadBuilder,对于定时器和I/O事件释放CPU之前需要61ticks

enable_all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Enables both I/O and time drivers.
///
/// Doing this is a shorthand for calling `enable_io` and `enable_time`
/// individually. If additional components are added to Tokio in the future,
/// `enable_all` will include these future components.
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();

self
}

操作成员变量,使能运行时I/O和定时器

build

1
2
3
4
5
6
7
8
9
10
11
12
/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}

运行时创建的核心步骤,创建已经做配置的runtime,并返回Runtime实例准备创建异步任务

1
2
3
4
5
6
7
8
9
10
pub struct Runtime {
/// Task scheduler
scheduler: Scheduler,

/// Handle to runtime, also contains driver handles
handle: Handle,

/// Blocking pool handle, used to signal shutdown
blocking_pool: BlockingPool,
}

scheduler:异步任务调度器

handle:运行时句柄

blocking_pool:阻塞池句柄,用于发出关闭信号

build -> build_threaded_runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();

Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
  1. 引入

num_cpus:用于获取系统的 CPU 核心数量。

Config:运行时的配置项。

MultiThread:多线程调度器,负责在多个线程间调度异步任务。

  1. 确定核心线程数
1
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
  • self.worker_threads:用户在Builder配置的核心线程数。如果未指定,使用系统的 CPU 核心数作为默认值(num_cpus())。
  • core_threads 是最终确定的核心线程数,表示运行时的调度线程数量。
  1. 初始化驱动
1
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
  • driver:负责管理底层 IO 和定时器的核心组件。
  • driver_handle:运行时与驱动交互的句柄,用于调度异步任务和管理定时器。
  • get_cfg(core_threads):返回结构体driver::Cfg,包含驱动的配置项,包含workers等配置。
  1. 创建阻塞任务池
1
2
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
  • 阻塞任务池:用于执行需要阻塞运行的任务(例如文件 IO 或计算密集型任务),避免阻塞异步任务调度线程。

    • 线程数 = 用户指定的 max_blocking_threads + 核心线程数。
  • blocking_spawner:用于将任务提交到阻塞任务池的对象。

  1. 生成随机数种子
1
2
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
  • Tokio 的调度器可能需要随机化任务的分配,例如在多线程调度器中均衡负载。
  • seed_generator_1seed_generator_2 是生成的两个独立随机数种子,分别用于不同的组件。
  1. 初始化调度器
1
2
3
4
5
6
7
8
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config { ... },
);
  • 调用 MultiThread::new 创建一个多线程调度器,返回:

    • scheduler:核心调度器,管理线程间任务的分配和调度。
    • handle:调度器的句柄,用于外部与调度器交互。
    • launch:启动调度器工作线程的接口。
  • 传递的参数:

    • core_threads:核心线程数。

    • driverdriver_handle:用于与 IO 和定时器交互的驱动组件。

    • blocking_spawner:用于执行阻塞任务的接口。

    • seed_generator_2:随机数种子,用于内部调度逻辑。

    • Config:调度器的配置,包括以下内容:

      • before_parkafter_unpark:线程挂起与唤醒时的回调函数。
      • before_spawnafter_termination:任务生成和结束时的回调函数。
    • global_queue_intervalevent_interval:全局队列检查和事件处理的时间间隔。

      • local_queue_capacity:本地任务队列容量。
    • 其他运行时的定制项。

  1. 创建运行时句柄
1
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
  • Handle 是对调度器的抽象封装,用于外部访问运行时和调度器的功能。
  1. 启动工作线程
1
2
let _enter = handle.enter();
launch.launch();
  • handle.enter():将当前线程设置为调度器的上下文,用于初始化调度环境。
  • launch.launch():启动调度器的所有工作线程,使其开始运行。
  1. 返回运行时
1
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
  • 创建并返回完整的Runtime实例,包括:
    • Scheduler::MultiThread(scheduler):多线程调度器。
    • handle:运行时句柄。
    • blocking_pool:阻塞任务池。

driver::Driver::new方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);

Ok((
Self { inner: time_driver },
Handle {
io: io_handle,
signal: signal_handle,
time: time_handle,
clock,
},
))
}
  1. 创建 IO 栈
1
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
  • 调用 create_io_stack 创建 IO 栈(底层 IO 相关的资源),返回:

    • io_stack:IO 栈的核心组件,用于管理底层 IO 操作。
    • io_handle:用于和 IO 栈交互的句柄。
    • signal_handle:用于处理信号的句柄(如处理 Ctrl+C)。
  • 参数解释:

    • cfg.enable_io:如果为 true,启用 IO 支持;否则,不创建 IO 栈。
    • cfg.nevents:指定事件队列的大小,用于多路复用器。
  1. 创建时钟
1
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
  • 调用 create_clock 创建时钟组件,用于时间相关功能的管理,返回一个 Clock 对象。
  • 参数解释:
    • cfg.enable_pause_time:是否支持暂停时间。
    • cfg.start_paused:是否让时钟在启动时进入暂停状态。

时钟的主要用途:

  • 为定时器、延迟任务提供精确的时间点。
  • 支持测试或调试时暂停时间的功能。
  1. 创建时间驱动器
1
2
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
  • 调用 create_time_driver

    创建时间驱动器,返回:

    • time_driver:管理时间相关任务的核心组件(例如定时器、延迟任务)。
    • time_handle:与时间驱动器交互的句柄。
  • 参数解释:

    • cfg.enable_time:是否启用时间功能。
    • io_stack:IO 栈,与时间驱动器共享底层的事件循环。
    • &clock:前面创建的时钟,提供时间相关支持。
    • cfg.workers:用于分配时间任务的工作线程数。

时间驱动器的主要功能:

  • 实现异步延迟(如 tokio::time::sleep)。
  • 管理基于时间的任务调度。
  1. 返回结果
1
2
3
4
5
6
7
8
9
Ok((
Self { inner: time_driver },
Handle {
io: io_handle,
signal: signal_handle,
time: time_handle,
clock,
},
))
  • 返回一个包含驱动器和其句柄的元组:
    • Self { inner: time_driver }
      • 驱动器的核心部分是时间驱动器,封装到 Self 结构体中。
    • Handle
      • 包含多个句柄,用于驱动器与外部的交互:
        • io_handle:处理 IO 事件的句柄。
        • signal_handle:处理信号的句柄。
        • time_handle:管理时间任务的句柄。
        • clock:提供时间支持的时钟实例。

tokio启动

通过launch.launch();启动所有的 worker 线程:

1
2
3
4
5
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}

runtime::spawn_blocking 调用时, || run(worker) 匿名函数会被传进去,这其实就是 worker 线程要执行的逻辑。

如下,匿名函数会被包装为 BlockingTask,并被放在 blocking thread 的 run queue 中,这样当它运行时就会执行这个匿名函数。因为这时没有足够的线程,就会初始化一个新的 OS 线程(如果有 idle 的线程,就会通过 condvar 通知),并开始执行 blocking 线程的逻辑。每个 worker 都占用一个 blocking 线程,并在 blocking 线程中运行直到最后。

1
2
3
4
5
6
7
8
9
10
11
// runtime::spawn_blocking:
let (task, _handle) = task::joinable(BlockingTask::new(func));

let mut shared = self.inner.shared.lock();
shared.queue.push_back(task);

let mut builder = thread::Builder::new(); // Create OS thread
// run worker thread
builder.spawn(move || {
rt.blocking_spawner.inner.run(id);
})