使用Python和Redis进行速率限制教程与源码 - hackdoor

20-11-20 banq

速率限制是许多开发人员可能在生活中某些时候必须处理的机制。它可用于多种用途,例如共享对有限资源的访问权限或限制对API端点的请求数量,并以429状态码进行响应 。

在这里,我们将探索一些使用PythonRedis的速率限制算法,从朴素的方法开始,最后达到一种称为通用单元速率算法(GCRA)的高级方法。

在以下文章中,我们将使用 redis-py 与Redis(pip install redis)通信。我建议您 从GitHub克隆 我的存储库,以进行一些限速实验。

 

时间桶

对一个请求中的请求数量进行速率限制的第一种方法 period 是使用时间分组算法,其中为每个速率密钥(诸如用户名或IP地址这样的唯一值)存储一个计数器(最初设置为 limit)和到期时间(period)并根据后续请求递减。

使用Python和Redis,我们可以通过以下方式实现时间桶逻辑:

  • 检查价目码是否 key 存在
  • 如果 key 不存在,则将其初始化为 limit 值(Redis SETNX)和到期时间 period (Redis EXPIRE
  • 在随后的每个请求中减小此值(Redis DECRBY
  • 仅当key 值降至零以下时,请求才受到限制 
  • 给定 period 密钥后将自动删除

from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    if r.setnx(key, limit):
        r.expire(key, int(period.total_seconds()))
    bucket_val = r.get(key)
    if bucket_val and int(bucket_val) > 0:
        r.decrby(key, 1)
        return False
    return True

您可以在模拟请求的操作中看到此代码,其限制为 20 requests / 30 seconds (为了使内容清楚,我将功能包装在模块中,就像您在存储库中看到的那样 )。

import redis
from datetime import timedelta
from ratelimit import time_bucketed

r = redis.Redis(host='localhost', port=6379, db=0)
requests = 25

for i in range(requests):
    if time_bucketed.request_is_limited(r, 'admin', 20, timedelta(seconds=30)):
        print ('Request is limited')
    else:
        print ('Request is allowed')

只有前20个请求不会受到限制,您必须等待30秒才能被允许再次提出新请求。

这种方法的缺点是,它实际上限制了用户在一段时间内可以执行的请求数量,而不是速率。这意味着整个配额可以立即用尽,只有在该期限到期后才重置。

 

漏斗

有一种算法可以处理速率限制并产生非常平滑的速率限制效果: 漏斗式方法。该算法的工作原理与实际的漏水桶类似:您可以将桶中的水(要求)倒入最大容量,而一定量的水以恒定的速率逸出(通常使用后台过程实现)。如果进入水桶的水量大于通过泄漏口流出的水量,则水桶开始装满水,当水桶装满水时,新的请求将受到限制。

我们可以跳过这一点,以获得更优雅的方法,该方法不需要单独的过程即可模拟泄漏:通用信元速率算法

 

通用信元速率算法

通用信元速率算法(GCRA) 来自一种称为“通信”的通信技术 Asynchronous Transfer Mode (ATM) ,用于ATM网络的调度程序中,以延迟或丢弃超出速率限制的信元(固定大小的小数据包)。

GCRA的工作原理是跟踪每个请求上称为理论到达时间(TAT)的时间的剩余限制

tat = current_time + period

如果到达时间小于存储的TAT,则限制下一个请求。如果rate = 1 request / period 在period通常情况下,通常在现实世界中,每个请求都被分隔开 , 那么这很好 rate = limit / period。例如, rate = 10 requests / 60 seconds 允许用户在前6秒内发出10个请求,而 rate = 1 request / 6 seconds 用户必须在请求之间等待6秒。

为了能够在短时间和在 limit > 1期间限制请求,每个请求应当由period / limit分开,下一个理论到达时间(new_tatt)用与以前以不同的方式来计算。使用t表示请求到达的时间:

  • new_tat = tat + period / limit 如果请求符合 (t <= tat)
  • new_tat = t + period / limit 如果请求不符合 (t > tat)

因此:

new_tat = max(tat, t) + period / limit.

如果new_tat 超过当前时间+期间段, 则请求将受到限制 new_tat > t + period。因为 new_tat = tat + period / limit 所以:

tat + period / limit > t + period

这样,使用下面条件限制请求:

tat — t > period — period / limit

 period — period / limit
 <----------------------->
--|----------------------------|--->
  t                   TAT

在这种情况下,不应更新TAT,因为有限的请求没有理论上的到达时间。

现在是时候发布最终代码了!

from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    period_in_seconds = int(period.total_seconds())
    t = r.time()[0]
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)
    tat = max(int(r.get(key)), t)
    if tat - t <= period_in_seconds - separation:
        new_tat = max(tat, t) + separation
        r.set(key, new_tat)
        return False
    return True

在此实现中,我们使用 Redis TIME, 因为GCRA依赖于时间,并且确保多个部署之间的当前时间保持一致(计算机之间的时钟漂移可能导致误报)至关重要。

更多点击标题见原文

在GitHub上检查 完整的代码

         

猜你喜欢