evering_utils/
runtime.rs

1use alloc::collections::VecDeque;
2use core::cell::RefCell;
3use core::pin::Pin;
4use core::task::{Context, LocalWaker, Poll};
5
6use evering::driver::{Driver, DriverHandle, OpId};
7use evering::op::{Completable, Op};
8use evering::uring::Uring;
9use local_executor::{Executor, ExecutorHandle, Task};
10
11pub struct Runtime<P, U: Uring> {
12    pub executor: Executor,
13    pub uring: RefCell<U>,
14    pub driver: Driver<P, U::Ext>,
15    pub pending_submissions: RefCell<VecDeque<LocalWaker>>,
16}
17
18impl<P, U: Uring> Runtime<P, U> {
19    pub fn new(uring: U) -> Self {
20        Self {
21            executor: Executor::new(),
22            driver: Driver::with_capacity(uring.header().size_a()),
23            uring: RefCell::new(uring),
24            pending_submissions: RefCell::default(),
25        }
26    }
27
28    pub fn run_on<C, Fut>(&self, complete: C, fut: Fut) -> RunOn<P, U, C, Fut>
29    where
30        C: FnMut(U::B),
31        Fut: Future,
32    {
33        RunOn {
34            rt: self,
35            complete,
36            fut,
37        }
38    }
39
40    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
41        self.executor.block_on(fut)
42    }
43
44    pub fn into_uring(self) -> U {
45        self.uring.into_inner()
46    }
47
48    pub fn spawn<T, F, Rt>(handle: Rt, fut: F) -> Task<T>
49    where
50        T: 'static,
51        F: 'static + Future<Output = T>,
52        Rt: RuntimeHandle<Payload = P, Uring = U>,
53        Rt: ExecutorHandle,
54    {
55        Executor::spawn(handle, fut)
56    }
57
58    pub async fn submit<T, Rt>(
59        handle: Rt,
60        data: T,
61        new_entry: impl FnOnce(OpId, &mut T) -> U::A,
62    ) -> Op<T>
63    where
64        T: Completable<Driver = Rt>,
65        Rt: RuntimeHandle<Payload = P, Uring = U>,
66        Rt: DriverHandle<Payload = P, Ext = U::Ext>,
67        U::Ext: Default,
68    {
69        Self::submit_ext(handle, <_>::default(), data, new_entry).await
70    }
71
72    pub async fn submit_ext<T, Rt>(
73        handle: Rt,
74        ext: U::Ext,
75        mut data: T,
76        new_entry: impl FnOnce(OpId, &mut T) -> U::A,
77    ) -> Op<T>
78    where
79        T: Completable<Driver = Rt>,
80        Rt: RuntimeHandle<Payload = P, Uring = U>,
81        Rt: DriverHandle<Payload = P, Ext = U::Ext>,
82    {
83        let rt = RuntimeHandle::get(&handle);
84
85        let mut ext = Some(ext);
86        let id = rt
87            .wait_for_ok(|| {
88                rt.driver
89                    .try_submit_ext(ext.take().unwrap())
90                    .map_err(|e| ext = Some(e))
91            })
92            .await;
93
94        let mut ent = Some(new_entry(id, &mut data));
95        rt.wait_for_ok(|| {
96            rt.uring
97                .borrow_mut()
98                .send(ent.take().unwrap())
99                .map_err(|e| ent = Some(e))
100        })
101        .await;
102
103        Op::new(handle, id, data)
104    }
105
106    async fn wait_for_ok<T>(&self, mut f: impl FnMut() -> Result<T, ()>) -> T {
107        core::future::poll_fn(|cx| match f() {
108            Ok(t) => Poll::Ready(t),
109            Err(_) => {
110                self.pending_submissions
111                    .borrow_mut()
112                    .push_back(cx.local_waker().clone());
113                Poll::Pending
114            },
115        })
116        .await
117    }
118}
119
120pin_project_lite::pin_project! {
121    pub struct RunOn<'a,P, U, C, Fut>
122    where
123        U: Uring,
124    {
125        rt: &'a Runtime<P, U>,
126        complete:C,
127        #[pin]
128        fut: Fut,
129    }
130}
131
132impl<'a, P, U, C, Fut> Future for RunOn<'a, P, U, C, Fut>
133where
134    U: Uring,
135    C: FnMut(U::B),
136    Fut: Future,
137{
138    type Output = Fut::Output;
139
140    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141        let mut this = self.project();
142        for ent in this.rt.uring.borrow_mut().recv_bulk() {
143            _ = this
144                .rt
145                .pending_submissions
146                .borrow_mut()
147                .pop_front()
148                .map(LocalWaker::wake);
149            (this.complete)(ent);
150        }
151        let mut noop_cx = Context::from_waker(core::task::Waker::noop());
152        match this.fut.as_mut().poll(&mut noop_cx) {
153            // Always wake ourself if pending as the given `Future` may wait us
154            // to wake it, which leads to a circular waiting chain.
155            Poll::Pending => {
156                cx.local_waker().wake_by_ref();
157                Poll::Pending
158            },
159            ready => ready,
160        }
161    }
162}
163
164pub trait RuntimeHandle: 'static + Unpin {
165    type Payload;
166    type Uring: Uring;
167    type Ref: core::ops::Deref<Target = Runtime<Self::Payload, Self::Uring>>;
168
169    fn get(&self) -> Self::Ref;
170}
171impl<P, U> RuntimeHandle for alloc::rc::Weak<Runtime<P, U>>
172where
173    P: 'static,
174    U: 'static + Uring,
175{
176    type Payload = P;
177    type Uring = U;
178    type Ref = alloc::rc::Rc<Runtime<P, U>>;
179
180    fn get(&self) -> Self::Ref {
181        self.upgrade().expect("not inside a valid executor")
182    }
183}
184
185pub struct ExecutorRef<R: RuntimeHandle>(pub R::Ref);
186impl<Rt: RuntimeHandle> ExecutorRef<Rt> {
187    pub fn new(rt: &Rt) -> Self {
188        Self(rt.get())
189    }
190}
191impl<Rt: RuntimeHandle> core::ops::Deref for ExecutorRef<Rt> {
192    type Target = Executor;
193
194    fn deref(&self) -> &Self::Target {
195        &self.0.executor
196    }
197}
198
199pub struct DriverRef<R: RuntimeHandle>(pub R::Ref);
200impl<Rt: RuntimeHandle> DriverRef<Rt> {
201    pub fn new(rt: &Rt) -> Self {
202        Self(rt.get())
203    }
204}
205impl<Rt: RuntimeHandle> core::ops::Deref for DriverRef<Rt> {
206    type Target = Driver<Rt::Payload, <Rt::Uring as Uring>::Ext>;
207
208    fn deref(&self) -> &Self::Target {
209        &self.0.driver
210    }
211}