0%

与同学们互相讨论

在本次学习过程中,我对Rust异步编程中的一些核心挑战进行了深入思考:

1. 函数着色和异步Drop的挑战

目前Rust异步编程中最棘手的问题是函数着色和async drop机制。特别是在资源管理方面,由于future是基于poll的机制,在处理background task和事务时面临两个主要场景的挑战:

  • 结构体持有background task(如连接或stream)时的资源释放
  • 需要uncancelable语义的事务处理场景

2. 资源安全释放的困境

在系统信号处理方面存在显著挑战。特别是在进程接收信号时的安全资源释放问题仍然没有完善的解决方案。虽然对于使用事务的SQL系统影响相对较小,但在文件系统或WAL文件等场景中,这个问题尤为突出。

3. 互斥锁的使用策略

在异步编程中,关于互斥锁的选择需要特别注意。Tokio提供了一个重要的见解:在异步代码中使用标准库的普通Mutex通常是可行且推荐的。异步互斥锁的主要优势在于能够在.await点保持锁定状态,但这也使其比阻塞式互斥锁更昂贵。因此,对于单纯的数据访问,使用标准库的阻塞式互斥锁是更合适的选择。

4. 操作系统层面的应用

在操作系统内核中,协程的应用主要可以考虑用于替代传统的自旋锁场景,而io_uring的引入则可能带来更广泛的应用空间。

5. 异步编程模型的适用场景

从资源管理的角度来看,异步编程模型在不同场景下有其独特的优势和局限。特别是在处理IO密集型任务时,异步模型能够提供更好的性能和资源利用率。

第一周:

  • 复习并重新学习WASM上的async runtime相关知识
  • 计划完善OS仓库,准备移植文件系统和SMP多核启动

第二周:

  • 阅读Rust语言相关RFC文档:
    • async_trait与async fn in trait
    • Object Safety
    • pin_project_lite
  • 为内核态编写async runtime
  • 开始实现文件系统与block device
  • 计划将async runtime合并到OS仓库中

第三周:

  • 因家人住院需要陪护,本周暂无工作产出

概要设计

众所周知,操作系统内核对性能和时延的需求都是非常高的。当应用程序通过传统系统调用使用操作系统能力时,CPU需要经过两次特权模式转换、用户栈的保存与恢复等操作,这会消耗数十甚至数百CPU时钟周期。因此,在操作系统内核中引入异步协程机制时,不应增加这些基础开销。

在应用程序通过操作系统与外部设备交互时,由于大多数外部设备的性能都慢于CPU执行速度,且受限于信号传输延迟,响应延迟是不可避免的。操作系统在等待外部响应时采用自旋等待是非常低效的。因此,我的设想是在IO部分引入协程机制,通过用户进程的IO操作与内核线程通信的方式,为内核代码赋予异步能力。

在二阶段学习rCoreOS时,我发现其中与virtIO block-device的交互部分非常适合进行异步协程改造。

然而,我不赞成将所有内核代码和锁都替换为异步协程版本。如前所述,操作系统内核代码对性能非常敏感。虽然无栈协程的切换开销较小,但在内核开发中这种开销仍然不容忽视。此外,由于CPU大部分时间都会交由用户线程执行以最大化资源利用率,载体线程(CPU核心)会尽可能运行用户代码,这在某种程度上违反了协程使用原则,可能导致其他任务出现饥饿现象。

总结

四阶段开始前, 我曾立下这个目标: 希望能开发出我自己的操作系统内核, 并在 Lichee Pi 4A 真机设备上成功启动.

无奈上周夫人孕期遭遇大出血, 紧急住院保胎一周. 我作为陪护家属, 暂停了一周的学习与工作照顾夫人. 故未能完成四阶段开始前立下的目标😔.

从小白到操作系统开发者:我的训练营历程与成长

参加训练营已经快一年了,回顾这段时间的学习与实践,感觉收获满满,成长也非常明显。虽然过程中经历了不少挑战,但我从中学到了很多,也更加明白了自己未来的目标和方向。

从初识RTOS和操作系统

从今年刚刚开始接触开源操作系统训练营,感觉开辟了一片新的天地在训练营的第一阶段,我还是一个对RTOS(实时操作系统)了解甚少的“小白”。当时,我的理解仅限于知道 FreeRTOS 是什么,而对于操作系统的具体结构和工作原理几乎没有任何概念。然而,通过这段时间的学习,我开始接触并逐步理解操作系统的基本组成部分,像是内核、调度、进程管理等。这个阶段的学习让我明白了操作系统不仅仅是运行应用程序的基础,更是硬件和软件之间的重要桥梁。

网络问题与任务挑战

不过,学习的过程并不是一帆风顺的。由于网络不稳定,很多简单的任务总是上传不了,这让我有时感到非常沮丧。更有一次,我因为任务时间限制和一些技术难题,没能在结课期间按时完成所有的任务。这些问题让我认识到,技术的实现不仅仅依赖于个人的能力,还需要更好的时间管理和稳定的技术环境支持。

深入探索异步编程与Python适配

进入训练营的第二阶段后,任务的难度逐步增加。我选择了一个相对较简单的方向,与其他小伙伴们一起研究 异步编程。虽然这与我之前的经验较为接近,但在实际实现过程中,我还是遇到了不少挑战,尤其是在理解异步编程模型时。

然而,通过逐步的实现,我不仅加深了对异步编程的理解,还接触了Python适配的相关知识。实际上,这个阶段让我学到了很多之前没有涉及的技术,这些新知识极大地丰富了我的技术栈,也让我对未来的技术方向更加清晰。

