1#![feature(local_waker)]
2
3mod op;
4mod runtime;
5
6use std::collections::VecDeque;
7use std::time::Duration;
8
9use evering::uring::Uring;
10
11use self::op::{Rqe, RqeData, Sqe, SqeData};
12use self::runtime::{Runtime, RuntimeHandle};
13
14fn main() {
15 let (sq, mut rq) = evering::uring::Builder::new().build();
16
17 std::thread::scope(|cx| {
18 cx.spawn(|| {
19 let rt = Runtime::new(sq);
20 rt.block_on(async {
21 let tasks = (0..)
22 .map(|i| async move {
23 let now = std::time::Instant::now();
24 let token = op::ping(Duration::from_millis(fastrand::u64(0..500))).await;
25 let elapsed = now.elapsed().as_millis();
26 println!("finished pong({i}) elapsed={elapsed}ms with token={token:#x}");
27 })
28 .map(RuntimeHandle::spawn)
29 .take(fastrand::usize(32..=64))
30 .collect::<Vec<_>>();
31
32 for task in tasks {
33 task.await;
34 }
35 op::exit().await;
36 println!("finished exit");
37 });
38 drop(rt.into_sender());
39 });
40 cx.spawn(|| {
41 let mut local_queue = VecDeque::new();
42 loop {
43 let mut should_exit = false;
44 if let Some(Sqe { id, data }) = rq.recv() {
45 println!("accepted task {data:?}");
46 let data = match data {
47 SqeData::Exit => {
48 should_exit = true;
49 RqeData::Exited
50 },
51 SqeData::Ping { delay } => {
52 std::thread::sleep(delay);
53 RqeData::Pong {
54 token: fastrand::u64(..),
55 }
56 },
57 };
58 if fastrand::bool() {
59 local_queue.push_back(Rqe { id, data });
60 } else {
61 local_queue.push_front(Rqe { id, data });
62 }
63 }
64
65 if local_queue.is_empty() {
66 std::thread::yield_now();
67 } else if should_exit || fastrand::bool() {
68 for rqe in local_queue.drain(..) {
69 rq.send(rqe).expect("out of capacity");
70 }
71 }
72
73 if should_exit {
74 break;
75 }
76 }
77 });
78 });
79}