os/fs/
pipe.rs

1use super::File;
2use crate::mm::UserBuffer;
3use crate::sync::UPSafeCell;
4use alloc::sync::{Arc, Weak};
5
6use crate::task::suspend_current_and_run_next;
7
8pub struct Pipe {
9    readable: bool,
10    writable: bool,
11    buffer: Arc<UPSafeCell<PipeRingBuffer>>,
12}
13
14impl Pipe {
15    pub fn read_end_with_buffer(buffer: Arc<UPSafeCell<PipeRingBuffer>>) -> Self {
16        Self {
17            readable: true,
18            writable: false,
19            buffer,
20        }
21    }
22    pub fn write_end_with_buffer(buffer: Arc<UPSafeCell<PipeRingBuffer>>) -> Self {
23        Self {
24            readable: false,
25            writable: true,
26            buffer,
27        }
28    }
29}
30
31const RING_BUFFER_SIZE: usize = 32;
32
33#[derive(Copy, Clone, PartialEq)]
34enum RingBufferStatus {
35    Full,
36    Empty,
37    Normal,
38}
39
40pub struct PipeRingBuffer {
41    arr: [u8; RING_BUFFER_SIZE],
42    head: usize,
43    tail: usize,
44    status: RingBufferStatus,
45    write_end: Option<Weak<Pipe>>,
46}
47
48impl PipeRingBuffer {
49    pub fn new() -> Self {
50        Self {
51            arr: [0; RING_BUFFER_SIZE],
52            head: 0,
53            tail: 0,
54            status: RingBufferStatus::Empty,
55            write_end: None,
56        }
57    }
58    pub fn set_write_end(&mut self, write_end: &Arc<Pipe>) {
59        self.write_end = Some(Arc::downgrade(write_end));
60    }
61    pub fn write_byte(&mut self, byte: u8) {
62        self.status = RingBufferStatus::Normal;
63        self.arr[self.tail] = byte;
64        self.tail = (self.tail + 1) % RING_BUFFER_SIZE;
65        if self.tail == self.head {
66            self.status = RingBufferStatus::Full;
67        }
68    }
69    pub fn read_byte(&mut self) -> u8 {
70        self.status = RingBufferStatus::Normal;
71        let c = self.arr[self.head];
72        self.head = (self.head + 1) % RING_BUFFER_SIZE;
73        if self.head == self.tail {
74            self.status = RingBufferStatus::Empty;
75        }
76        c
77    }
78    pub fn available_read(&self) -> usize {
79        if self.status == RingBufferStatus::Empty {
80            0
81        } else if self.tail > self.head {
82            self.tail - self.head
83        } else {
84            self.tail + RING_BUFFER_SIZE - self.head
85        }
86    }
87    pub fn available_write(&self) -> usize {
88        if self.status == RingBufferStatus::Full {
89            0
90        } else {
91            RING_BUFFER_SIZE - self.available_read()
92        }
93    }
94    pub fn all_write_ends_closed(&self) -> bool {
95        self.write_end.as_ref().unwrap().upgrade().is_none()
96    }
97}
98
99/// Return (read_end, write_end)
100pub fn make_pipe() -> (Arc<Pipe>, Arc<Pipe>) {
101    let buffer = Arc::new(unsafe { UPSafeCell::new(PipeRingBuffer::new()) });
102    let read_end = Arc::new(Pipe::read_end_with_buffer(buffer.clone()));
103    let write_end = Arc::new(Pipe::write_end_with_buffer(buffer.clone()));
104    buffer.exclusive_access().set_write_end(&write_end);
105    (read_end, write_end)
106}
107
108impl File for Pipe {
109    fn readable(&self) -> bool {
110        self.readable
111    }
112    fn writable(&self) -> bool {
113        self.writable
114    }
115    fn read(&self, buf: UserBuffer) -> usize {
116        assert!(self.readable());
117        let want_to_read = buf.len();
118        let mut buf_iter = buf.into_iter();
119        let mut already_read = 0usize;
120        loop {
121            let mut ring_buffer = self.buffer.exclusive_access();
122            let loop_read = ring_buffer.available_read();
123            if loop_read == 0 {
124                if ring_buffer.all_write_ends_closed() {
125                    return already_read;
126                }
127                drop(ring_buffer);
128                suspend_current_and_run_next();
129                continue;
130            }
131            for _ in 0..loop_read {
132                if let Some(byte_ref) = buf_iter.next() {
133                    unsafe {
134                        *byte_ref = ring_buffer.read_byte();
135                    }
136                    already_read += 1;
137                    if already_read == want_to_read {
138                        return want_to_read;
139                    }
140                } else {
141                    return already_read;
142                }
143            }
144        }
145    }
146    fn write(&self, buf: UserBuffer) -> usize {
147        assert!(self.writable());
148        let want_to_write = buf.len();
149        let mut buf_iter = buf.into_iter();
150        let mut already_write = 0usize;
151        loop {
152            let mut ring_buffer = self.buffer.exclusive_access();
153            let loop_write = ring_buffer.available_write();
154            if loop_write == 0 {
155                drop(ring_buffer);
156                suspend_current_and_run_next();
157                continue;
158            }
159            // write at most loop_write bytes
160            for _ in 0..loop_write {
161                if let Some(byte_ref) = buf_iter.next() {
162                    ring_buffer.write_byte(unsafe { *byte_ref });
163                    already_write += 1;
164                    if already_write == want_to_write {
165                        return want_to_write;
166                    }
167                } else {
168                    return already_write;
169                }
170            }
171        }
172    }
173}