Python与OpenCV的结合

尽管我没有完全按预期完成所有任务,但这个阶段让我深入思考了自己的学习方法。我曾经盲目追求高难度的任务,而忽略了任务的实际意义和自己的能力匹配。因此,在这个阶段,我主动选择了一个相对简单的任务——OpenCV 的相关实践。尽管我最终未能在 Starry 上完成OpenCV的测例,但这个过程让我更加明白了如何进行项目规划和任务分配。

在这阶段,我还负责了Python与OpenCV的结合,进行了一些与图像处理相关的实验和项目。通过这些实践,我深入了解了Python的图像处理能力,尤其是OpenCV库在计算机视觉中的应用。例如,如何使用OpenCV进行图像读取、处理和展示,这些操作帮助我了解了计算机如何通过视觉来感知和分析信息。

实现Python3.11的syscall系统调用

作为实习任务的一部分,我参与了分析和实现支持 Python3.11 程序的 syscall 系统调用,尤其是在 ArceOS/Starry 系统中,逐步完善Python程序所依赖的系统调用。具体来说,这个过程涉及了用户态程序的交叉编译、依赖库的交叉编译、以及系统模拟器 Qemu 中的实验。

  1. 交叉编译Python程序:首先,我需要交叉编译 Python3.11,以支持 aarch64 架构。这包括了编译所需的依赖库(如 libffizliblibuuidxz 等),并且每个依赖库都必须交叉编译,以保证在目标架构上能够正常运行。

  2. Qemu模拟环境:为了测试在交叉编译后的Python程序,我在 Qemu 模拟环境中运行了一个 aarch64 架构的 Alpine Linux,并在其中加载了交叉编译生成的Python库。通过这种方式,我能够测试并调试系统调用,确保它们能够在模拟器中正常执行。

  3. 系统调用分析:我使用 strace 工具来统计Python程序所使用的所有系统调用,并分析它们在不同架构下的表现。通过这个过程,我深入了解了 syscall 的工作机制,并能够在实际操作系统中正确实现这些调用。

通过这个任务,我不仅加深了对操作系统内核和系统调用机制的理解,还在实践中学会了如何处理复杂的交叉编译任务。

未来的目标–操作系统与编译器

回顾这一年的学习,我不仅了解了操作系统的基本原理,还在逐步实现的过程中积累了不少经验。尤其是从一个仅仅知道FreeRTOS的小白,到如今能够理解操作系统结构的程度,我感到自己有了明显的进步。

最令我兴奋的是,未来我希望能够在自己写的操作系统上运行自己开发的编译器。这是我一直以来的梦想,虽然这需要我不断努力和学习,但我相信,通过这段时间的积累,我已经打下了坚实的基础。

总结

参加训练营的这一年里,我收获了很多,不仅有热心的助教和同学的帮助,还有机会接触到多种新技术和工具。虽然遇到了一些困难,但这些都成为了我成长的动力。我相信,未来我会继续沿着这个方向前行,实现自己在操作系统和编译器领域的梦想。


stage4总结

stage 4 算是很扎实的一个阶段。这个阶段从头开始学习了我原来不怎么熟悉的异步运行时。整体上来说算是比较系统的学习了如何构建一个异步运行时,同时也阅读了不少tokio这种生产环境级别的异步运行时代码,受益良多。
本阶段还学习了一些关于 uring io 的知识,同时也顺便比较系统的梳理了 linux io 这块的知识点。通过每周一次的交流活动中能比较有效的调整自己的学习方向,同时也能补充很多有用的信息。很多同学水平很高,在交流过程中深感差距,还需要不断学习。

stage4 工作进度

本周工作(第一周):

阅读以下两个链接中文档,比较系统的了解了 rust 的异步原理

####下周安排(第二周):

  • 阅读 tokio 源码,学习生产环境中的异步运行时实现

本周工作(第二周):

由于本项目最终要实现一个基于 uring io 的异步运行时,于是决定从 io 的角度切入 tokio 的源码阅读。在阅读过程中发现 tokio 的文件 io 部分都是转发给某个独立线程,是基于阻塞式的操作。为了对比文件 io ,还阅读了部分 tcp 相关的源码,证实了的确网络 io 是使用了基于 epoll 的 mio 来做管理。而且本周对 uring io 有一个粗略地了解,目前看来 uring io 在文件 io 方面可能优势会更明显。 网络 io 这块相较于 epoll 优势没那么大,那么接下来第三周可能要优先实现基于 uring io 的文件 io 异步运行时的相关工作。

此外本周还阅读了以下资料

下周安排(第三周):

编写一个简易的基于 uring io 的文件 io 的异步运行时。

本周工作(第三周):

本周首先调研了一下在 smol 中实现基于 uring io 的可能性。首先 smol 社区中已经有一个 PR
async-io PR:Integrate io_uring in the Reactor
● 简要介绍了实现思路,
● 作者说要等等 polling Expose raw handles for the Poller 这个 issuing合并.
大概粗略浏览了一下,但是由于对 uring io 不太熟悉用法,还没有看懂,暂时搁置。

然后继续阅读了 uring io 的相关资料,包括

最后是实现我自己的uring io异步运行时
我的async-fs-uring,这里实现了一个建议的基于 reactor 模式异步运行时,并实现了基于uring io的文件读。主要思想就是把 io 委托给 reactor 去提交,然后 reactor 不断轮询,如果有 io 完成了,就返回给对应的异步任务。实现过程中比较困难的点就是buf 管理,需要保证 buf 在异步读过程中一直有效。我这里做法是直接把 buf 的所有权移交给 UringReadFuture.这只是一个权宜之计,因为我这里实现的比较简单,在异步读进行过程中 UringReadFuture不会被 drop 掉。实际上后来也阅读了 tokio-uring 的相关设计文档,也了解到了一些更合理的设计方案,但是还没有时间来实现。

