1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use alloc::boxed::Box;
use alloc::{sync::Arc, vec::Vec};
use bitflags::bitflags;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use lock::Mutex;
bitflags! {
#[derive(Default)]
pub struct Event: u32 {
const READABLE = 1 << 0;
const WRITABLE = 1 << 1;
const ERROR = 1 << 2;
const CLOSED = 1 << 3;
const PROCESS_QUIT = 1 << 10;
const CHILD_PROCESS_QUIT = 1 << 11;
const RECEIVE_SIGNAL = 1 << 12;
const SEMAPHORE_REMOVED = 1 << 20;
const SEMAPHORE_CAN_ACQUIRE = 1 << 21;
}
}
pub type EventHandler = Box<dyn Fn(Event) -> bool + Send>;
#[derive(Default)]
pub struct EventBus {
event: Event,
callbacks: Vec<EventHandler>,
}
impl EventBus {
pub fn new() -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self::default()))
}
pub fn set(&mut self, set: Event) {
self.change(Event::empty(), set);
}
pub fn clear(&mut self, set: Event) {
self.change(set, Event::empty());
}
pub fn change(&mut self, reset: Event, set: Event) {
let orig = self.event;
let mut new = self.event;
new.remove(reset);
new.insert(set);
self.event = new;
if new != orig {
self.callbacks.retain(|f| !f(new));
}
}
pub fn subscribe(&mut self, callback: EventHandler) {
self.callbacks.push(callback);
}
pub fn get_callback_len(&self) -> usize {
self.callbacks.len()
}
}
pub fn wait_for_event(bus: Arc<Mutex<EventBus>>, mask: Event) -> impl Future<Output = Event> {
EventBusFuture { bus, mask }
}
#[must_use = "future does nothing unless polled/`await`-ed"]
struct EventBusFuture {
bus: Arc<Mutex<EventBus>>,
mask: Event,
}
impl Future for EventBusFuture {
type Output = Event;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut lock = self.bus.lock();
if !(lock.event & self.mask).is_empty() {
return Poll::Ready(lock.event);
}
let waker = cx.waker().clone();
let mask = self.mask;
lock.subscribe(Box::new(move |s| {
if (s & mask).is_empty() {
return false;
}
waker.wake_by_ref();
true
}));
Poll::Pending
}
}