evering_ipc/
runtime.rs

1use std::cell::RefCell;
2use std::mem::ManuallyDrop;
3use std::rc::{Rc, Weak};
4
5use evering::driver::OpId;
6use evering::op::Completable;
7use evering_utils::runtime::ExecutorRef;
8use local_executor::Task;
9
10use crate::op::{Rqe, RqeData, Sqe};
11
12type Sender = evering::uring::Sender<Sqe, Rqe>;
13type RuntimeInner = evering_utils::runtime::Runtime<RqeData, Sender>;
14
15pub struct Runtime(ManuallyDrop<Rc<RuntimeInner>>);
16
17impl Runtime {
18    pub fn new(sender: Sender) -> Self {
19        Self(ManuallyDrop::new(Rc::new(RuntimeInner::new(sender))))
20    }
21
22    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
23        let _guard = RuntimeHandle::enter(&self.0);
24        self.0.block_on(self.run_on_no_guard(fut))
25    }
26
27    pub async fn run_on<T>(&self, fut: impl Future<Output = T>) -> T {
28        let _guard = RuntimeHandle::enter(&self.0);
29        self.run_on_no_guard(fut).await
30    }
31
32    async fn run_on_no_guard<T>(&self, fut: impl Future<Output = T>) -> T {
33        self.0
34            .run_on(|rqe| _ = self.0.driver.complete(rqe.id, rqe.data), fut)
35            .await
36    }
37
38    pub fn into_uring(mut self) -> Sender {
39        let rc = unsafe { ManuallyDrop::take(&mut self.0) };
40        std::mem::forget(self);
41        Rc::into_inner(rc)
42            .unwrap_or_else(|| unreachable!("there should not be other strong references"))
43            .into_uring()
44    }
45}
46
47impl Drop for Runtime {
48    fn drop(&mut self) {
49        let rc = unsafe { ManuallyDrop::take(&mut self.0) };
50        // Leak the Driver so that no pending resources will expire.
51        // TODO: should wait instead?
52        if !rc.driver.is_empty() {
53            std::mem::forget(rc);
54        }
55    }
56}
57
58thread_local! {
59    static CX: RefCell<Weak<RuntimeInner>> = const { RefCell::new(Weak::new()) };
60}
61
62pub struct RuntimeHandle;
63
64impl evering_utils::runtime::RuntimeHandle for RuntimeHandle {
65    type Payload = RqeData;
66    type Uring = Sender;
67    type Ref = Rc<RuntimeInner>;
68    fn get(&self) -> Self::Ref {
69        CX.with_borrow(Weak::upgrade)
70            .expect("not inside a valid reactor")
71    }
72}
73impl local_executor::ExecutorHandle for RuntimeHandle {
74    type Ref = ExecutorRef<RuntimeHandle>;
75    fn get(&self) -> Self::Ref {
76        ExecutorRef::new(self)
77    }
78}
79impl evering::driver::DriverHandle for RuntimeHandle {
80    type Payload = RqeData;
81    type Ext = ();
82    type Ref = evering_utils::runtime::DriverRef<RuntimeHandle>;
83    fn get(&self) -> Self::Ref {
84        evering_utils::runtime::DriverRef::new(self)
85    }
86}
87
88impl RuntimeHandle {
89    fn enter(cx: &Rc<RuntimeInner>) -> impl Drop {
90        struct Revert;
91        impl Drop for Revert {
92            fn drop(&mut self) {
93                CX.with_borrow_mut(|d| *d = Weak::new())
94            }
95        }
96        CX.with_borrow_mut(|d| {
97            if d.strong_count() != 0 {
98                panic!("cannot run within a nested reactor")
99            }
100            *d = Rc::downgrade(cx)
101        });
102        Revert
103    }
104
105    pub fn spawn<T, F>(fut: F) -> Task<T>
106    where
107        T: 'static,
108        F: 'static + Future<Output = T>,
109    {
110        RuntimeInner::spawn(Self, fut)
111    }
112
113    pub async fn submit<T>(data: T, new_entry: impl FnOnce(OpId, &mut T) -> Sqe) -> T::Output
114    where
115        T: Completable<Driver = RuntimeHandle>,
116    {
117        RuntimeInner::submit(Self, data, new_entry).await.await
118    }
119}