未来计划:通过实现一个建议的基于 uring io 的异步运行时让我对 uring io 有了基本的了解。后续可能会进一步了解生产环境级的基于 uring io 的异步运行时的实现以及与传统阻塞式和epoll结合的异步运行时的实现差异

前言

在阶段4我进入了项目四:基于协程异步机制的操作系统,由于之前缺乏对相关知识的了解,前期花了大量时间来阅读源码和理解,最后才实现了在OS中boot了一个简单的的异步executor。

async keyword

在 Rust 中,使用 async 关键字修饰的函数会返回一个实现了 Future trait 的匿名类型

1
2
3
4
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

例如

1
2
3
async fn my_async_function() -> u32 {
42
}

编译器会将其转换为类似以下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fn my_async_function() -> impl Future<Output = u32> {
struct MyAsyncFunction {
// 异步函数的状态
}

impl Future for MyAsyncFunction {
type Output = u32;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 异步函数的执行逻辑
Poll::Ready(42)
}
}

MyAsyncFunction {
// 初始化异步函数的状态
}
}

这种设计允许编译器生成最优化的异步执行代码,同时提供了灵活性和类型安全性。

当然,对于rust的异步编程还有许多深入的概念,例如关于自引用的 Pin<> , 关于优化轮询的 Waker 等。

why async-OS

所以为什么要实现一个基于协程异步机制的操作系统呢?答案当然是为了并发的性能。内核可以通过轻量的内核线程和优化的异步调度执行来提升对系统调用的批处理速度。

参考async-module关于系统调用的优化,存在两种方向:

  1. 减少由于系统调用导致的特权级以及上下文切换开销
  2. 异步批处理

在高并发场景下,使用类似dpdk/spdk等通过用户态轮询完全绕过内核是可行的,但是如果仍然使用系统调用,那么当应用通过系统调用同步地进入内核态时,内核就可以对这些系统调用进行异步批处理,从而提升性能。
并且异步调度的 poll / wake 机制更适合设备驱动的工作状态。

design

考虑一种简单的情形,在OS初始化阶段,把栈初始化之后,直接开始运行全局的executor,负责对内核中的异步协程进行调度。这样,我们所有的系统调用都可以写成async的形式。

那么,当用户程序需要调用系统调用时,会先同步的进入内核态并设置scause寄存器的值来指定系统调用号,然后调用syscall之后await,在系统调用执行完之后再切换回用户态。所以这里的syscall api实现了一个异步和同步切换的过程。
当然另一种思路是,通过内核向用户态发送通知或者共享内存等方式,实现完全的异步系统调用,这里不再讨论。

implement

所以,我们在内核态需要建立自己的异步运行时,在抽象上的第一个问题是,Rust 欠缺对 async-trait 的支持。

例如

1
2
3
pub trait Mutex {
async fn wait(&mut self) -> FutexWait;
}

Rust 编译器默认不支持 async trait function。编译器提示说使用 async-trait 这个 crate。可惜的是,这个 crate 不是零开销的, 会将返回值改写成 Box 的形式。

还是继续考虑对futex的实现。我们既然想让对互斥锁的wait支持异步,那么就先实现一个 Future。

1
2
3
4
5
6
7
8
9
10
11
12
13
impl Future for FutexWait {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
match this.queue.poll(&mut this.id, cx.waker()) {
Ok(poll) => break poll,
Err(queue) => this.queue = queue,
}
}
}
}

如果可以从互斥锁队列queue中拿到结果,那么就返回poll, 否则就对等待队列进行更新并继续循环。

接下来考虑进程的执行单元task,Task 结构体包含了任务的各种属性,如可执行文件、父任务、子任务、任务 ID、时间信息、信号相关的字段等。如果我们想要把task交给executor执行,就需要为task的返回值实现 Future 。

也就是说,我们把task的返回值当成 Output 以实现一个 TaskFut:Future 的结构体 , 接着将这个 task 封装成一个异步的 loop 传入 executor 中。 在 loop 中 ,我们通过 trap 切换回用户空间 , 并且捕获用户空间的中断和异常 , 在切换回内核空间之后继续处理。

try

接着,在老师的指导下,我进行了将二阶段 rCore-tutorial 操作系统实现异步的尝试。

在 rust 的入口处,我们使用mm::init()来使用HEAP_ALLOCATOR来初始化堆内存,接下来我们就可以直接在堆内存上建立executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#[no_mangle]
/// the rust entry-point of os
pub fn rust_main() -> ! {
clear_bss();
println!("[kernel] Hello, world!");
logging::init();
mm::init();
mm::remap_test();

let mut rt = RUNTIME.execlusive_access();
rt.spawn(task1)
rt.run()
}

async fn task1() {
println!("[kernel] task1: hello async world!");
}

这样就是最简单的内核态的async执行测试。

第一阶段总结

第一阶段主要考察 Rust 基础编程和数据结构与算法,由于我有一定 Rust 基础,所以前 100 道题比较轻松,但从未使用 Rust 刷过算法题,因此后 10 道题让我学习了如何使用 Rust 编写常见数据结构如链表、栈、堆、图等,以及常见的排序、搜索算法等,有所收获!

第二阶段总结

