线程管理器

线程状态

从调度器的角度来看,每个线程都有一个独一无二的 Tid 来区分它和其他线程。

// in process/mod.rs
pub type Tid = usize;
pub type ExitCode = usize;

同时,线程的状态有下面几种:

// src/process/struct.rs
#[derive(Clone)]
pub enum Status {
    // 就绪:可以运行,但是要等到 CPU 的资源分配给它
    Ready,
    // 正在运行
    Running(Tid),
    // 睡眠:当前被阻塞,要满足某些条件才能继续运行
    Sleeping,
    // 退出:该线程执行完毕并退出
    Exited(ExitCode),
}

调度算法接口设计

在一个线程运行的过程中,调度器需要定期查看当前线程的已运行时间,如果已经达到一个阈值,那么出于公平起见,应该将 CPU 资源交给其他线程,也即切换到其他线程。 查看的间隔不能太长,这样的话等于线程调度根本没起到作用;但是也不能过于频繁, CPU 的资源大量投资在调度器上更是得不偿失。 我们的线程调度算法基于时钟中断,我们会在时钟中断中进入调度器看看当前线程是否需要切换出去。 因此,调度算法的接口 Scheduler 如下:

// src/process/scheduler.rs
pub trait Scheduler {
    // 如果 tid 不存在,表明将一个新线程加入线程调度
    // 否则表明一个已有的线程要继续运行
    fn push(&mut self, tid: Tid);
    // 从若干可运行线程中选择一个运行
    fn pop(&mut self) -> Option<Tid>;
    // 时钟中断中,提醒调度算法当前线程又运行了一个 tick
    // 返回的 bool 表示调度算法认为当前线程是否需要被切换出去
    fn tick(&mut self) -> bool;
    // 告诉调度算法一个线程已经结束
    fn exit(&mut self, tid: Tid);
}

线程池接口设计

调度算法 Scheduler 只管理 Tid ,和线程并没有关系。因此,我们使用线程池 ThreadPool 来给线程和 Tid 建立联系,将 Scheduler 的 Tid 调度变成线程调度。 事实上,每个线程刚被创建时并没有一个 Tid ,这是线程池给线程分配的。

// src/process/thread_pool.rs
// 线程池每个位置的信息
struct ThreadInfo {
    // 占据这个位置的线程当前运行状态
    status: Status,
    // 占据这个位置的线程
    thread: Option<Box<Thread>>,
}

pub struct ThreadPool {
    // 线程池
    // 如果一个位置是 None 表示未被线程占据
    threads: Vec<Option<ThreadInfo>>,
    // 调度算法
    // 这里的 dyn Scheduler 是 Trait object 语法
    // 表明 Box 里面的类型实现了 Scheduler Trait
    scheduler: Box<dyn Scheduler>,
}

线程池的方法

作为一个线程池,需要实现调度相关的一系列操作:

  • alloc_tid:为新线程分配一个新的 Tid
  • add:添加一个可立即开始运行的线程
  • acquire:从线程池中取一个线程开始运行
  • retrieve:让当前线程交出 CPU 资源
  • tick:时钟中断时查看当前所运行线程是否要切换出去
  • exit:退出线程

下面,我们依次来看看线程池的方法:

// src/process/thread_pool.rs

impl ThreadPool {
    // 新建一个线程池,其最大可容纳 size 个线程,使用调度器 scheduler
    pub fn new(size: usize, scheduler: Box<dyn Scheduler>) -> ThreadPool {
        ThreadPool {
            threads: {
                let mut v = Vec::new();
                v.resize_with(size, Default::default);
                v
            },
            scheduler,
        }
    }
    // 在线程池中找一个编号最小的空着的位置
    // 将编号作为 Tid 返回
    fn alloc_tid(&self) -> Tid {
        for (i, info) in self.threads.iter().enumerate() {
            if info.is_none() {
                return i;
            }
        }
        panic!("alloc tid failed!");
    }

    // 加入一个可立即开始运行的线程
    // 线程状态 Uninitialized -> Ready
    pub fn add(&mut self, _thread: Box<Thread>) {
        // 分配 Tid
        let tid = self.alloc_tid();
        // 修改线程池对应位置的信息
        self.threads[tid] = Some(
            ThreadInfo {
                // 状态:随时准备运行,等待 CPU 资源中
                status: Status::Ready,
                // 传入线程
                thread: Some(_thread),
            }
        );
        // 将线程的 Tid 加入调度器
        // 提醒调度器给这个线程分配 CPU 资源
        self.scheduler.push(tid);
    }

    // 从线程池中取一个线程开始运行
    // 线程状态 Ready -> Running
    pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
        // 调用 Scheduler::pop ,从调度算法中获取接下来要运行的 Tid
        if let Some(tid) = self.scheduler.pop() {
            // 获取并更新线程池对应位置的信息
            let mut thread_info = self.threads[tid].as_mut().expect("thread not exist!");
            // 将线程状态改为 Running
            thread_info.status = Status::Running(tid);
            return Some((tid, thread_info.thread.take().expect("thread not exist!")));
        }
        else {
            return None;
        }
    }
    // 这个线程已运行了太长时间或者已运行结束,需要交出CPU资源
    // 但是要提醒线程池它仍需要分配 CPU 资源
    pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread>) {
        // 线程池位置为空,表明这个线程刚刚通过 exit 退出
        if self.threads[tid].is_none() {
            // 不需要 CPU 资源了,退出
            return;
        }
        // 获取并修改线程池对应位置的信息
        let mut thread_info = self.threads[tid].as_mut().expect("thread not exist!");
        thread_info.thread = Some(thread);
        // 此时状态可能是 Status::Sleeping(线程可能会自动放弃 CPU 资源,进入睡眠状态),
        // 直到被唤醒之前都不必给它分配。
        // 而如果此时状态是Running,就说明只是单纯的耗尽了这次分配CPU资源,但还要占用CPU资源继续执行。
        if let Status::Running(_) = thread_info.status {
            // Running -> Ready
            thread_info.status = Status::Ready;
            // 通知线程池继续给此线程分配资源
            self.scheduler.push(tid);
        }
    }
    // Scheduler 的简单包装:时钟中断时查看当前所运行线程是否要切换出去
    pub fn tick(&mut self) -> bool {
        let ret = self.scheduler.tick();
        ret
    }
    // 这个线程已经退出了,线程状态 Running -> Exited
    pub fn exit(&mut self, tid: Tid) {
        // 清空线程池对应位置
        self.threads[tid] = None;
        // 通知调度器
        self.scheduler.exit(tid);
    }
}

现在我们有了一个线程池 ThreadPool ,它内含调度器,是一个不错的线程管理器。下一节我们将介绍调度线程 idle 以及调度单元 Processor

results matching ""

    No results matching ""