evering_threaded/
runtime.rs

1use std::cell::RefCell;
2use std::mem::ManuallyDrop;
3use std::rc::{Rc, Weak};
4
5use evering::driver::OpId;
6use evering::op::Completable;
7use evering_utils::runtime::ExecutorRef;
8use local_executor::Task;
9
10use crate::op::{Rqe, RqeData, Sqe};
11
12type Sender = evering::uring::Sender<Sqe, Rqe>;
13type RuntimeInner = evering_utils::runtime::Runtime<RqeData, Sender>;
14
15pub struct Runtime(ManuallyDrop<Rc<RuntimeInner>>);
16
17impl Runtime {
18    pub fn new(sender: Sender) -> Self {
19        Self(ManuallyDrop::new(Rc::new(RuntimeInner::new(sender))))
20    }
21
22    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
23        let _guard = RuntimeHandle::enter(&self.0);
24        let rt = &self.0;
25        rt.block_on(rt.run_on(|rqe| _ = rt.driver.complete(rqe.id, rqe.data), fut))
26    }
27
28    pub fn into_sender(mut self) -> Sender {
29        let rc = unsafe { ManuallyDrop::take(&mut self.0) };
30        std::mem::forget(self);
31        Rc::into_inner(rc)
32            .unwrap_or_else(|| unreachable!("there should not be other strong references"))
33            .into_uring()
34    }
35}
36
37impl Drop for Runtime {
38    fn drop(&mut self) {
39        let rc = unsafe { ManuallyDrop::take(&mut self.0) };
40        // Leak the Driver so that no pending resources will expire.
41        // TODO: should wait instead?
42        if !rc.driver.is_empty() {
43            std::mem::forget(rc);
44        }
45    }
46}
47
48thread_local! {
49    static CX: RefCell<Weak<RuntimeInner>> = const { RefCell::new(Weak::new()) };
50}
51
52pub(crate) struct RuntimeHandle;
53
54impl evering_utils::runtime::RuntimeHandle for RuntimeHandle {
55    type Payload = RqeData;
56    type Uring = Sender;
57    type Ref = Rc<RuntimeInner>;
58    fn get(&self) -> Self::Ref {
59        CX.with_borrow(Weak::upgrade)
60            .expect("not inside a valid reactor")
61    }
62}
63impl local_executor::ExecutorHandle for RuntimeHandle {
64    type Ref = ExecutorRef<RuntimeHandle>;
65    fn get(&self) -> Self::Ref {
66        ExecutorRef::new(self)
67    }
68}
69impl evering::driver::DriverHandle for RuntimeHandle {
70    type Payload = RqeData;
71    type Ext = ();
72    type Ref = evering_utils::runtime::DriverRef<RuntimeHandle>;
73    fn get(&self) -> Self::Ref {
74        evering_utils::runtime::DriverRef::new(self)
75    }
76}
77
78impl RuntimeHandle {
79    fn enter(cx: &Rc<RuntimeInner>) -> impl Drop {
80        struct Revert;
81        impl Drop for Revert {
82            fn drop(&mut self) {
83                CX.with_borrow_mut(|d| *d = Weak::new())
84            }
85        }
86        CX.with_borrow_mut(|d| {
87            if d.strong_count() != 0 {
88                panic!("cannot run within a nested reactor")
89            }
90            *d = Rc::downgrade(cx)
91        });
92        Revert
93    }
94
95    pub fn spawn<T, F>(fut: F) -> Task<T>
96    where
97        T: 'static,
98        F: 'static + Future<Output = T>,
99    {
100        RuntimeInner::spawn(Self, fut)
101    }
102
103    pub async fn submit<T>(data: T, new_entry: impl FnOnce(OpId, &mut T) -> Sqe) -> T::Output
104    where
105        T: Completable<Driver = RuntimeHandle>,
106    {
107        RuntimeInner::submit(Self, data, new_entry).await.await
108    }
109}