第二阶段我的学习方法是先看完 v3 book 对应章节,然后再做实验题。v3 book 写得循序渐进,质量上乘,读懂后做实验题都比较轻松。第二阶段的精华都在 v3 book 中,十分建议精读一遍,我自己精读一遍后发现了若干内容和文字错误,还提交了 19 个 pr 修复。这期间我还写了篇博客分析有栈协程示例代码。

五个实验题中最后的死锁检测算法花费的时间要久一点,因为前面章节没有铺垫,直接就是抛出一个算法让实现,且算法只有过程描述,没有原理分析。这个算法似乎很少在生产上见到被实际使用(可能是我孤陋寡闻),我建议换成其他更有意义的实验,比如实现某一种同步互斥原语。

第三阶段总结

第三阶段主要是理解 ArceOS 组件化概念,如何划分组件以及如何将这些组件组装成不同的系统,如 Unikernel/宏内核/hypervisor 等。

在这个阶段,我完成了四个练习(print_with_color/support_hashmap/alt_alloc/simple_hv)和一个挑战任务:针对特定应用实现一个专门优化过的内存分配器。挑战任务并没有花太多时间,只是在 slab 分配器上做了些修改,所以得分不高,刚过及格线,排名第 14。

在这个阶段,我花费了许多时间来学习 RISC-V 的虚拟化扩展,本来是准备在第四阶段选择虚拟化项目的,但最后发现项目并没有采用 RISC-V 指令集(不想再花时间学习另一个指令集的特权架构,最终选择了基于协程异步机制的操作系统/驱动项目)。我研究了 arceos-hypervisor org 下的一些虚拟化项目和 KuangjuX/hypocaust-2 项目,arceos-hypervisor 项目因为需要做到组件复用,包括统一不同指令集架构,所以有很多抽象,学习成本有些高,而 KuangjuX/hypocaust-2 项目目前跑不起来,项目并没有指定 rust-toolchain,而且代码质量感觉不够高。最终我还是决定自己从零开始写一个纯 RISC-V 的一型 hypervisor,因为毕竟花了几个月时间参加训练营,还是想在最后自己能独立编写一个完整的实际项目。项目地址是 https://github.com/systemxlabs/riscv-hypervisor , 目前刚能将 rCore-Tutorial-v3 的第 5 章的内核作为 guest 跑起来,最终目标是能跑起 Linux。RISC-V 虚拟化扩展因为稳定不久,资料很少,所以写的过程中遇到许多困难,而且不好 debug,希望能坚持下去不弃坑~

第四阶段总结

第一周,阅读 200 行 rust 代码实现绿色线程、 200 行 rust 代码实现 futures 以及 blog_os 的 async/await 文章,输出了一篇博客,同时提了 pr 修复 rCore-Tutorial-v3 中 stackful_coroutine 示例代码,自己还将 stackful_coroutine 示例移植到 Linux for RISC-V 上(项目地址:https://github.com/systemxlabs/green-threads-in-200-lines-of-rust ),对原示例代码做了很多重构以便具有更好的可读性。调试自己的riscv hypervisor项目(第三周使用),目前可以运行 rCore-Tutorial-v3 第六章带文件系统的内核。

第二周,主要阅读 smol 生态 crates 源码,着重阅读了 polling / async-io / async-task / async-executor 库源码,理解最重要的 IO 多路复用 / reactor / driver / task / executor 等概念,输出了一篇博客

第三周,编写异步 os(项目地址:https://github.com/systemxlabs/async-os ),本来是想把之前写的 riscv-hypervisor 改成异步,但感觉没有意义,因为 os 上面可能有成千上万的线程,所以将 os 改成异步减少上下文切换是有意义的,但 hypervisor 跟 os 不同,hypervisor 上面没有那么多的 guests,且 hypervisor 为了高性能,通常都尽可能去避免 vm exit,所以我放弃将之前写的 riscv-hypervisor 改成异步,改为实现一个异步 os,参考 phoenix 和 rCore-Tutorial-v3,特点是全隔离内核、内核栈复用、trap return回用户态时无需保存内核执行流。目前已实现多核启动、device tree解析、物理页帧和堆分配器、页表和地址空间、内核和用户态trap handling、异步runtime。

总说

在训练营的第四阶段,几乎一直在读东西,一边看一边学asynchronous和Rust,之前几乎没有了解和使用过异步方面的东西。
学习效果不是很好,结营后还得多读多写这一块的代码,继续学习。

细说

第一周:

第二周:

第三周:

  • 读了io_uring的手册,顺带了解了一下使用环形队列和内存映射,无锁环提高性能的原理
  • 读了一下smol源码,主要是futures-lite和async-executor部分
  • 尝试使用iou封装的接口实现简单的异步文件读写,但是实现的waker有问题,没有成功

总结

最后这一阶段,鄙人主要围绕rust的异步编程async/await及经典运行时库tokio展开阅读研究。但并没有深入理解其中原理及实现,更多的是比较浅显地了解,像tokio的有些模块并没有看,还又比较多的疑问在里面。如果后续有时间的话,希望能继续阅读并深入。代码实现上,仅在rcore上实现了一个极为简单的用户态运行时,跑了几个异步任务,有点协程的意思了(。但是并没有实现waker机制,后续考虑进一步实现完善

协程

有栈协程

函数运行在调用栈上,把函数作为一个协程,那么协程的上下文就是这个函数及其嵌套函数的栈帧存储的值,以及此时寄存器存储的值。如果我们调度协程,也就是保存当前正在运行的协程上下文,然后恢复下个将要运行的协程的上下文。这样我们就轻松的完成了协程调度。并且因为保存的上下文和普通函数执行的上下文是一样的,所以有栈协程可以在任意嵌套函数中挂起(无栈协程不行)。

有栈协程的优点在易用性上,通常只需要调用对应的方法,就可以切换上下文挂起协程。在有栈协程调度时,需要频繁的切换上下文,开销较大。单从实现上看,有栈协程更接近于内核级线程,都需要为每个线程保存单独的上下文(寄存器、栈等),区别在于有栈协程的调度由应用程序自行实现,对内核是透明的,而内核级线程的调度由系统内核完成,是抢占式的。

无栈协程

相比于有栈协程直接切换栈帧的思路,无栈协程在不改变函数调用栈的情况下,采用类似生成器的思路实现了上下文切换。通过编译器将生成器改写为对应的迭代器类型(内部实现是一个状态机)。

而无栈协程需要在编译器将代码编译为对应的状态机代码,挂起的位置在编译器确定。无栈协程的优点在性能上,不需要保存单独的上下文,内存占用低,切换成本低,性能高。缺点是需要编译器提供语义支持,无栈协程的实现是通过编译器对语法糖做支持,rust的aysnc\await就是语法糖,编译器将带有这些关键字的方法编译为生成器,以及对应的类型作为状态机。

只有状态机的支持才能进行协程调度,例如Rust中的tokio,基于Future的用户态线程,根据poll方法获取Future状态,它不可以在任意嵌套函数中挂起(同步代码未实现状态机)。

tokio

我主要从tokio::main宏出发,一步步分析其中调用关系。主要围绕runtime库的build及Runtime结构体的方法及函数

使用#[tokio::main]宏生成的代码

1
2
3
4
5
6
7
8
9
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// ...
})
}

