在本教程中,我们将采用 "滑动窗口 "算法,通过一个动态周期来检查请求历史记录,并使用一个基本的内存哈希表来存储用户及其请求时间。我们还将了解如何使用 tower-governor 为您配置速率限制。
1、简单的滑动窗口速率限制器
为了弄清楚如何从头开始实现这一目标,让我们从头开始编写一个基于 IP 的滑动窗口速率限制器。
首先,我们将使用 cargo init 初始化一个常规项目,然后根据提示选择 Axum 作为我们的首选框架。
我们还需要一些额外的依赖项,因此先用这个 shell 片段安装它们:
cargo add serde@1.0.196 -F derive
cargo add chrono@0.4.34 -F serde,clock
我们将声明一个新结构,该结构保存 IpAddr 键的 HashMap,其值为 Vec<DateTime<Utc>>(UTC 时区时间戳向量)。
use std::sync::{Arc, Mutex}; use std::collections::HashMap; use std::net::IpAddr; use chrono::{DateTime, Utc};
// 这将是用户访问端点的请求限制(每分钟) // 如果用户试图超出此限制,我们应返回错误信息 const REQUEST_LIMIT: usize = 120;
#[derive(Clone, Default)] pub struct RateLimiter { requests: Arc<Mutex<HashMap<IpAddr, Vec<DateTime<Utc>>>>>, }
|
首先,我们要使用 .lock() 锁定哈希表,这样就有了写入权限。
然后,我们要使用 .entry() 函数检查哈希表中是否包含我们要检查的 IP 地址,然后根据长度是否低于请求限制,通过保留有效时间戳和推送新条目来修改哈希表。
然后,我们检查条目长度是否大于请求限制--如果是,则返回错误;如果不是,则返回 Ok(())。
impl RateLimiter { fn check_if_rate_limited(&self, ip_addr: IpAddr) -> Result<(), String> { // 我们只保存 60 秒前的时间戳 let throttle_time_limit = Utc::now() - std::time::Duration::from_secs(60);
let mut requests_hashmap = self.requests.lock().unwrap();
let mut requests_for_ip = requests_hashmap // 在此处抓取条目,并允许我们对其进行就地修改 .entry(ip_addr) // 如果条目为空,则插入一个带有当前时间戳的 vec .or_insert(Vec::new());
requests_for_ip.retain(|x| x.to_utc() > throttle_time_limit); requests_for_ip.push(Utc::now());
if requests_for_ip.len() > REQUEST_LIMIT { return Err("IP is rate limited :(".to_string()); }
Ok(()) } }
|
下面是一个如何使用的基本示例:
use std::net::Ipv4Addr;
fn main() { let rate_limiter = RateLimiter::default();
let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
// 在此,我们请求 120 次 - 我们的请求上限 for _ in 1..80 { assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok()) }
// wait 30 seconds std::thread::sleep(std::time::Duration::from_secs(30));
// 在此再申请 40 次,以满足配额要求 for _ in 1..40 { assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok()) }
// 再等 30 秒 std::thread::sleep(std::time::Duration::from_secs(30));
// 现在我们可以再提出 80 项请求 for _ in 1..80 { assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok()) } }
|
在此基础上,如果我们想将其扩展到与 Axum 一起使用,也是可以的。不过,生产就绪的速率限制系统通常要比这先进得多。下面我们将讨论如何利用板条箱进行速率限制,包括使用基于用户的速率限制。
2、实施基于用户的速率限制
对于没有登录的对外网站,IP 地址是唯一可以用来跟踪用户的信息(除了浏览器信息)。不过,基于已验证用户而非 IP 地址的速率限制可能更有用。在使用 IP 地址时,您可能会遇到以下问题:
- 多个用户可能拥有相同的 IP 地址
- 如果你阻止了用户(通过代理或其他方法),用户可以简单地更改他们使用的 IP 地址
使用基于用户的速率限制可以解决这些问题。虽然用户可以拥有多个 IP 地址,但我们可以将其分配给同一个用户。具体实施
为了初始化我们的网络服务,我们将使用 cargo shuttle init(需要安装 cargo-shuttle)来创建我们的项目,并确保选择 Axum 作为框架。
在添加速率限制器之前,我们要创建一个自定义头密钥!这将用于需要用户身份验证的路由中。我们还可以在稍后为速率限制器实施自定义密钥提取器时使用头。我们首先要添加 axum-extra,并使用键入式头信息功能:
cargo add axum-extra -F typed-header
接下来,我们要创建一个结构来保存字符串,并实现 headers::Header 的 axum_extra 重导出。你可以看到下面的 Header 实现,它通过遍历 HeaderValue 来解码值,并创建 CustomHeader 结构。
我们可以从定义 HeaderName 开始:
static X: HeaderName = HeaderName::from_static("x-custom-key"); static CUSTOM_HEADER: &HeaderName = &X;
pub struct CustomHeader(String);
impl CustomHeader { pub fn key(self) -> String { self.0 } }
|
既然我们已经定义了自定义头名称(将用作头关键字),我们就可以为 CustomHeader 实现 axum_extra::headers::Header 了:
impl Header for CustomHeader { fn name() -> &'static HeaderName { CUSTOM_HEADER }
fn decode<'i, I>(values: &mut I) -> Result<Self, axum_extra::headers::Error> where I: Iterator<Item = &'i HeaderValue>, { let value = values .next() .ok_or_else(axum_extra::headers::Error::invalid)?;
Ok(CustomHeader(value.to_str().unwrap().to_owned())) }
fn encode<E>(&self, values: &mut E) where E: Extend<HeaderValue>, { let s = &self.0;
let value = HeaderValue::from_str(s).unwrap();
values.extend(std::iter::once(value)); } }
|
要将 CustomHeader 用作 Axum 提取器,我们需要像这样用 TypedHeader 将其包裹起来:
async fn register( TypedHeader(header): TypedHeader<CustomHeader>, ) -> impl IntoResponse { // .. your code goes here }
|
这一切都很好,但这与费率限制有什么关系呢?
虽然我们可以在中间件中使用它,但更好的替代解决方案是使用 tower_governor。该板块使用通用小区速率算法(GCRA),它是漏斗算法的更复杂版本。有关 GCRA 的更多信息,请点击此处here。
要开始使用,我们先将该板块添加到 Rust 程序中:
cargo add tower-governor
当我们要将其添加到主函数时,可以使用 GovernorConfigBuilder,然后将其添加到 GovernorLayer 中。请注意,虽然GovernorConfigBuilder没有实现克隆,但添加塔式服务层需要它实现克隆。这意味着我们需要将配置生成器封装起来,然后再使用 Box::leak 将封装泄露出去,以获得静态生命周期的 GovernorConfig,供我们的 axum::Router 使用:
use auxm::{Router, routing::get}; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
#[shuttle_runtime::main] async fn main() -> shuttle_axum::ShuttleAxum { let governor_conf = Box::new( GovernorConfigBuilder::default() .per_second(2) .burst_size(5) .finish() .unwrap(), );
let router = Router::new() .route("/", get(hello_world)) .layer(GovernorLayer { // We can leak this because it is created once and then never needs to be destructed config: Box::leak(governor_conf), });
Ok(router.into()) }
|
默认情况下,GovernorConfigBuilder 使用一种名为 PeerIpKeyExtractor 的类型,它试图抓取连接客户端的 IP 密钥。不过,要使用我们的头作为提取的密钥,我们可以实现 tower_governor::key_extractor::KeyExtractor。为此,我们将使用一个单元结构,因为当我们稍后将其添加到 GovernorConfigBuilder 时,目前不需要任何额外的变量:
use tower_governor::GovernorError; use axum::http::Request;
#[derive(Clone)] pub struct CustomHeaderExtractor;
impl KeyExtractor for CustomHeaderExtractor { type Key = String;
fn extract<T>(&self, req: &Request<T>) -> Result<Self::Key, GovernorError> { let headers = req.headers();
match headers.get(CUSTOM_HEADER) { Some(res) => { let res = res.to_str() .map_err(|_| GovernorError::UnableToExtractKey)?;
Ok(res.to_owned()) }, None => Err(GovernorError::UnableToExtractKey) } } }
|
这样,我们就可以在主函数中将 CustomHeaderExtractor 添加到我们的 GovernorConfigBuilder 中。
let governor_conf = Box::new( GovernorConfigBuilder::default() .per_second(2) .burst_size(5) .key_extractor(CustomHeaderExtractor) .finish() .unwrap(), );
|
当用户尝试访问任何与治理层分层的路由时,现在它会尝试获取一个标头名称为 x-custom-key 的标头--如果不存在,路由将返回错误。在这里,我们设置了限制,允许用户每 2 秒发送 5 个请求。
请注意,在生成器中,per_second() 函数会告诉我们补充配额的确切间隔时间,而 burst_size 则会告诉我们在 tower-governor 开始阻止来自给定 IP 地址(或 API 密钥,在我们的例子中)的请求之前的配额是多少。
我们还可以额外设置 per_millisecond() 和 per_nanosecond()参数,这样如果想每半秒补充一次配额,就可以在生成器中使用 per_millisecond(500)。
部署
现在我们已经完成了部署,你可以使用 cargo shuttle deploy 进行部署(如果是在脏 Git 分支上,则添加--ad),然后观看奇迹的发生。部署完成后,Shuttle 会在终端输出部署的详细信息。