Rust中新重试库包:reqwest-builder-retry


Reqwest 在生产环境中需要超时和重试。即使在开发过程中运行良好,但如果服务器出现故障,它也无法正常工作。

通过在请求中使用 timeout 方法指定超时时间,可以轻松设置超时。重试需要使用 crate 或自行实现。

使用 reqwest-middleware 的retry crate缺点:由于它与 reqwesest-middleware 结构体兼容,因此它不再是一个纯粹的 reqwest 结构体。它不能与仅需要 reqwest 的 API Binding crate 一起使用。

自制retry库包:
这是一个自制的重试 crate。起初我没注意到 reqwest-retry 的存在,所以用这个名字发布了 cargo,但由于名称冲突,报错,于是我赶紧去查看内容。我正在为、LINE、TikTok 和 Pinterest 编写 API 库,想创建一个可以使用这些 API 的重试,但 reqwest-retry 无法实现。不过,中间件的想法很棒,所以如果原公司支持中间件,我很乐意加入。

重试功能的接口如下:

pub enum RetryType {
    Stop,
    Retry,
    RetryAfter(Duration),
}

pub async fn execute<
    SuccessResponse,
    ErrorResponse,
    MakerBuilder,
    CheckDone,
    JITTER,
    SLEEPER,
    FutCheckDone,
    FutSLEEPER,