结构体Builder用作运行时runtime配置,通过new_current_thread()使用单线程运行时,new_multi_thread()使用多线程运行时,并会返回Builder实例

new_multi_thread

1
2
3
4
5
6
7
8
9
/// Returns a new builder with the multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}

调用new方法返回一个KindMultiThreadBuilder,对于定时器和I/O事件释放CPU之前需要61ticks

enable_all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Enables both I/O and time drivers.
///
/// Doing this is a shorthand for calling `enable_io` and `enable_time`
/// individually. If additional components are added to Tokio in the future,
/// `enable_all` will include these future components.
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();

self
}

操作成员变量,使能运行时I/O和定时器

build

1
2
3
4
5
6
7
8
9
10
11
12
/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}

运行时创建的核心步骤,创建已经做配置的runtime,并返回Runtime实例准备创建异步任务

1
2
3
4
5
6
7
8
9
10
pub struct Runtime {
/// Task scheduler
scheduler: Scheduler,

/// Handle to runtime, also contains driver handles
handle: Handle,

/// Blocking pool handle, used to signal shutdown
blocking_pool: BlockingPool,
}

scheduler:异步任务调度器

handle:运行时句柄

blocking_pool:阻塞池句柄,用于发出关闭信号

build -> build_threaded_runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();

Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
  1. 引入

num_cpus:用于获取系统的 CPU 核心数量。

Config:运行时的配置项。

MultiThread:多线程调度器,负责在多个线程间调度异步任务。

  1. 确定核心线程数
1
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
  • self.worker_threads:用户在Builder配置的核心线程数。如果未指定,使用系统的 CPU 核心数作为默认值(num_cpus())。
  • core_threads 是最终确定的核心线程数,表示运行时的调度线程数量。
  1. 初始化驱动
1
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?;
  • driver:负责管理底层 IO 和定时器的核心组件。
  • driver_handle:运行时与驱动交互的句柄,用于调度异步任务和管理定时器。
  • get_cfg(core_threads):返回结构体driver::Cfg,包含驱动的配置项,包含workers等配置。
  1. 创建阻塞任务池
1
2
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
  • 阻塞任务池:用于执行需要阻塞运行的任务(例如文件 IO 或计算密集型任务),避免阻塞异步任务调度线程。

    • 线程数 = 用户指定的 max_blocking_threads + 核心线程数。
  • blocking_spawner:用于将任务提交到阻塞任务池的对象。

  1. 生成随机数种子
1
2
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
  • Tokio 的调度器可能需要随机化任务的分配,例如在多线程调度器中均衡负载。
  • seed_generator_1seed_generator_2 是生成的两个独立随机数种子,分别用于不同的组件。
  1. 初始化调度器
1
2
3
4
5
6
7
8
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config { ... },
);
  • 调用 MultiThread::new 创建一个多线程调度器,返回:

    • scheduler:核心调度器,管理线程间任务的分配和调度。
    • handle:调度器的句柄,用于外部与调度器交互。
    • launch:启动调度器工作线程的接口。
  • 传递的参数:

    • core_threads:核心线程数。

    • driverdriver_handle:用于与 IO 和定时器交互的驱动组件。

    • blocking_spawner:用于执行阻塞任务的接口。

    • seed_generator_2:随机数种子,用于内部调度逻辑。

    • Config:调度器的配置,包括以下内容:

      • before_parkafter_unpark:线程挂起与唤醒时的回调函数。
      • before_spawnafter_termination:任务生成和结束时的回调函数。
    • global_queue_intervalevent_interval:全局队列检查和事件处理的时间间隔。

      • local_queue_capacity:本地任务队列容量。
    • 其他运行时的定制项。

  1. 创建运行时句柄
1
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
  • Handle 是对调度器的抽象封装,用于外部访问运行时和调度器的功能。
  1. 启动工作线程
