Fang:实现Rust异步后台处理


尽管 Rust 的第一个稳定版本于 2015 年发布,但其生态系统中仍然存在一些用于解决常见任务的漏洞。其中之一是后台处理。
在软件工程中,后台处理是解决几个问题的常用方法:

  • 执行定期任务。例如,传递通知、更新缓存值。
  • 推迟昂贵的工作,以便您的应用程序在后台执行计算时保持响应

大多数编程语言都有后台处理框架/库。例如:
  • 红宝石 sidekiq。它使用 Redis 作为作业队列。
  • Python dramatiq。它使用 RabbitMQ 作为作业队列。
  • Elixir - oban:它使用 Postgres DB 作为作业队列。

异步编程(async/await)可用于后台处理,但如果直接使用它有几个主要缺点:
  • 它不能控制在任何给定时间正在执行的任务数量。因此,许多衍生的任务可能会使它们启动的一个/多个线程超载。
  • 它不提供任何有助于调查系统和发现瓶颈的监控
  • 任务不是持久的。因此,每次应用程序重新启动时,所有排队的任务都会丢失

为了解决异步编程的这些缺点,我们在fang 库中实现了异步处理。

Fang 是一个 rust 的后台处理库。Fang 的第一个版本是在一年前发布的。它的主要特点是:

  • 每个工人都在一个单独的线程中启动
  • Postgres 表用作任务队列

此实现是为特定用例 - el monitorro bot编写的。这种后台处理的具体实现方式被时间证明了。每天它每分钟处理越来越多的提要(当前数量超过 3000)。一些用户在他们的基础设施上托管机器人。
您可以在这篇博文中了解更多关于 fang 中的线程处理的信息。

Async Fang
异步显着降低了 CPU 和内存开销,尤其是对于具有大量 IO 绑定任务的工作负载,例如服务器和数据库。在其他条件相同的情况下,您可以拥有比 OS 线程更多数量级的任务,因为异步运行时使用少量(昂贵)线程来处理大量(廉价)任务

对于一些轻量级的后台任务,使用异步在同一个线程上运行它们比每个工作线程启动一个线程更便宜。这就是我们在 fang 中实现这种处理的原因。其主要特点:

  • 每个worker都作为一个 tokio 任务启动
  • 如果任何worker在任务执行期间失败,它会重新启动
  • 任务被保存到 Postgres 数据库。代替柴油,tokio-postgres用于与数据库交互。线程处理使用阻塞线程的柴油ORM。
  • 该实现基于traits ,因此很容易实现额外的后端(redis,内存中)来存储任务。

用法
用法很简单:

  • serde通过将派生添加到任务结构来定义可序列化的任务。
  • 为 fang实现AsyncRunnablerunnable trait 以便能够运行它。
  • 启动worker。
  • 排队任务。

让我们回顾一下每一步。

定义job

use fang::serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyTask {
    pub number: u16,
}

impl MyTask {
    pub fn new(number: u16) -> Self {
        Self { number }
    }
}

Fang重新导出serde,因此不需要将其添加到Cargo.toml文件中

实现 AsyncRunnable 特征

use fang::async_trait;
use fang::typetag;
use fang::AsyncRunnable;
use std::time::Duration;

#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyTask {
    async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
        let new_task = MyTask::new(self.number + 1);
        queue
            .insert_task(&new_task as &dyn AsyncRunnable)
            .await
            .unwrap();

        log::info!("the current number is {}", self.number);
        tokio::time::sleep(Duration::from_secs(3)).await;

        Ok(())
    }
}

  • Fang 使用 typetag library库来序列化 trait 对象并将它们保存到队列中。
  •  async-trait 用于实现异步trait

初始化队列

use fang::asynk::async_queue::AsyncQueue;

let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
    .uri("postgres://postgres:postgres@localhost/fang")
    .max_pool_size(max_pool_size)
    .duplicated_tasks(true)
    .build();

启动worker

use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::NoTls;

let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
    .number_of_workers(10_u32)
    .queue(queue.clone())
    .build();

pool.start().await;

插入任务

let task = MyTask::new(0);

queue
    .insert_task(&task1 as &dyn AsyncRunnable)
    .await
    .unwrap();

陷阱
异步处理适用于轻量级任务。但对于较重的任务,建议使用以下方法之一:

  • 启动一个单独的 tokio 运行时来运行 fang workers
  • 使用 fang 中实现的线程处理功能而不是异步处理

未来发展方向
fang 有几个功能计划:

  • 使用不同的退避模式重试
  • 额外的后端(内存中,redis)
  • 异步工作人员的正常关闭(对于线程处理,此功能已实现)
  • 定时cron任务

该项目在 GitHub上可用