>(
    make_builder: MakerBuilder,
    check_done: CheckDone,
    try_count: u8,
    retry_duration: Duration,
    jitter: JITTER,
    sleeper: SLEEPER,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
    MakerBuilder: Fn(u8) -> RequestBuilder,
    CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
    JITTER: Fn() -> Duration,
    SLEEPER: Fn(Duration) -> FutSLEEPER,
    FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,
    FutSLEEPER: Future<Output = ()>,

看起来有点复杂。我做了一个稍微简单一些、更方便的版本。

pub async fn execute<SuccessResponse, ErrorResponse, MakerBuilder, CheckDone, FutCheckDone>(
    make_builder: MakerBuilder,
    check_done: CheckDone,
    try_count: u8,
    retry_duration: Duration,
) -> Result<SuccessResponse, Error<ErrorResponse>>
where
    MakerBuilder: Fn(u8) -> RequestBuilder,
    CheckDone: Fn(Result<Response, reqwest::Error>) -> FutCheckDone,
    FutCheckDone: Future<Output = Result<SuccessResponse, (RetryType, ErrorResponse)>>,

make_builder 的参数是尝试次数,返回值是 RequestBulider。如果 RequestBulider 有 clone 函数,就没必要把它变成函数了。但由于只有 try_clone 函数,所以我决定每次重试时都重新创建 RequestBuilder。顺便说一下,如果 RequestBulider 是 multipart 函数,try_clone 函数就会失败。

check_done 函数用于判断请求执行结果是否成功、是否重试或取消重试。部分 API 无法仅根据状态码判断是否重试,因此我们要求调用者进行检查。如果发生错误,则会返回 RetryType。Stop 表示不重试,Retry 表示重试。此外,还有一个函数 RetryAfter(Duration),用于指定由于 API 速率限制等原因导致 API 端指示停止的时间。

try_count 是该API执行的次数,包括第一次执行。

retry_duration 是暂停时间,直到执行下一个 API。但是,根据执行次数,可能会触发指数退避,暂停时间可能会更长。

以下是使用 API 的一个示例。

Cargo.toml

[package]
name = "x-sample"
version =
"0.1.0"
edition =
"2024"

[dependencies]
anyhow =
"1"
reqwest-builder-retry = { path =
"../..", features = ["rustls-tls", "convenience"], default-features = false }
serde =
"1"
serde_json =
"1"
tokio = { version =
"1", features = ["macros", "rt-multi-thread"] }
tracing =
"0.1"
tracing-bunyan-formatter =
"0.3"
tracing-subscriber =
"0.3"
twapi-v2 = { version =
"0.20.0", features = ["oauth10a"] }

main.rs

use std::time::Duration;

use reqwest_builder_retry::{
    RetryType,
    convenience::check_status_code,
    reqwest::{Error, Response, StatusCode, header::HeaderMap},
};
use tracing::Level;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::{Registry, filter::Targets, layer::SubscriberExt};
use twapi_v2::{api::get_2_users_me, oauth10a::OAuthAuthentication};

// Tracingの準備
pub fn setup_tracing(name: &str) {
    let formatting_layer = BunyanFormattingLayer::new(name.into(), std::io::stdout);
    let filter = Targets::new()
        .with_target(name, Level::TRACE)
        .with_target(
"twapi_v2", Level::TRACE);

    let subscriber = Registry::default()
        .with(filter)
        .with(JsonStorageLayer)
        .with(formatting_layer);
    tracing::subscriber::set_global_default(subscriber).unwrap();
}

// エラー時のレスポンスのデータ
#[derive(Debug)]
pub struct ResponseData {
    pub status_code: StatusCode,
    pub body: String,
    pub headers: HeaderMap,
}

// エラー情報、reqwestのエラーかそれ以外
#[derive(Debug)]
pub struct ResponseError {
    pub error: Option<reqwest_builder_retry::reqwest::Error>,
    pub response_data: Option<ResponseData>,
}

// レスポンスのチェック
async fn check_done<T>(
    response: Result<Response, Error>,
    retryable_status_codes: &[StatusCode],
) -> Result<T, (RetryType, ResponseError)>
where
    T: serde::de::DeserializeOwned,
{
    let response = response.map_err(|err| {
        (
            RetryType::Retry,
            ResponseError {
                error: Some(err),
                response_data: None,
            },
        )
    })?;

    let status_code = response.status();
    let headers = response.headers().clone();
    let body = response.text().await.unwrap_or_else(|_|
"".to_string());
    let response_data = ResponseData {
        status_code,
        body,
        headers,
    };

    if let Some(retry_type) = check_status_code(status_code, retryable_status_codes).await {
        return Err((
            retry_type,
            ResponseError {
                error: None,
                response_data: Some(response_data),
            },
        ));
    }

    match serde_json::from_str::<T>(&response_data.body) {
        Ok(result) => Ok(result),
        Err(_) => Err((
            RetryType::Retry,
            ResponseError {
                error: None,
                response_data: Some(response_data),
            },
        )),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    setup_tracing(
"x_sample");
    tracing::trace!(
"start");

    let auth = OAuthAuthentication::new(
        std::env::var(
"CONSUMER_KEY").unwrap_or_default(),
        std::env::var(
"CONSUMER_SECRET").unwrap_or_default(),
        std::env::var(
"ACCESS_KEY").unwrap_or_default(),
        std::env::var(
"ACCESS_SECRET").unwrap_or_default(),
    );

   
// スレッドで利用可能化チェック
    let handle = tokio::spawn({
        async move {
            let result = reqwest_builder_retry::convenience::execute(
                |_| {
                    let api = get_2_users_me::Api::open();
                   
// APIの実行には必ずタイムアウトをつけましょう
                    let builder = api.build(&auth).timeout(Duration::from_secs(3));
                   
// リクエストのログ
                    tracing::trace!(?builder,
"api request");
                    builder
                },
                |response| {
                   
// レスポンスのログ
                    tracing::trace!(?response,
"api response");
                   
// レスポンスのチェック
                    check_done::<get_2_users_me::Response>(
                        response,
                        &[StatusCode::TOO_MANY_REQUESTS, StatusCode::FORBIDDEN],
                    )
                },
                3,                      
// トライ回数
                Duration::from_secs(2),
// リトライ間隔
            )
            .await;
            println!(
"Result: {:?}", result);
        }
    });

    handle.await?;

    Ok(())
}

我能够执行 API 执行所需的超时和重试。我还创建了请求和响应日志。完成所有这些之后,我终于可以安心地在生产环境中使用它了。

作者:Unique Vision Inc. 董事兼首席技术官