1
2
let _enter = handle.enter();
launch.launch();
  • handle.enter():将当前线程设置为调度器的上下文,用于初始化调度环境。
  • launch.launch():启动调度器的所有工作线程,使其开始运行。
  1. 返回运行时
1
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
  • 创建并返回完整的Runtime实例,包括:
    • Scheduler::MultiThread(scheduler):多线程调度器。
    • handle:运行时句柄。
    • blocking_pool:阻塞任务池。

driver::Driver::new方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);

Ok((
Self { inner: time_driver },
Handle {
io: io_handle,
signal: signal_handle,
time: time_handle,
clock,
},
))
}
  1. 创建 IO 栈
1
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
  • 调用 create_io_stack 创建 IO 栈(底层 IO 相关的资源),返回:

    • io_stack:IO 栈的核心组件,用于管理底层 IO 操作。
    • io_handle:用于和 IO 栈交互的句柄。
    • signal_handle:用于处理信号的句柄(如处理 Ctrl+C)。
  • 参数解释:

    • cfg.enable_io:如果为 true,启用 IO 支持;否则,不创建 IO 栈。
    • cfg.nevents:指定事件队列的大小,用于多路复用器。
  1. 创建时钟
1
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
  • 调用 create_clock 创建时钟组件,用于时间相关功能的管理,返回一个 Clock 对象。
  • 参数解释:
    • cfg.enable_pause_time:是否支持暂停时间。
    • cfg.start_paused:是否让时钟在启动时进入暂停状态。

时钟的主要用途:

  • 为定时器、延迟任务提供精确的时间点。
  • 支持测试或调试时暂停时间的功能。
  1. 创建时间驱动器
1
2
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
  • 调用 create_time_driver

    创建时间驱动器,返回:

    • time_driver:管理时间相关任务的核心组件(例如定时器、延迟任务)。
    • time_handle:与时间驱动器交互的句柄。
  • 参数解释:

    • cfg.enable_time:是否启用时间功能。
    • io_stack:IO 栈,与时间驱动器共享底层的事件循环。
    • &clock:前面创建的时钟,提供时间相关支持。
    • cfg.workers:用于分配时间任务的工作线程数。

时间驱动器的主要功能:

  • 实现异步延迟(如 tokio::time::sleep)。
  • 管理基于时间的任务调度。
  1. 返回结果
1
2
3
4
5
6
7
8
9
Ok((
Self { inner: time_driver },
Handle {
io: io_handle,
signal: signal_handle,
time: time_handle,
clock,
},
))
  • 返回一个包含驱动器和其句柄的元组:
    • Self { inner: time_driver }
      • 驱动器的核心部分是时间驱动器,封装到 Self 结构体中。
    • Handle
      • 包含多个句柄,用于驱动器与外部的交互:
        • io_handle:处理 IO 事件的句柄。
        • signal_handle:处理信号的句柄。
        • time_handle:管理时间任务的句柄。
        • clock:提供时间支持的时钟实例。

tokio启动

通过launch.launch();启动所有的 worker 线程:

1
2
3
4
5
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}

runtime::spawn_blocking 调用时, || run(worker) 匿名函数会被传进去,这其实就是 worker 线程要执行的逻辑。

如下,匿名函数会被包装为 BlockingTask,并被放在 blocking thread 的 run queue 中,这样当它运行时就会执行这个匿名函数。因为这时没有足够的线程,就会初始化一个新的 OS 线程(如果有 idle 的线程,就会通过 condvar 通知),并开始执行 blocking 线程的逻辑。每个 worker 都占用一个 blocking 线程,并在 blocking 线程中运行直到最后。

1
2
3
4
5
6
7
8
9
10
11
// runtime::spawn_blocking:
let (task, _handle) = task::joinable(BlockingTask::new(func));

let mut shared = self.inner.shared.lock();
shared.queue.push_back(task);

let mut builder = thread::Builder::new(); // Create OS thread
// run worker thread
builder.spawn(move || {
rt.blocking_spawner.inner.run(id);
})

作为常年开发网络程序的我,对异步、并发操作却仅仅停留在使用层面,这对我来说是一件长期的困扰。于是在第四阶段,我毫不犹豫地选择了“基于协程异步机制的OS”方向。在这个方向上,我学到了很多新的知识,也遇到了很多新的问题。

第一周

第一周的任务主要是了解 Rust 中异步的基本概念,通过阅读资料,我了解到了 Rust 中的 asyncawait 关键字背后的原理,并且了解到了如何实现有栈/无栈协程。过去我只了解过有栈协程,而无栈协程对我来说是一个全新的概念,其 LLVM Generator 的实现机制让我感到十分精妙。在这一周中,我主要是通过阅读资料和代码来了解这些概念,对于这些概念的理解还不是很深入。

第二周

第二周主要的工作是阅读 Tokio 代码,我阅读了 Tokio 的 netsignalsync 模块,我将部分代码的分析记录在了共享文档中,这些代码的阅读让我对异步编程有了更深入的理解。

第三周

第三周通过阅读 epollio-uring 等相关资料,我了解到了异步 IO 的原理。通过阅读 async-iopolling 的代码,我了解了 epoll 在 Rust 异步运行时中的运用,而通过阅读 monoiotokio-uring 的代码,我了解了 io-uring 在 Rust 异步运行时中的运用,这些代码的阅读让我对异步 IO 有了更深入的理解。io-uring 的机制起初让我感到困惑,但是在领悟到 io-uring 事实上是一个异步的系统调用框架而不是 epoll 这样的文件描述符复用机制后,我便茅塞顿开。

