evering_threaded/
main.rs

1#![feature(local_waker)]
2
3mod op;
4mod runtime;
5
6use std::collections::VecDeque;
7use std::time::Duration;
8
9use evering::uring::Uring;
10
11use self::op::{Rqe, RqeData, Sqe, SqeData};
12use self::runtime::{Runtime, RuntimeHandle};
13
14fn main() {
15    let (sq, mut rq) = evering::uring::Builder::new().build();
16
17    std::thread::scope(|cx| {
18        cx.spawn(|| {
19            let rt = Runtime::new(sq);
20            rt.block_on(async {
21                let tasks = (0..)
22                    .map(|i| async move {
23                        let now = std::time::Instant::now();
24                        let token = op::ping(Duration::from_millis(fastrand::u64(0..500))).await;
25                        let elapsed = now.elapsed().as_millis();
26                        println!("finished pong({i}) elapsed={elapsed}ms with token={token:#x}");
27                    })
28                    .map(RuntimeHandle::spawn)
29                    .take(fastrand::usize(32..=64))
30                    .collect::<Vec<_>>();
31
32                for task in tasks {
33                    task.await;
34                }
35                op::exit().await;
36                println!("finished exit");
37            });
38            drop(rt.into_sender());
39        });
40        cx.spawn(|| {
41            let mut local_queue = VecDeque::new();
42            loop {
43                let mut should_exit = false;
44                if let Some(Sqe { id, data }) = rq.recv() {
45                    println!("accepted task {data:?}");
46                    let data = match data {
47                        SqeData::Exit => {
48                            should_exit = true;
49                            RqeData::Exited
50                        },
51                        SqeData::Ping { delay } => {
52                            std::thread::sleep(delay);
53                            RqeData::Pong {
54                                token: fastrand::u64(..),
55                            }
56                        },
57                    };
58                    if fastrand::bool() {
59                        local_queue.push_back(Rqe { id, data });
60                    } else {
61                        local_queue.push_front(Rqe { id, data });
62                    }
63                }
64
65                if local_queue.is_empty() {
66                    std::thread::yield_now();
67                } else if should_exit || fastrand::bool() {
68                    for rqe in local_queue.drain(..) {
69                        rq.send(rqe).expect("out of capacity");
70                    }
71                }
72
73                if should_exit {
74                    break;
75                }
76            }
77        });
78    });
79}