Rust中实现 API 速率限制

在本教程中,我们将采用 "滑动窗口 "算法,通过一个动态周期来检查请求历史记录,并使用一个基本的内存哈希表来存储用户及其请求时间。我们还将了解如何使用 tower-governor 为您配置速率限制。

1、简单的滑动窗口速率限制器
为了弄清楚如何从头开始实现这一目标,让我们从头开始编写一个基于 IP 的滑动窗口速率限制器。

首先,我们将使用 cargo init 初始化一个常规项目,然后根据提示选择 Axum 作为我们的首选框架。

我们还需要一些额外的依赖项,因此先用这个 shell 片段安装它们:
cargo add serde@1.0.196 -F derive
cargo add chrono@0.4.34 -F serde,clock

我们将声明一个新结构,该结构保存 IpAddr 键的 HashMap,其值为 Vec<DateTime<Utc>>(UTC 时区时间戳向量)。

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::net::IpAddr;
use chrono::{DateTime, Utc};

// 这将是用户访问端点的请求限制(每分钟)
// 如果用户试图超出此限制,我们应返回错误信息
const REQUEST_LIMIT: usize = 120;

#[derive(Clone, Default)]
pub struct RateLimiter {
    requests: Arc<Mutex<HashMap<IpAddr, Vec<DateTime<Utc>>>>>,
}

首先,我们要使用 .lock() 锁定哈希表,这样就有了写入权限。

然后,我们要使用 .entry() 函数检查哈希表中是否包含我们要检查的 IP 地址,然后根据长度是否低于请求限制,通过保留有效时间戳和推送新条目来修改哈希表。

然后,我们检查条目长度是否大于请求限制--如果是,则返回错误;如果不是,则返回 Ok(())。

impl RateLimiter {
    fn check_if_rate_limited(&self, ip_addr: IpAddr) -> Result<(), String> {
        // 我们只保存 60 秒前的时间戳
        let throttle_time_limit = Utc::now() - std::time::Duration::from_secs(60);

        let mut requests_hashmap = self.requests.lock().unwrap();

        let mut requests_for_ip = requests_hashmap
           
// 在此处抓取条目,并允许我们对其进行就地修改
            .entry(ip_addr)
           
// 如果条目为空,则插入一个带有当前时间戳的 vec
            .or_insert(Vec::new());

        requests_for_ip.retain(|x| x.to_utc() > throttle_time_limit);
        requests_for_ip.push(Utc::now());

        if requests_for_ip.len() > REQUEST_LIMIT {
            return Err(
"IP is rate limited :(".to_string());
        }

        Ok(())
    }
}

下面是一个如何使用的基本示例:

use std::net::Ipv4Addr;

fn main() {
let rate_limiter = RateLimiter::default();

    let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    // 在此,我们请求 120 次 - 我们的请求上限
    for _ in 1..80 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

   
// wait 30 seconds
    std::thread::sleep(std::time::Duration::from_secs(30));

   
// 在此再申请 40 次,以满足配额要求
    for _ in 1..40 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

   
// 再等 30 秒
    std::thread::sleep(std::time::Duration::from_secs(30));

   
// 现在我们可以再提出 80 项请求
    for _ in 1..80 {
    assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }
}

在此基础上,如果我们想将其扩展到与 Axum 一起使用,也是可以的。不过,生产就绪的速率限制系统通常要比这先进得多。下面我们将讨论如何利用板条箱进行速率限制,包括使用基于用户的速率限制。

2、实施基于用户的速率限制
对于没有登录的对外网站,IP 地址是唯一可以用来跟踪用户的信息(除了浏览器信息)。不过,基于已验证用户而非 IP 地址的速率限制可能更有用。在使用 IP 地址时,您可能会遇到以下问题:

  • 多个用户可能拥有相同的 IP 地址
  • 如果你阻止了用户(通过代理或其他方法),用户可以简单地更改他们使用的 IP 地址

使用基于用户的速率限制可以解决这些问题。虽然用户可以拥有多个 IP 地址,但我们可以将其分配给同一个用户。

具体实施
为了初始化我们的网络服务,我们将使用 cargo shuttle init(需要安装 cargo-shuttle)来创建我们的项目,并确保选择 Axum 作为框架。

在添加速率限制器之前,我们要创建一个自定义头密钥!这将用于需要用户身份验证的路由中。我们还可以在稍后为速率限制器实施自定义密钥提取器时使用头。我们首先要添加 axum-extra,并使用键入式头信息功能:

cargo add axum-extra -F typed-header

接下来,我们要创建一个结构来保存字符串,并实现 headers::Header 的 axum_extra 重导出。你可以看到下面的 Header 实现,它通过遍历 HeaderValue 来解码值,并创建 CustomHeader 结构。

我们可以从定义 HeaderName 开始:

static X: HeaderName = HeaderName::from_static("x-custom-key");
static CUSTOM_HEADER: &HeaderName = &X;