在阅读了大量相关的资料后,我也着手开始实现一个简单的异步运行时,通过对 mini-rust-runtime 的学习和修改,我将其中使用 polling 实现的基于 epoll 的异步运行时改为了基于 io-uring 的异步运行时,并初步支持了文件的异步读写和 TCP 连接。因为使用 io-uring 必须从系统调用层面进行编程,抽象层次很低,在查阅了大量资料后我才勉强完成了这个十分粗糙的实现。

总结

毫不夸张的说,这短短的三周我学到了近年来对我来说最有价值的知识,使我对异步编程的理解有了质的飞跃,使我未来能更好地开发出高性能的网络程序。在学习的过程中,我也遇到了很多问题,但是通过查阅资料和请教老师,我都得到了解决。这次训练营对我来说是一次非常有意义的经历,我也希望未来能有机会继续参加这样的活动。

第一周:

有栈协程与无栈协程

协程这块的概念可能比较混乱,不太能说得清,但是计科毕竟大多数情况下也不是深究定义的,所以纤程、绿色线程、协程大体上是可以混为一谈的,以下我们统称为协程。
尽管名称可能不同,但它们都可以被划分为两大类,一类是有栈(stackful)协程,一类是无栈(stackless)协程。
此处的有栈和无栈指的不是指协程在运行时是否需要栈,(毕竟不能回到“远古时代”的面条式编程)对于大多数语言来说,一个函数调用另一个函数,总是存在调用栈的;而是指协程是否可以在其任意嵌套函数中被挂起。有栈协程是可以的,而无栈协程则不可以。

有栈协程

实现一个协程的关键点在于如何保存、恢复和切换上下文。
在有栈协程中,我们将函数作为协程,保存上下文也就是保存从这个函数及其嵌套函数的栈帧存储的值。恢复上下文则是将这些值重新写回对应的栈帧和寄存器当中。切换上下文也就是保存当前的上下文,恢复下一个要执行的函数的上下文。有栈协程就是这么地朴素,也是我这种刚听说协程的萌新最易于理解的一种协程实现。

无栈协程

相比于无栈协程直接切换栈帧的思路,无栈协程则没有改变调用栈。而是使用了生成器(一种特殊的迭代器,能够在函数的执行过程中保存状态,并在需要时恢复执行)。这种特性可以用来模拟协程切换的行为,从而实现上下文切换。

无栈协程就是把代码转换成状态机,我们可以将 Future 视为一种协程。

rust的 Future 是通过状态机的形式来实现异步操作的。每个 Future 都是一个状态机,表示一个可能尚未完成的计算。
它通过轮询(polling)的方式推进计算过程,直到完成。
poll 方法会被异步运行时反复调用,当 Future 返回 Poll::Pending 时表示还没准备好,当返回 Poll::Ready 时表示完成。
所以,对future的运算实际上也就可以视为是在执行协程。

它不依赖操作系统或运行时的栈切换,而是通过将状态信息嵌入到 Future 的数据结构中。这样可以在编译时生成高效的代码来管理异步操作。

rust的协程

rust在古早版本(1.0之前)曾经有过一个有栈协程的方案——绿色线程。但是由于不符合零成本抽象的思想被移除了。此外,对于rust而言,绿色线程需要尽可能地减小预分配的堆栈大小,进而降低内存上的开销,毕竟需要比操作系统的线程更加轻量级,否则为什么不直接使用操作系统的线程呢?
之后,rust使用了无栈协程的方案,虽然增加了开发上的复杂度,但是良好地解决了并发问题。

其他

阅读了:

https://os.phil-opp.com/async-await/

两百行实现绿色线程:
https://zhuanlan.zhihu.com/p/100058478

第二周:

基本上就读读tokio和smol的源码,但是说实话,没太看明白,但是smol确实会更简单易懂一点。剩下的就是在读io_uring

io_uring机制

io_uring的设计主要围绕着环形缓冲区实现,SQ和CQ都是环形的,并且大小都是2的n次方(位运算奇技淫巧 index % size == index & (size - 1),当且仅当size为2的n次方时成立)。并且由于 UInt 的回环,所以可以直接tail - head,这种设计的一个优势是可以利用环的完整大小,而无需额外管理“环已满”标志。

应用程序与内核通过io_uring交互的过程大致如下:

应用程序通过提交队列SQ将SQE(IO请求)提交给内核,内核操作完成之后通过完成队列CQ将CQE(完成的IO请求)写回给应用程序,应用程序自行处理已完成的事件。

SQ 和 CQ 是内核与应用程序共享的内存区域,这样子,我们避免了频繁的用户态和内核态之间的切换,并且支持批量地提交请求,也减少了系统调用的次数,提高了性能。此外,由于 io_uring 可以直接访问用户态提供的缓冲区,避免不必要的内存拷贝操作。

其他

看了io_uring的一些资料:

https://kernel.dk/io_uring.pdf

https://zhuanlan.zhihu.com/p/361955546

稍微看了看:

https://tony612.github.io/tokio-internals/01.html

第三周:

实现运行时

大体上是参考了 async-rt-bookmaglev 这两个写出来的

仓库在my_runtime

其他

读了点与异步协程运行时以及IO_Uring有关的一些资料

一个实验性质的运行时: https://github.com/ringbahn/maglev

训练营内的一位同学的博客:https://zhuanlan.zhihu.com/p/12850908116

rust与io_uring: https://zhuanlan.zhihu.com/p/346219893

无栈协程: https://mthli.xyz/coroutines-in-c/

