1use alloc::collections::VecDeque;
2use core::cell::RefCell;
3use core::pin::Pin;
4use core::task::{Context, LocalWaker, Poll};
5
6use evering::driver::{Driver, DriverHandle, OpId};
7use evering::op::{Completable, Op};
8use evering::uring::Uring;
9use local_executor::{Executor, ExecutorHandle, Task};
10
11pub struct Runtime<P, U: Uring> {
12 pub executor: Executor,
13 pub uring: RefCell<U>,
14 pub driver: Driver<P, U::Ext>,
15 pub pending_submissions: RefCell<VecDeque<LocalWaker>>,
16}
17
18impl<P, U: Uring> Runtime<P, U> {
19 pub fn new(uring: U) -> Self {
20 Self {
21 executor: Executor::new(),
22 driver: Driver::with_capacity(uring.header().size_a()),
23 uring: RefCell::new(uring),
24 pending_submissions: RefCell::default(),
25 }
26 }
27
28 pub fn run_on<C, Fut>(&self, complete: C, fut: Fut) -> RunOn<P, U, C, Fut>
29 where
30 C: FnMut(U::B),
31 Fut: Future,
32 {
33 RunOn {
34 rt: self,
35 complete,
36 fut,
37 }
38 }
39
40 pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
41 self.executor.block_on(fut)
42 }
43
44 pub fn into_uring(self) -> U {
45 self.uring.into_inner()
46 }
47
48 pub fn spawn<T, F, Rt>(handle: Rt, fut: F) -> Task<T>
49 where
50 T: 'static,
51 F: 'static + Future<Output = T>,
52 Rt: RuntimeHandle<Payload = P, Uring = U>,
53 Rt: ExecutorHandle,
54 {
55 Executor::spawn(handle, fut)
56 }
57
58 pub async fn submit<T, Rt>(
59 handle: Rt,
60 data: T,
61 new_entry: impl FnOnce(OpId, &mut T) -> U::A,
62 ) -> Op<T>
63 where
64 T: Completable<Driver = Rt>,
65 Rt: RuntimeHandle<Payload = P, Uring = U>,
66 Rt: DriverHandle<Payload = P, Ext = U::Ext>,
67 U::Ext: Default,
68 {
69 Self::submit_ext(handle, <_>::default(), data, new_entry).await
70 }
71
72 pub async fn submit_ext<T, Rt>(
73 handle: Rt,
74 ext: U::Ext,
75 mut data: T,
76 new_entry: impl FnOnce(OpId, &mut T) -> U::A,
77 ) -> Op<T>
78 where
79 T: Completable<Driver = Rt>,
80 Rt: RuntimeHandle<Payload = P, Uring = U>,
81 Rt: DriverHandle<Payload = P, Ext = U::Ext>,
82 {
83 let rt = RuntimeHandle::get(&handle);
84
85 let mut ext = Some(ext);
86 let id = rt
87 .wait_for_ok(|| {
88 rt.driver
89 .try_submit_ext(ext.take().unwrap())
90 .map_err(|e| ext = Some(e))
91 })
92 .await;
93
94 let mut ent = Some(new_entry(id, &mut data));
95 rt.wait_for_ok(|| {
96 rt.uring
97 .borrow_mut()
98 .send(ent.take().unwrap())
99 .map_err(|e| ent = Some(e))
100 })
101 .await;
102
103 Op::new(handle, id, data)
104 }
105
106 async fn wait_for_ok<T>(&self, mut f: impl FnMut() -> Result<T, ()>) -> T {
107 core::future::poll_fn(|cx| match f() {
108 Ok(t) => Poll::Ready(t),
109 Err(_) => {
110 self.pending_submissions
111 .borrow_mut()
112 .push_back(cx.local_waker().clone());
113 Poll::Pending
114 },
115 })
116 .await
117 }
118}
119
120pin_project_lite::pin_project! {
121 pub struct RunOn<'a,P, U, C, Fut>
122 where
123 U: Uring,
124 {
125 rt: &'a Runtime<P, U>,
126 complete:C,
127 #[pin]
128 fut: Fut,
129 }
130}
131
132impl<'a, P, U, C, Fut> Future for RunOn<'a, P, U, C, Fut>
133where
134 U: Uring,
135 C: FnMut(U::B),
136 Fut: Future,
137{
138 type Output = Fut::Output;
139
140 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141 let mut this = self.project();
142 for ent in this.rt.uring.borrow_mut().recv_bulk() {
143 _ = this
144 .rt
145 .pending_submissions
146 .borrow_mut()
147 .pop_front()
148 .map(LocalWaker::wake);
149 (this.complete)(ent);
150 }
151 let mut noop_cx = Context::from_waker(core::task::Waker::noop());
152 match this.fut.as_mut().poll(&mut noop_cx) {
153 Poll::Pending => {
156 cx.local_waker().wake_by_ref();
157 Poll::Pending
158 },
159 ready => ready,
160 }
161 }
162}
163
164pub trait RuntimeHandle: 'static + Unpin {
165 type Payload;
166 type Uring: Uring;
167 type Ref: core::ops::Deref<Target = Runtime<Self::Payload, Self::Uring>>;
168
169 fn get(&self) -> Self::Ref;
170}
171impl<P, U> RuntimeHandle for alloc::rc::Weak<Runtime<P, U>>
172where
173 P: 'static,
174 U: 'static + Uring,
175{
176 type Payload = P;
177 type Uring = U;
178 type Ref = alloc::rc::Rc<Runtime<P, U>>;
179
180 fn get(&self) -> Self::Ref {
181 self.upgrade().expect("not inside a valid executor")
182 }
183}
184
185pub struct ExecutorRef<R: RuntimeHandle>(pub R::Ref);
186impl<Rt: RuntimeHandle> ExecutorRef<Rt> {
187 pub fn new(rt: &Rt) -> Self {
188 Self(rt.get())
189 }
190}
191impl<Rt: RuntimeHandle> core::ops::Deref for ExecutorRef<Rt> {
192 type Target = Executor;
193
194 fn deref(&self) -> &Self::Target {
195 &self.0.executor
196 }
197}
198
199pub struct DriverRef<R: RuntimeHandle>(pub R::Ref);
200impl<Rt: RuntimeHandle> DriverRef<Rt> {
201 pub fn new(rt: &Rt) -> Self {
202 Self(rt.get())
203 }
204}
205impl<Rt: RuntimeHandle> core::ops::Deref for DriverRef<Rt> {
206 type Target = Driver<Rt::Payload, <Rt::Uring as Uring>::Ext>;
207
208 fn deref(&self) -> &Self::Target {
209 &self.0.driver
210 }
211}