使用benjamin-batchly实现Rust异步批处理 - alexheretic


有时,与其同时做很多小事,不如将它们捆绑在一起,一次完成,作为一个批处理。所以在星期四早上的一个银行假期里,我很早就醒了(主要是因为我 1 岁男孩的尖叫声)并且(在尖叫声停止后)写了一个crate 来帮助做到这一点:benjamin_batchly

示例:插入数据库
数据库优化的一个反复出现的主题是减少对数据库的“旅行”。发送 1 条包含 N 项的胖消息通常比发送 N 条仅包含 1 项的瘦消息要快。

考虑一个我们想要插入数据库的 crud 风格的创建请求。

async fn handle_create_foo(db: &Db, data: CreateFooRequest) -> Result<(), Error> {
    let db_item = DbItem::from(data);
    db.insert(db_item).await?;
    Ok(())
}

因此,对于每个CreateFooRequest,我们将在我们的数据库中插入一个项目。我们可以使用BatchMutex对它们进行批处理。

use benjamin_batchly::{BatchMutex, BatchResult};

async fn handle_create_foo(
    batch_mutex: &BatchMutex<(), DbItem>,
    db: &Db,
    data: CreateFooRequest,
) -> Result<(), Error> {
    let db_item = DbItem::from(data);
    match batch_mutex.submit((), db_item).await {
        BatchResult::Work(mut batch) => {
            db.bulk_insert(&batch.items).await?;
            batch.notify_all_done();
            Ok(())
        }
        BatchResult::Done(_) => Ok(()),
        BatchResult::Failed => Err(Error::BatchFailed),
    }
}

提交到BatchMutex提供 3 个异步结果。
  • Work :一批 1 个或多个项目,包括提交的项目,应处理。
  • Done:提交的项目由另一个提交者批量处理并通知完成。
  • Failed:提交的项目由另一个提交者批量处理,但在通知完成之前被丢弃。
因此,如果在批处理进行时有 100 个CreateFooRequest请求进入,它们将在submit处等待。上一批完成后,所有等待提交的内容将成为下一批。1 次调用将返回BatchResult::Work并且在batch.notify_all_done()调用后 99 次将返回BatchResult::Done。

注意:在示例中,我使用单位类型 () 作为submit的第一个参数。这是可用于分区批处理的“批处理密钥”。只有使用相同批次密钥提交的项目才会捆绑在一起,这在更一般的情况下很有用。

注意(2):也支持单独的项目返回值,查看文档以获取更多信息。