Rust中四种进程相互通信方式


探索在同一台机器上执行的不同进程之间的不同通信方式,并尽可能快地完成。我们专注于高速进程间通信 (IPC),但其中一些方法可以扩展到网络。我们将使用 Rust 进行探索。

由于这些是独立的进程,因此我们在进程内采用的大多数方法都无法使用。这些技术不是线程间或异步例程间的通信,而是在不同程序间共享数据的技术。它们甚至可能都不是用 Rust 编写的。

代码大部分都是片段,但完整的源代码可以在这里找到,最后还有基准测试结果。

案例
我们希望从一个进程向另一个进程发送一条消息(“ping”),并在收到消息后回复确认(“pong”)。这个循环让我们有机会计算两个进程之间往返所需的时间。计时很复杂,下面有说明,但我们将运行许多循环,并从那里计算平均时间。

我们这里关注的是低延迟,而不是高吞吐量。

方法 1 - 管道
这是连接同一台机器上的进程时最先想到的方法。就像 cat | grep 一样,我们只需将生产者的 stdout 连接到消费者的 stdin,反之亦然。这可以在 Windows、Linux 和 MacOS 上使用。

消费者进程从 stdin 向数组中读取五个字节,检查它们是否等于 ping 后的换行符,然后作出相应的响应。它也会响应 pong。

use std::io::{stdin, stdout, Read, Write};  
  
fn main() {  
    let mut arr = [0u8, 0, 0, 0, 0];  
    loop {  
        let read_result = stdin().read_exact(&mut arr);  
        if read_result.is_ok() {  
            let output = match &arr {  
                b"ping\n" => b"pong\n",  
                b
"pong\n" => b"ping\n",  
                _ => b
"Error",  
            };  
            stdout().write(output).unwrap();  
        }  
    }  
}

生产者进程稍微复杂一些,因为它必须先创建和处理消费者,但它会发出 ping,等待响应,如果没有 ping 就会惊慌失措。

pub fn run_inner(&mut self, n: usize, mut return_value: &mut [u8; 5]) {  
    if let Some(ref mut pipes_input) = self.pipe_proc.stdin {  
        if let Some(ref mut pipes_output) = self.pipe_proc.stdout {  
            for _ in 0..n {  
                pipes_input.write(b"ping\n").unwrap();  
                pipes_output.read_exact(return_value).unwrap();  
                if return_value != b
"pong\n" {  
                    panic!(
"Unexpected response")  
                }  
            }  
        }  
    }  
}


除了对管道进行一些繁琐的 ref mut 处理外,编写这个程序还是很容易的。如果使用更复杂的数据结构,这可能会很麻烦,因为必须决定信息之间的分隔符,而不仅仅是换行符。

方法 2 - TCP
一种自然的方法是尝试通过 HTTP 连接客户端和服务器。但这感觉就像对 HTTP 服务器进行基准测试一样危险,所以我直接使用 TCP。

...

// Producer
impl TcpRunner {  
    pub fn new(start_child: bool, tcp_nodelay: bool) -> TcpRunner {  
        let listener = TcpListener::bind(
"127.0.0.1:0").unwrap();  
        let port = listener.local_addr().unwrap().port();  
        let exe = crate::executable_path(
"tcp_consumer");  
        let child_proc = if start_child {  
            Some(Command::new(exe).args(&[port.to_string(), tcp_nodelay.to_string()]).spawn().unwrap())  
        } else {  
            None  
        };  
        let stream = TcpStreamWrapper::from_listener(listener, tcp_nodelay);  
        Self { child_proc, wrapper: stream, tcp_nodelay }  
    }  
  
    pub fn run(&mut self, n: usize, print: bool) {  
       
// TODO: Decide whether this can be done without copying from the socket  
        let mut buf = [0u8; 4];  
        for _ in 0..n {  
            self.wrapper.stream.write(b
"ping").unwrap();  
            self.wrapper.stream.read_exact(&mut buf).unwrap();  
            if !buf.eq(b
"pong") {  
                panic!(
"Sent ping didn't get pong")  
            }  
        }  
    }  
}

...

// pipes_consumer.rs
// Consumer
fn main() {  
    let args: Vec<String> = std::env::args().collect();  
    let port = u16::from_str(&args[1]).unwrap();  
    let nodelay = bool::from_str(&args[2]).unwrap();  
    let mut wrapper = ipc::tcp::TcpStreamWrapper::from_port(port, nodelay);  
    let mut buf = [0u8; 4];  
    while let Ok(_) = wrapper.stream.read(&mut buf) {  
        if buf.eq(b
"ping") {  
            wrapper.stream.write(b
"pong").unwrap();  
        } else {  
            panic!(
"Received unknown value {:?}", buf)  
        }  
    }  
}

总而言之,这相当简单。目前,ping 被写入套接字,复制,然后检查。然后 Pong 被写回。代码中有一条注释突出显示了这一点,但我不清楚是否可以在不将其复制到本地缓冲区的情况下读取套接字。考虑到它只有 5 个字节,并且无论如何都需要系统调用,这可能是可以忽略不计的。

