1use std::cell::RefCell;
2use std::mem::ManuallyDrop;
3use std::rc::{Rc, Weak};
4
5use evering::driver::OpId;
6use evering::op::Completable;
7use evering_utils::runtime::ExecutorRef;
8use local_executor::Task;
9
10use crate::op::{Rqe, RqeData, Sqe};
11
/// The `evering` uring sender, instantiated with this crate's `Sqe` and `Rqe` entry types.
type Sender = evering::uring::Sender<Sqe, Rqe>;
/// The `evering_utils` runtime, instantiated with `RqeData` payloads and this crate's `Sender`.
type RuntimeInner = evering_utils::runtime::Runtime<RqeData, Sender>;
14
/// Owning handle to a reference-counted `RuntimeInner`.
///
/// The `Rc` is wrapped in `ManuallyDrop` so that the `Drop` impl and
/// `Runtime::into_uring` decide explicitly whether the inner runtime is
/// dropped, moved out, or leaked.
pub struct Runtime(ManuallyDrop<Rc<RuntimeInner>>);
16
17impl Runtime {
18 pub fn new(sender: Sender) -> Self {
19 Self(ManuallyDrop::new(Rc::new(RuntimeInner::new(sender))))
20 }
21
22 pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> T {
23 let _guard = RuntimeHandle::enter(&self.0);
24 self.0.block_on(self.run_on_no_guard(fut))
25 }
26
27 pub async fn run_on<T>(&self, fut: impl Future<Output = T>) -> T {
28 let _guard = RuntimeHandle::enter(&self.0);
29 self.run_on_no_guard(fut).await
30 }
31
32 async fn run_on_no_guard<T>(&self, fut: impl Future<Output = T>) -> T {
33 self.0
34 .run_on(|rqe| _ = self.0.driver.complete(rqe.id, rqe.data), fut)
35 .await
36 }
37
38 pub fn into_uring(mut self) -> Sender {
39 let rc = unsafe { ManuallyDrop::take(&mut self.0) };
40 std::mem::forget(self);
41 Rc::into_inner(rc)
42 .unwrap_or_else(|| unreachable!("there should not be other strong references"))
43 .into_uring()
44 }
45}
46
47impl Drop for Runtime {
48 fn drop(&mut self) {
49 let rc = unsafe { ManuallyDrop::take(&mut self.0) };
50 if !rc.driver.is_empty() {
53 std::mem::forget(rc);
54 }
55 }
56}
57
thread_local! {
    /// Weak reference to the runtime currently registered on this thread.
    ///
    /// Starts out as an empty `Weak`. `RuntimeHandle::enter` stores a
    /// downgraded reference here, and the guard it returns resets the slot to
    /// an empty `Weak` when dropped.
    static CX: RefCell<Weak<RuntimeInner>> = const { RefCell::new(Weak::new()) };
}
61
/// Stateless handle to the runtime registered on the current thread.
///
/// The handle carries no data of its own; the runtime is looked up through
/// the thread-local `CX` slot each time it is needed.
pub struct RuntimeHandle;
63
64impl evering_utils::runtime::RuntimeHandle for RuntimeHandle {
65 type Payload = RqeData;
66 type Uring = Sender;
67 type Ref = Rc<RuntimeInner>;
68 fn get(&self) -> Self::Ref {
69 CX.with_borrow(Weak::upgrade)
70 .expect("not inside a valid reactor")
71 }
72}
/// Implements `local_executor::ExecutorHandle` by wrapping the handle in an
/// `ExecutorRef`.
// NOTE(review): how `ExecutorRef` reaches the executor from the handle is
// defined in `evering_utils` and is not visible from this file.
impl local_executor::ExecutorHandle for RuntimeHandle {
    type Ref = ExecutorRef<RuntimeHandle>;
    /// Builds an `ExecutorRef` from this handle.
    fn get(&self) -> Self::Ref {
        ExecutorRef::new(self)
    }
}
/// Implements `evering::driver::DriverHandle` with `RqeData` as the payload
/// type and no extension data (`Ext = ()`).
// NOTE(review): how `DriverRef` reaches the driver from the handle is defined
// in `evering_utils` and is not visible from this file.
impl evering::driver::DriverHandle for RuntimeHandle {
    type Payload = RqeData;
    type Ext = ();
    type Ref = evering_utils::runtime::DriverRef<RuntimeHandle>;
    /// Builds a `DriverRef` from this handle.
    fn get(&self) -> Self::Ref {
        evering_utils::runtime::DriverRef::new(self)
    }
}
87
88impl RuntimeHandle {
89 fn enter(cx: &Rc<RuntimeInner>) -> impl Drop {
90 struct Revert;
91 impl Drop for Revert {
92 fn drop(&mut self) {
93 CX.with_borrow_mut(|d| *d = Weak::new())
94 }
95 }
96 CX.with_borrow_mut(|d| {
97 if d.strong_count() != 0 {
98 panic!("cannot run within a nested reactor")
99 }
100 *d = Rc::downgrade(cx)
101 });
102 Revert
103 }
104
105 pub fn spawn<T, F>(fut: F) -> Task<T>
106 where
107 T: 'static,
108 F: 'static + Future<Output = T>,
109 {
110 RuntimeInner::spawn(Self, fut)
111 }
112
113 pub async fn submit<T>(data: T, new_entry: impl FnOnce(OpId, &mut T) -> Sqe) -> T::Output
114 where
115 T: Completable<Driver = RuntimeHandle>,
116 {
117 RuntimeInner::submit(Self, data, new_entry).await.await
118 }
119}