使用Rustlang的Async Tokio运行时处理CPU密集型任务


Rust 内置了对异步 ( async) 编程模型的支持,类似于 JavaScript 等语言。
要充分利用多核和异步 I/O,必须使用运行时,虽然 Rust 社区有多种替代方案,但 Tokio 是事实上的标准。
CPU 密集型计算定义:以消耗大量 CPU 用于存储重组、预先计算各种索引或直接回答客户端查询的方式处理数据。
这些计算通常被分解成许多独立的块,称为“任务”,然后并行运行以利用现代 CPU 中可用的许多内核。
确定何时运行哪些任务通常由称为“任务调度程序”的东西完成,它将任务映射到可用的硬件内核/操作系统线程。
当在 Rust 生态系统中寻找任务调度器时,就像InfluxDB IOx 和 DataFusion 所做的那样,你自然会选择 Tokio,它看起来很不错:

  1. 您已经拥有 Tokio(没有新的依赖项)。
  2. Tokio 实现了一个复杂的工作窃取调度程序
  3. Tokio 有效地内置了对延续 ( async/ await) 的语言支持,以及许多相对成熟的流、异步锁定、通道、取消等库。
  4. Tokio 以在整个 Rust 生态系统中经过良好测试和大量使用而闻名。
  5. Tokio 通常会在同一个执行器线程上运行任务和未来,这对于缓存局部性非常有用。
  6. Tokio 有据可查,积极维护,并且一直在变得更好。

但是:
旧版本的 Tokio 文档(例如1.10.0)著名的告诫:
“如果您的代码受 CPU 限制,并且您希望限制用于运行它的线程数,您应该在另一个线程池(例如 Rayon)上运行它。”

这个声明在广泛的 Rust 社区中都引起了严重的混乱。多人读到它的意思是Runtime永远不应该将 Tokio 用于 CPU 密集型任务,关键实际上是同一个Runtime 实例(同一个线程池)不应该同时用于 I/O 和 CPU。
顺便说一句,Tokio 文档建议将Rayon用于 CPU 密集型任务。Rayon 是许多应用程序的绝佳选择,但它不支持 . async,因此如果您的代码必须执行任何 I/O,您将​​不得不跨越痛苦的同步/异步边界。我还发现映射一个基于拉取的执行模型具有挑战性,其中任务必须等待所有输入准备好才能运行到 Rayon。
 
智者说,“使用 Tokio 进行 CPU 密集型工作会增加你的请求尾部延迟,这是不可接受的。” 
请求尾部会影响是否活着的检查指标,对于使用容器编排系统(Kubernetes)部署的系统来说很重要,因为 Tokio 正在有效地充分利用您的 CPU 来处理大量数据处理任务,那么 Kubernetes 将无法获得所需的“一切正常”响应并终止您的进程。
这种推理得出了一个经典结论,即由于尾部延迟很关键,因此您不能将 Tokio 用于 CPU 繁重的任务。
然而,正如 Tokio 文档所建议的那样,在 CPU 完全饱和的同时避免被 Kubernetes 和朋友攻击,真正重要的是使用一个单独的线程池——一个用于“延迟很重要”的任务,例如响应/health,另一个用于 CPU繁重的任务。这些线程池的最佳线程数因您的需要而异,这是另一篇单独文章的好主题。
也许通过将 TokioRuntime视为一个复杂的线程池,使用不同Runtime实例的想法可能看起来更可口。
 
如何将 Tokio 用于 CPU 密集型任务?
这是我们如何在 InfluxDB IOx 中的单独 Tokio Runtime 上运行任务的简化版本。(完整版本可以在我们的 repo中找到,并且有额外的干净关闭和加入逻辑。)

pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,               
}                      
                       
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by           
/// them) on a separate Tokio Executor             
struct State {                
    
/// Channel for requests -- the dedicated executor takes requests          
    
/// from here and runs them.                   
    requests: Option<std::sync::mpsc::Sender<Task>>,             
                       
    
/// Thread which has a different Tokio runtime
    
/// installed and spawns tasks there               
    thread: Option<std::thread::JoinHandle<()>>,                 
}            
 
impl DedicatedExecutor {                    
    
/// Creates a new `DedicatedExecutor` with a dedicated Tokio               
    
/// executor that is separate from the threadpool created via              
    
/// `[tokio::main]`.                
    pub fn new(thread_name: &str, num_threads: usize) -> Self {         
 let thread_name = thread_name.to_string();               
                       
 let (tx, rx) = std::sync::mpsc::channel::<Task>();              
                       
 let thread = std::thread::spawn(move || { 
     
// Create a new Runtime to run tasks                            
     let runtime = Tokio::runtime::Builder::new_multi_thread()          
  .enable_all()               
  .thread_name(&thread_name)                
  .worker_threads(num_threads)
  
// Lower OS priority of worker threads to prioritize main runtime                     
  .on_thread_start(move || set_current_thread_priority_low())       
  .build()                    
  .expect(
"Creating Tokio runtime");               
                       
  
// Pull task requests off the channel and send them to the executor                    
  runtime.block_on(async move {             
  while let Ok(task) = rx.recv() {                                 
      Tokio::task::spawn(async move {              
   task.run().await;                 
      });              
  }        
                      
 let state = State {                 
     requests: Some(tx),             
     thread: Some(thread),                  
 };                    
                       
 Self {                
     state: Arc::new(Mutex::new(state)),           
 }                     
    }            

此代码创建一个 new std::thread,它创建一个单独的多线程 TokioRuntime来运行任务,然后从Channel读取任务并将spawn它们发送到 new Runtime上。
注意:新线程是关键。如果您尝试Runtime在主线程或 Tokio 创建的线程之一上创建一个新线程,您将收到错误消息,因为已经安装了一个Runtime。
这是向第二个Runtime发送任务的相应代码。

impl DedicatedExecutor {
 
/// Runs the specified Future (and any tasks it spawns) on the
/// `DedicatedExecutor`.
pub fn spawn<T>(&self, task: T) -> Job<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();

let fut = Box::pin(async move {
let task_output = task.await;
tx.send(task_output).ok()
});
let mut state = self.state.lock();
let task = Task {
fut,
};

if let Some(requests) = &mut state.requests {
// would fail if someone has started shutdown
requests.send(task).ok();
} else {
warn!(
"tried to schedule task on an executor that was shutdown");
}

Job { rx, cancel }
}
}

上面的代码使用了一个Future被调用的包装器Job,它处理将结果从专用执行器传输回主执行器,如下所示:

#[pin_project(PinnedDrop)]
pub struct Job<T> {
#[pin]
rx: Receiver<T>,
}

impl<T> Future for Job<T> {
type Output = Result<T, Error>;

fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.rx.poll(cx)
}
}

可以在这个Github gist中找到所有代码。