唯一值得关注的另一项是,我们可以设置 TCP_NODELAY,这将禁用Nagle 算法。通常,TCP 会短暂等待以构建一个足够大的值得发送的数据包。考虑到我们正在寻找快速传输,禁用此功能是有意义的。给出了有和没有此设置的基准测试。剧透 - 它似乎没有改变任何东西。

从实现角度来看,它比前一种情况稍微复杂一些。必须将要连接的端口传递给消费者,建立连接,但并不太难。我可以轻松地编写此代码,而且我也喜欢它可以在需要时跨网络拆分。对于复杂的用例,我可能会错过一些 HTTP 细节,但对于来回发送数据包,这感觉灵活且易于维护。

方法 3 - UDP
自然,下一个方法是尝试 UDP。在这些情况下,UDP 传统上用于“发射后不管”机制。与 TCP 不同,该协议不提供恢复丢失或无序数据包的方法。这可能是一个优势,因为它可以防止连接变得过于“繁琐”,但如果一致性很重要,则需要手动实现这些层 - 无论是带内还是带外。我们将回避这个讨论,因为我们在同一台机器上运行两个进程并使用环回适配器,但请注意,这种方式仍然可能丢失数据包。如果套接字缓冲区中填充的数据多于在读取循环中可以读取的数据,它将被毫无歉意地丢弃。也许另一篇文章会演示这一点。

我仅展示生产者,因为消费者非常相似。

pub struct UdpRunner {  
    child_proc: Option<Child>,  
    wrapper: UdpStreamWrapper,  
    their_port: u16,  
}  
  
impl UdpRunner {  
    pub fn new(start_child: bool) -> UdpRunner {  
        let wrapper = UdpStreamWrapper::new();  
        let their_port = portpicker::pick_unused_port().unwrap();  
        let exe = crate::executable_path("udp_consumer");  
        let child_proc = if start_child {  
            Some(  
                Command::new(exe)  
                    .args(&[wrapper.our_port.to_string(), their_port.to_string()])  
                    .spawn()  
                    .unwrap(),  
            )  
        } else {  
            None  
        };  
       
// Awkward sleep to make sure the child proc is ready  
        sleep(Duration::from_millis(100));  
        wrapper  
            .socket  
            .connect(format!(
"127.0.0.1:{}", their_port))  
            .expect(
"Child process can't connect");  
        Self {  
            child_proc,  
            wrapper,  
            their_port,  
        }  
    }  
  
    pub fn run(&mut self, n: usize, print: bool) {  
        let start = Instant::now();  
        let mut buf = [0u8; 4];  
        for _ in 0..n {  
            self.wrapper.socket.send(b
"ping").unwrap();  
            self.wrapper.socket.recv(&mut buf).unwrap();  
            if !buf.eq(b
"pong") {  
                panic!(
"Sent ping didn't get pong")  
            }  
        }  
    }  
}

总体来说,这种方法还不错,但由于以下几个原因,它显得更加繁琐:
  • 由于 UDP 是一种广播协议,它并不关心是否有人在监听。这意味着我们必须启动消费者,连接到生产者,并确认它们已连接。这可以在带外完成,但我只是通过休眠一段时间来破解它,这段时间似乎足够让消费者醒来并准备好接收指令
  • 该 API 与 TCP API 类似,但含义不同。具体来说,该connect方法不保证已建立任何连接,只是程序已将自身绑定到远程地址,该地址可能会或可能不会随后失败,或者只是将数据泵入以太网。它像 TCP 连接方法一样采用地址数组,但它没有有意义的方法来决定地址是否有用(因为它没有握手),因此只采用第一个地址并绑定到它。所有这些都在文档中,但它不符合人体工程学。也许bind会是一个更好的名字,尽管它确实有一个可能不合适的特定含义
UDP 的这些缺点众所周知,它用于这些缺点不那么重要或异步特性有用的场合。它还可以附加多个侦听器,而 TCP 则无法做到这一点。人们可以看到 UDP 在哪里可能有用,以及为什么 UDP 在在线游戏等用例中无处不在。

方法 4——共享内存
共享内存是一种在进程间共享数据的快速方法。一个进程分配一块内存,并将该句柄传递给另一个进程。然后每个进程都可以独立地读取或写入该内存块。如果你的第一直觉是担心同步和竞争条件,那你绝对正确。更糟糕的是,开箱即用的 Rust 在这里帮不了我们,尽管通常对这类事情很有帮助。我们只能靠自己,而且会这样unsafe。

首先,我们将编写在生产者和消费者中执行的代码来创建(或获取)某些共享内存的句柄,然后将其布置为生产者锁、消费者锁和一个四字节缓冲区,以供我们交换数据。

// Shared memory layout
//|    0    |    1    |    2    |    3    |    4    |    5    |    6    |    7    |
//|   producer lock   |   consumer lock   |      data buffer (ping or pong)       |
pub struct ShmemWrapper {  
    pub shmem: Shmem,  
    pub owner: bool,  
    pub our_event: Box<dyn EventImpl>,  
    pub their_event: Box<dyn EventImpl>,  
    pub data_start: usize,  
}  
  