pub struct CustomHeader(String);

impl CustomHeader {
    pub fn key(self) -> String {
        self.0
    }
}

既然我们已经定义了自定义头名称(将用作头关键字),我们就可以为 CustomHeader 实现 axum_extra::headers::Header 了:

impl Header for CustomHeader {
    fn name() -> &'static HeaderName {
        CUSTOM_HEADER
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, axum_extra::headers::Error>
    where
        I: Iterator<Item = &'i HeaderValue>,
    {
        let value = values
            .next()
            .ok_or_else(axum_extra::headers::Error::invalid)?;

        Ok(CustomHeader(value.to_str().unwrap().to_owned()))
    }

    fn encode<E>(&self, values: &mut E)
    where
        E: Extend<HeaderValue>,
    {
        let s = &self.0;

        let value = HeaderValue::from_str(s).unwrap();

        values.extend(std::iter::once(value));
    }
}

要将 CustomHeader 用作 Axum 提取器,我们需要像这样用 TypedHeader 将其包裹起来:

async fn register(
    TypedHeader(header): TypedHeader<CustomHeader>,
) -> impl IntoResponse {
    // .. your code goes here
}

这一切都很好,但这与费率限制有什么关系呢?

虽然我们可以在中间件中使用它,但更好的替代解决方案是使用 tower_governor。该板块使用通用小区速率算法(GCRA),它是漏斗算法的更复杂版本。有关 GCRA 的更多信息,请点击此处here

要开始使用,我们先将该板块添加到 Rust 程序中:
cargo add tower-governor

当我们要将其添加到主函数时,可以使用 GovernorConfigBuilder,然后将其添加到 GovernorLayer 中。请注意,虽然GovernorConfigBuilder没有实现克隆,但添加塔式服务层需要它实现克隆。这意味着我们需要将配置生成器封装起来,然后再使用 Box::leak 将封装泄露出去,以获得静态生命周期的 GovernorConfig,供我们的 axum::Router 使用:

use auxm::{Router, routing::get};
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

#[shuttle_runtime::main]
async fn main() -> shuttle_axum::ShuttleAxum {
    let governor_conf = Box::new(
        GovernorConfigBuilder::default()
            .per_second(2)
            .burst_size(5)
            .finish()
            .unwrap(),
    );

    let router = Router::new()
        .route("/", get(hello_world))
        .layer(GovernorLayer {
           
// We can leak this because it is created once and then never needs to be destructed
            config: Box::leak(governor_conf),
        });

    Ok(router.into())
}

默认情况下,GovernorConfigBuilder 使用一种名为 PeerIpKeyExtractor 的类型,它试图抓取连接客户端的 IP 密钥。不过,要使用我们的头作为提取的密钥,我们可以实现 tower_governor::key_extractor::KeyExtractor。为此,我们将使用一个单元结构,因为当我们稍后将其添加到 GovernorConfigBuilder 时,目前不需要任何额外的变量:

use tower_governor::GovernorError;
use axum::http::Request;

#[derive(Clone)]
pub struct CustomHeaderExtractor;

impl KeyExtractor for CustomHeaderExtractor {
    type Key = String;

    fn extract<T>(&self, req: &Request<T>) -> Result<Self::Key, GovernorError> {
        let headers = req.headers();

        match headers.get(CUSTOM_HEADER) {
            Some(res) => {
                let res = res.to_str()
                    .map_err(|_| GovernorError::UnableToExtractKey)?;

                Ok(res.to_owned())
            },
            None => Err(GovernorError::UnableToExtractKey)
        }
    }
}

这样,我们就可以在主函数中将 CustomHeaderExtractor 添加到我们的 GovernorConfigBuilder 中。

let governor_conf = Box::new(
    GovernorConfigBuilder::default()
        .per_second(2)
        .burst_size(5)
        .key_extractor(CustomHeaderExtractor)
        .finish()
        .unwrap(),
);

当用户尝试访问任何与治理层分层的路由时,现在它会尝试获取一个标头名称为 x-custom-key 的标头--如果不存在,路由将返回错误。在这里,我们设置了限制,允许用户每 2 秒发送 5 个请求。

请注意,在生成器中,per_second() 函数会告诉我们补充配额的确切间隔时间,而 burst_size 则会告诉我们在 tower-governor 开始阻止来自给定 IP 地址(或 API 密钥,在我们的例子中)的请求之前的配额是多少。

我们还可以额外设置 per_millisecond() 和 per_nanosecond()参数,这样如果想每半秒补充一次配额,就可以在生成器中使用 per_millisecond(500)。

部署
现在我们已经完成了部署,你可以使用 cargo shuttle deploy 进行部署(如果是在脏 Git 分支上,则添加--ad),然后观看奇迹的发生。部署完成后,Shuttle 会在终端输出部署的详细信息。