第四阶段选择了基于协程的操作系统,但并没有选择引入协程到操作系统的方向,不过也有相关思考,这段时间主要学习了rust的协程以及异步运行时的功能以及如何设计

协程只是可以挂起和恢复的函数

函数只有2个行为:调用和返回,函数返回后,栈上所拥有的状态会被全部销毁,协程则可以挂起,协程挂起时可以保留协程上下文,恢复则恢复协程的上下文,协程的上下文取决于协程内的局部变量等,反正是比线程上下文小,当协程像函数一样返回,协程也要被销毁

Cpp 协程和Rust协程的对比

cpp和rust同为无栈协程,但设计不同

  • 对于协程的调用者

cpp的做法是协程的调用者会得到来自promise_type结构体内get_return_object方法返回的对象,这里一般通过form_promise构造协程句柄coroutine_handle,调用者可以通过协程句柄resume协程和destroy协程,cpp的协程不是lazy的,这点和rust不一样,相当于拿到future后直接开始poll,不过cpp不需要waker,cpp的handle不需要程序员提供函数也不需要提供指针,已经构造好了,而rust需要rawwaker,需要一个函数表定义行为,再传入任务对象的指针,让waker能够操作任务对象,自然而然就能调度这个任务了,cpp只需要管理好何时resume,但rust是loop poll,程序员管理何时把future重新加入被poll的队列

rust则是每一个async函数都会自动生成并返回一个future对象,通过future trait可以看到主要是通过poll来执行协程内代码以及状态切换,这里贴一下tokio教学文档里的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
impl Future for MainFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;

loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}

rust中.await会变成对其future的poll,而waker则需要在对最外层future的poll时构造进context作为形参,通过poll的结果决定是否挂起,但是rust协程没有恢复这个操作,rust的协程是通过waker把任务重新调度回来再poll

cpp使用Awaiter来控制协程,符合直觉的操作,没有rust那么绕,resume就是直接重新进入协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
enum State {
Start,
YieldValue,
FinalSuspend,
Done
};

struct CoroutineStateMachine {
State current_state = Start;
int a = 1, b = 1; //

// promise_type的引用,协程的promise接口
promise_type& promise;

void resume() {
try {
switch (current_state) {
case Start:
// 执行 initial_suspend
if (promise.initial_suspend()) {
current_state = YieldValue;
return; // 挂起
}
// 进入协程主体
[[fallthrough]];

case YieldValue:
while (a < 1000000) {
// co_yield a
promise.yield_value(a);
current_state = YieldValue;
std::tie(a, b) = std::make_tuple(b, a + b);
return; // 挂起
}
// co_return
promise.return_void();
current_state = FinalSuspend;
[[fallthrough]];

case FinalSuspend:
// 执行 final_suspend
if (promise.final_suspend()) {
current_state = Done;
return; // 挂起
}
// 结束
[[fallthrough]];

case Done:
return; // 协程结束
}
} catch (...) {
// 异常处理
if (!promise.initial_await_resume_called()) {
promise.unhandled_exception();
}
}
}
};
  • 内存布局

cpp是全都开堆上,而rust的future可自由选择在栈上还是堆上,对于自引用的结构体,当其被协程捕获作为协程的局部变量时,不允许转移所有权,我们使用Pin来进行保障,因为引用所代表的地址已经被标记为无效

异步运行时设计

协程是用户态的任务调度机制,而线程是内核的任务机制,做的事和内核一样,不断执行不同任务,不过是协作式调度,我们需要手动挂起来让其他任务运行,设想单线程环境下的任务调度,我们需要把任务存储在一个集合中,一个接一个运行协程,协程挂起时就再放入集合中。初步想法是这样的,但我们又不在内核态,完全可以把任务交给内核,而不是协程一挂起就重新放回集合,当集合为空时我们的线程就可以休息(多线程环境下其他线程使协程重新加入集合并把休眠的线程唤醒[把阻塞任务丢给了专门的线程])或是主动检查异步任务是否准备完成(阻塞调用不在用户态[需要内核的异步支持])

这样我们的线程可以把阻塞都丢给其他线程或是内核执行,而本身只需要处理更多的任务,提高并发量

和线程一样,我们也需要一个调用在协程里创建一个协程,我们需要spawn,有时候我们需要等待另一个协程的结果(仍然不会阻塞,因为外层的协程也挂起了),我们要为此添加JoinHandle,如果我们持有一个线程池来执行任务,spawn出的协程就需要一个面对线程池调度的waker,当资源准备好时加入线程池所用的任务队列,但当我们对其JoinHandle进行.await的话我们需要把当前协程的waker和其默认的waker进行替换,因为我们需要这个原本自由的协程在我们等待他的协程上恢复而不是在线程池中恢复后,任务的返回值不被关心,等待自然drop,使用线程池我们可以把同步调用异步化,让阻塞在单独的线程上运行,如果使用io_uring的话就更轻松了

io_uring

io_uirng是真正的异步io,通过他我们可以实现上述的第二种方案,当异步任务都处理完了我们的线程就检查完成队列,然后重新加入集合中,进行poll,此时资源已经准备好,不会造成任何阻塞

对协程引入操作系统的想法

考虑多核情况下,每个核心运行一个协程执行器,对于Mutex或信号量如果资源申请失败那么直接把任务挂起,把waker和任务指针记录下来,当资源被释放时,自动寻找等待队列的第一个元素,通过waker提供的函数表操作任务指针,使其重新加入对应核心的任务执行队列中。协程在内核态给我感觉是和序列生成器一般,能在一些部分减少时间片的浪费,提高任务处理的效率