impl ShmemWrapper {  
    pub fn new(handle: Option<String>) -> ShmemWrapper {  
        let owner = handle.is_none();  
       
// If we've been given a memory handle, attach it, if not, create one  
        let mut shmem = match handle {  
            None => shmem_conf().create().unwrap(),  
            Some(h) => shmem_conf()  
                .os_id(&h)  
                .open()  
                .expect(&format!(
"Unable to open the shared memory at {}", h)),  
        };  
        let mut bytes = unsafe { shmem.as_slice_mut() };  
       
// The two events are locks - one for each side. Each side activates the lock while it's  
       
// writing, and then unlocks when the data can be read
        let ((our_event, lock_bytes_ours), (their_event, lock_bytes_theirs)) = unsafe {  
            if owner {  
                (  
                    BusyEvent::new(bytes.get_mut(0).unwrap(), true).unwrap(),  
                    BusyEvent::new(bytes.get_mut(2).unwrap(), true).unwrap(),  
                )  
            } else {  
                (
                    
// If we're not the owner, the events have been created already  
                    BusyEvent::from_existing(bytes.get_mut(2).unwrap()).unwrap(),  
                    BusyEvent::from_existing(bytes.get_mut(0).unwrap()).unwrap(),  
                )  
            }  
        };  
       
// Confirm that we've correctly indexed two bytes for each lock  
        assert!(lock_bytes_ours <= 2);  
        assert!(lock_bytes_theirs <= 2);  
        if owner {  
            our_event.set(EventState::Clear).unwrap();  
            their_event.set(EventState::Clear).unwrap();  
        }  
        ShmemWrapper {  
            shmem,  
            owner,  
            our_event,  
            their_event,  
            data_start: 4,  
        }  
    }  
  
    pub fn signal_start(&mut self) {  
        self.our_event.set(EventState::Clear).unwrap()  
    }  
    pub fn signal_finished(&mut self) {  
        self.our_event.set(EventState::Signaled).unwrap()  
    }  
  
    pub fn write(&mut self, data: &[u8; 4]) {  
        let mut bytes = unsafe { self.shmem.as_slice_mut() };  
  
        for i in 0..data.len() {  
            bytes[i + self.data_start] = data<i>;  
        }  
    }  
  
    pub fn read(&self) -> &[u8] {  
        unsafe { &self.shmem.as_slice()[self.data_start..self.data_start + 4] }  
    }  
}

有了这些结构,我们只需在被允许时锁定、写入、解锁,然后读取即可。

pub fn run(&mut self, n: usize, print: bool) { 
    for _ in 0..n {  
        // Activate our lock in preparation for writing  
        self.wrapper.signal_start();  
        self.wrapper.write(b
"ping");  
        
// Unlock after writing  
        self.wrapper.signal_finished();  
            
// Wait for their lock to be released so we can read  
        if self.wrapper.their_event.wait(Timeout::Infinite).is_ok() {  
            let str = self.wrapper.read();  
            if str != b
"pong" {  
                panic!(
"Sent ping didn't get pong")  
            }  
        }  
    } 
}

这段代码写起来非常糟糕,而且花了不少时间才弄好。我几乎可以肯定其中还有 bug。它之所以复杂是因为:
  • 我们必须自己完成所有同步,而不需要太多帮助。在某些情况下,你可以想象使用队列或消息系统在进程之间进行带外通信,让它们知道何时可以安全地读取或写入。但是,考虑到我们的消息非常小,这会破坏我们努力实现的所有性能
  • 这是非常低级的。我们必须自己整理字节,并使用大量不安全的方法来完成这项工作。这也使我们暴露于结构布局的变化,这会导致难以发现的错误
  • 我遇到了几个错误。在功能最丰富的板条箱的 Windows 实现中,分配内存页面时存在最小页面大小错误
  • 目前尚不清楚底层内存是否可以轻松调整大小。对于我们的目的而言,这不是问题,因为我们只有 8 个字节,但你可以想象这很快就会成为一个问题
说实话,除非我绝对确定我需要所有的共享内存性能,否则我不会想在生产中使用这样的代码。其他语言有共享内存框架来简化这样的事情,但我在 Rust 中找不到任何东西。


测试结果
Windows 和 Linux 的结果,但由于它们是不同的机器,因此请谨慎对待这些结果。不过,在平台内进行比较可能更公平。

  • 使用共享内存,我们可以在 200 纳秒内(大约 1000 个处理器周期)完成一次乒乓。
  • 其他方法的每次操作时间都低于共享内存

结论
我对大多数事物的表现如此相似感到惊讶。我粗略地调查了 Linux 特定的方法,如 dbus 和 Unix Domain Sockets,但它们似乎与非共享内存方法大致相同。唯一可以尝试的其他方法是内存映射文件,但我想把它留到我想对更大的数据块尝试类似的东西时再试。

如果我必须在生产中执行此操作,对于大多数工作负载,我可能仍会使用 HTTP / TCP 连接。它可移植、在消息失败时可靠,并且如果需要,我可以将其拆分到多台机器上。但是,对于延迟确实很重要的情况,使用共享内存的维护开销是值得的。