Module uring

Source
Expand description

此模块提供了用于建立通信连接的数据结构.

Uring 定义了用于发送和接收消息的端口,而 Builder 则用于构造一个通信连接的两端.UringAUringB 是成对的两个通信端口,二者在连接中的地位是对等的.

§非对等通信

SenderReceiver 分别是 A、B 两个端口的别名,用于在语义上区分在通信中的两个不同角色.以下演示了如何建立非对等通信连接,

let items = vec![1, 2, 3, 4, 5];
// 初始化发送端 tx 和接收端 rx
let (mut tx, mut rx) = Builder::<i32, i32>::new().build();
std::thread::scope(|cx| {
    cx.spawn(|| {
        // 将请求批量发送至接收端
        tx.send_bulk(items.iter().copied());
        let mut r = vec![];
        // 接收并处理响应结果
        while r.len() != items.len() {
            r.extend(tx.recv_bulk().map(|i| i >> 1));
            std::thread::yield_now();
        }
        assert_eq!(r, items);
    });
    cx.spawn(|| {
        let mut n = 0;
        loop {
            // 接受请求,处理之,并将响应送回
            while let Some(i) = rx.recv() {
                rx.send(i << 1).unwrap();
                n += 1;
            }
            if n == items.len() {
                break;
            }
            std::thread::yield_now();
        }
    });
});

§对等通信

尽管 UringAUringB 可以用来建立对等通信,但如果一方的角色无法在编译期间确定,二者就无法应对了.UringEither 允许在运行时决定某一方的角色,但它要求通信双方的消息类型是一致的.以下演示了如何用它建立对等通信,

let items = vec![1, 2, 3, 4, 5];
let worker = |mut p: UringEither<i32>| {
    //               ^ UringA 和 UringB 是两个不同的类型
    p.send_bulk(items.iter().copied());
    let mut r = vec![];
    while r.len() != items.len() {
        r.extend(p.recv_bulk());
        std::thread::yield_now();
    }
    assert_eq!(r, items);
};
let (pa, pb) = Builder::<i32, i32>::new().build();
std::thread::scope(|cx| {
    cx.spawn(|| worker(UringEither::A(pa)));
    cx.spawn(|| worker(UringEither::B(pb)));
});

§内存共享

在不同进程之间通过共享内存来建立连接时,分配给 Uring 的内存对双方来说可能是不同的地址.这时就需要通过 RawUring 来手动处理这一差异.Uring 可以和 RawUring 相互转换,而后者暴露了必要的接口以便控制底层的内存细节.

以下展示了如何手动管理 Uring 的内存分配,

let header = Builder::<i32, i32>::new().build_header();
//                                      ^ 仅初始化 Header
// 随后手动分配内存,也可以从已分配的内存中构造 RawUring
let buf_a = alloc_buffer(header.size_a());
let buf_b = alloc_buffer(header.size_b());
let header = alloc_header(header);
let build_raw = || { // <- RawUring 是非 Clone 的
    let mut raw = RawUring::dangling();
    raw.header = header;
    raw.buf_a = buf_a;
    raw.buf_b = buf_b;
    raw
};
let (pa, pb);
// SAFETY: 我们可以确保内存是有效的
unsafe {
    pa = UringA::from_raw(build_raw());
    pb = UringB::from_raw(build_raw());
}
assert!(pa.is_connected() && pb.is_connected());
// 对于自定义分配的内存,我们必须手动释放
assert!(pa.dispose_raw().is_err());
//                       ^ 当任一端存活时,内存不会被释放
let raw = pb.dispose_raw().unwrap();
//                         ^ 此时可以安全的释放内存
dealloc_uring(raw);

Structs§

Builder
DisposeError
Drain
Header
Queue
RawUring
UringA
UringB

Enums§

UringEither

Traits§

Uring

Type Aliases§

Receiver
Sender