evering_threaded/
runtime.rs1use std::cell::RefCell;
2use std::mem::ManuallyDrop;
3use std::rc::{Rc, Weak};
4
5use evering::driver::OpId;
6use evering::op::Completable;
7use evering_utils::runtime::ExecutorRef;
8use local_executor::Task;
9
10use crate::op::{Rqe, RqeData, Sqe};
11
12type Sender = evering::uring::Sender<Sqe, Rqe>;
13type RuntimeInner = evering_utils::runtime::Runtime<RqeData, Sender>;
14
15pub struct Runtime(ManuallyDrop<Rc<RuntimeInner>>);
16
17impl Runtime {
18 pub fn new(sender: Sender) -> Self {
19 Self(ManuallyDrop::new(Rc::new(RuntimeInner::new(sender))))
20 }
21
22 pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
23 let _guard = RuntimeHandle::enter(&self.0);
24 let rt = &self.0;
25 rt.block_on(rt.run_on(|rqe| _ = rt.driver.complete(rqe.id, rqe.data), fut))
26 }
27
28 pub fn into_sender(mut self) -> Sender {
29 let rc = unsafe { ManuallyDrop::take(&mut self.0) };
30 std::mem::forget(self);
31 Rc::into_inner(rc)
32 .unwrap_or_else(|| unreachable!("there should not be other strong references"))
33 .into_uring()
34 }
35}
36
37impl Drop for Runtime {
38 fn drop(&mut self) {
39 let rc = unsafe { ManuallyDrop::take(&mut self.0) };
40 if !rc.driver.is_empty() {
43 std::mem::forget(rc);
44 }
45 }
46}
47
48thread_local! {
49 static CX: RefCell<Weak<RuntimeInner>> = const { RefCell::new(Weak::new()) };
50}
51
52pub(crate) struct RuntimeHandle;
53
54impl evering_utils::runtime::RuntimeHandle for RuntimeHandle {
55 type Payload = RqeData;
56 type Uring = Sender;
57 type Ref = Rc<RuntimeInner>;
58 fn get(&self) -> Self::Ref {
59 CX.with_borrow(Weak::upgrade)
60 .expect("not inside a valid reactor")
61 }
62}
63impl local_executor::ExecutorHandle for RuntimeHandle {
64 type Ref = ExecutorRef<RuntimeHandle>;
65 fn get(&self) -> Self::Ref {
66 ExecutorRef::new(self)
67 }
68}
69impl evering::driver::DriverHandle for RuntimeHandle {
70 type Payload = RqeData;
71 type Ext = ();
72 type Ref = evering_utils::runtime::DriverRef<RuntimeHandle>;
73 fn get(&self) -> Self::Ref {
74 evering_utils::runtime::DriverRef::new(self)
75 }
76}
77
78impl RuntimeHandle {
79 fn enter(cx: &Rc<RuntimeInner>) -> impl Drop {
80 struct Revert;
81 impl Drop for Revert {
82 fn drop(&mut self) {
83 CX.with_borrow_mut(|d| *d = Weak::new())
84 }
85 }
86 CX.with_borrow_mut(|d| {
87 if d.strong_count() != 0 {
88 panic!("cannot run within a nested reactor")
89 }
90 *d = Rc::downgrade(cx)
91 });
92 Revert
93 }
94
95 pub fn spawn<T, F>(fut: F) -> Task<T>
96 where
97 T: 'static,
98 F: 'static + Future<Output = T>,
99 {
100 RuntimeInner::spawn(Self, fut)
101 }
102
103 pub async fn submit<T>(data: T, new_entry: impl FnOnce(OpId, &mut T) -> Sqe) -> T::Output
104 where
105 T: Completable<Driver = RuntimeHandle>,
106 {
107 RuntimeInner::submit(Self, data, new_entry).await.await
108 }
109}