在当今现代API驱动的世界中,保护后端免受过载和滥用已变得至关重要。无论您运行的是公共API服务还是内部微服务,速率限制都是保持系统稳定、在线和经济高效的关键技术。
.NET在最近的版本中引入了本机速率限制功能,但是当您在分布式环境中工作时-多个API服务器需要共享限制-您需要更强大的功能。这就是Redis的用武之地。在这篇文章中,我们将了解什么是速率限制,为什么Redis可以成为实现它们的绝佳选择,以及如何将基于Redis的速率限制集成到您的API中。
什么是速率限制?
速率限制设置了客户端(用户、应用程序或 IP)在规定时间段内可以发出的最大请求数,例如,每个 IP 每分钟 100 个请求。一旦达到限制,同一时间段内任何其他传入的请求都将被阻止,通常会返回429 请求过多错误响应。
常见用例
这就是速率限制发挥作用的地方:
- 保护 API 免遭滥用:防止 DDoS 攻击、暴力登录尝试或抓取机器人。
- 确保公平使用:所有客户端都应获得公平的 API 容量份额,尤其是在多租户或公共 API 中。
- 控制付费 API 的成本:限制免费用户的请求数量,并对高级计划提供更高的限制。
高层策略
有几种流行的策略来实现速率限制
- 固定窗口计数:在固定时间窗口内(例如每分钟)请求。简单,但在窗口边缘可能不公平。
- 滑动窗口通过滚动或滑动窗口跟踪请求,以实现更平滑的限制和更好的公平性。
- 令牌桶:客户端“消耗”令牌来发起请求。令牌以稳定的速率补充。允许短时间的突发流量,同时控制整体速率。
- 漏桶类似于令牌桶,但带有队列——传入的请求会被放入一个“桶”(队列)。如果桶超载,请求就会被拒绝。桶会以稳定的速率“漏出”请求,从而允许处理更多请求。
重要提示:您不必自己实现所有这些算法!许多库(以及 Redis 模式)已经支持它们——我们稍后会介绍。
原生 .NET 速率限制
随着.NET 7的发布和.NET 8的持续改进,微软推出了一个内置RateLimiterAPI,使得向ASP.NET Core 应用程序添加速率限制比以往更轻松、更简化。
这是一个巨大的进步,因为在早期版本的 .NET 中,即使是基本的速率限制,也需要依赖自定义代码或第三方库来实现。现在,您可以使用中间件和配置原生地添加它。
.Net RateLimiter 快速介绍
.NET RateLimiter直接集成到ASP.NET Core中间件管道中。
它提供了几种可立即使用的限制策略,包括:
- 固定窗口限制器:限制每个固定时间窗口的请求数量(例如,每分钟 100 个请求)。
- 滑动窗口限制器:平滑地对滚动窗口实施限制。
- 令牌桶限制器:允许偶尔的突发,同时保持稳定的速率。
- 并发限制器:限制正在进行的并发请求数。
应用原生 .NET RateLimiter 的示例(官方文档中已发布)
// Add native rate limiting builder.Services.AddRateLimiter(options => { options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(httpContext => { return RateLimitPartition.GetFixedWindowLimiter( partitionKey: httpContext.User.Identity?.Name ?? httpContext.Request.Headers.Host.ToString(), factory: partition => new FixedWindowRateLimiterOptions { AutoReplenishment = true, PermitLimit = 10, QueueLimit = 0, Window = TimeSpan.FromMinutes(1) }); }); });
|
优势
内置功能.NET RateLimiter带来多种好处:
- 开箱即用的支持:您不需要第三方包 - 它直接融入到ASP.NET Core 中。
- 多种策略可用:根据您的需要选择不同的算法。
- 细粒度控制:按端点、按用户或按路线应用限制。
- 所需的最少配置:只需几行代码,您就可以保护您的 API 免受滥用流量的侵害。
限制
虽然原生速率限制器功能强大,但它也有一些重要的限制:
- 仅限本地(单实例):它在一个应用程序实例的内存中工作。如果您在负载均衡器后面运行多个服务器,则每个实例都会独立执行限制——这意味着整个系统不会进行全局强制执行。
- 没有共享计数器或状态:没有内置机制来跨多个服务器同步限制。
- 没有持久存储:如果应用程序重新启动或缩小,计数器将重置。
何时可能需要 Redis 而不是内置解决方案
对于简单的场景(例如内部 API 或在单个服务器上运行的小型服务),本机.NET速率限制器通常就足够了。
但是,当你进入分布式系统或需要跨多个应用实例进行全局限制时,就会遇到它的局限性。这时,Redis 就派上用场了。
Redis 充当速率限制数据的集中式分布式存储:
- 它同步所有服务器的计数器。
- 它应用全局的每个用户、每个 IP 或每个 API 密钥限制。
- 即使某些应用程序节点发生故障或重新启动,它也能确保一致性。
- 它通过 Lua 脚本支持具有原子操作的高级模式(如滑动窗口或令牌桶)。
简而言之,Redis 解锁了分布式速率限制——这是原生 .NET 限制器无法提供的。
为什么选择 Redis 进行分布式速率限制
正如我之前提到的,原生的.NET速率限制器效果很好——但仅限于单实例环境。一旦你将 API 部署到多个实例、容器或负载均衡器之后,你很快就会遇到一个关键问题:每个实例都有自己的内存和计数器!
这基本上意味着客户端只需向不同的实例发送请求即可轻松绕过您的限制。
使用 Redis 的优势
Redis充当所有 API 实例都可以通信的集中式高性能数据存储,解决了分布式速率限制难题。
这就是为什么 Redis 如此适合:
- 跨服务器共享状态: Redis 让所有 API 实例共享相同的计数器和执行逻辑,确保全面一致的限制。
- 内存速度: Redis 速度惊人。由于它完全在内存中运行,因此每秒可以处理数千次读写操作,且延迟极低,非常适合高吞吐量 API。
- 内置 TTL(生存时间): Redis 允许您设置键的过期时间,这对于基于时间的速率限制非常理想。您无需手动清理旧计数器——Redis 会自动处理。
在 .NET 中使用 Redis 实现速率限制
现在我们了解了理论,让我们通过使用 Redis 和 .NET构建一个简单的固定窗口速率限制器来将其付诸实践。
通过这种方法,我们将:
- 跟踪用户在固定时间窗口(例如 1 分钟)内发出的请求数量。
- 阻止任何超出允许阈值的请求(例如,每分钟 100 个请求)。
步骤1:启动Redis
我将使用 Docker 在本地运行 Redis:
docker run -d -p 6379:6379 redis
但是如果您愿意,您可以使用托管 Redis 服务或在本地安装它。
步骤 2:添加 Stackexchange.Redis
我们将使用流行的 StackExchange.Redis 客户端:
dotnet add package StackExchange.Redis
然后,在 Program.cs 中注册连接:
builder.Services.AddSingleton(
ConnectionMultiplexer.Connect("localhost:6379"));
步骤 3:实施速率限制
基本速率限制 – 固定窗口速率限制
为与 Redis 集成的速率限制器创建一个新类,并处理速率限制。
public class RedisRateLimiter { private readonly IDatabase _redis; private readonly int _limit; private readonly TimeSpan _window; public RedisRateLimiter(IConnectionMultiplexer connectionMultiplexer, int limit, TimeSpan window) { _redis = connectionMultiplexer.GetDatabase(); _limit = limit; _window = window; } public async Task<bool> IsAllowedAsync(string key) { var redisKey = $"rate_limit:{key}"; var count = await _redis.StringIncrementAsync(redisKey); if (count == 1) { // First request, set expiration for the window await _redis.KeyExpireAsync(redisKey, _window); } return count <= _limit; } }
|
使用它作为中间件:
app.Use(async (context, next) => { var limiter = context.RequestServices.GetRequiredService<RedisRateLimiter>(); var clientIp = context.Connection.RemoteIpAddress?.ToString(); if (!await limiter.IsAllowedAsync(clientIp)) { context.Response.StatusCode = 429; await context.Response.WriteAsync("Too many requests. Try again later."); return; } await next(); });
|
Redis 的高级模式
虽然固定窗口算法简单且易于实现,但它也有局限性——特别是在窗口边界突发或公平性至关重要等边缘情况下。
让我们探索可以使用 Redis 实现的三种更高级、更灵活的模式,以实现生产级速率限制。
使用排序集的滑动窗口
滑动窗口算法解决了固定窗口限制器中常见的窗口边缘“爆发”问题。
示例问题:
如果每分钟允许 100 个请求,则用户可以每秒发送 100 个请求,12:00:59然后再每秒发送 100 个请求12:01:01- 只需 2 秒即可有效发送 200 个请求。
滑动窗口通过在过去 N 秒(例如,过去 60 秒)内强制限制来避免这种情况,无论窗口从哪里“开始”。
有序集: Redis有序集 (ZSET)是一种强大的数据结构,用于存储按浮点分数(最常见的是时间戳)排序的唯一元素。这使得它们非常适合需要基于时间的跟踪的用例,例如滑动窗口速率限制。在我们的实现中,我们将每个请求的当前时间戳作为其分数进行存储。然后,我们使用 Redis 命令(例如ZREMRANGEBYSCORE删除过期条目并ZCARD计算当前窗口中剩余的请求数)来高效地完成所有这些操作。
实现此算法有几种方法。一种方法仅使用.NET,它简单且通常足够用——但它并非完全原子的。这意味着在高并发环境中,如果同时处理多个请求,可能会出现竞争条件。另一种方法是使用Lua,这是一种轻量级的高级脚本语言,以其简单性和灵活性而闻名。Redis支持使用Lua编写脚本,允许我们在单个步骤中原子地执行多个命令——即使在高负载下也能确保一致性。
让我们看看如何使用和不使用 Lua 来实现该算法。
选项 1 – 不使用 Lua 实现
不使用 Lua 的滑动窗口的实现如下所示:
public class SlidingWindowRateLimiter { private readonly IDatabase _redis; private readonly ILogger<SlidingWindowRateLimiter> _logger; private readonly int _limit; private readonly TimeSpan _window; public SlidingWindowRateLimiter(IConnectionMultiplexer connectionMultiplexer, ILogger<SlidingWindowRateLimiter> logger, int limit, TimeSpan window) { _redis = connectionMultiplexer.GetDatabase() ?? throw new InvalidOperationException("Unable to get Redis database."); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _limit = limit; _window = window; } public async Task<bool> IsAllowedAsync(string key) { var redisKey = $"rate_limit:{key}"; var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var windowStart = now - (long)_window.TotalMilliseconds; // Remove expired timestamps try { // step 1 : Remove expired timestamps that are outside the sliding window await _redis.SortedSetRemoveRangeByScoreAsync(redisKey, 0, windowStart); // Step 2 : Count the current entries (requests) in the window var currentCount = await _redis.SortedSetLengthAsync(redisKey); if ( currentCount >= _limit ) { _logger.LogWarning("Rate limit exceeded for key: {Key}. Current count: {CurrentCount}, Limit: {Limit}", key, currentCount, _limit); return false; } // Step 3 : Add the new request with timestamp as score var requestId = $"{now}-{Guid.NewGuid()}"; // prevent duplicate entries await _redis.SortedSetAddAsync(redisKey, requestId, now); // Step 4 : Set expiration for the sliding window await _redis.KeyExpireAsync(redisKey, _window); _logger.LogInformation("Request allowed for key: {Key}. Current count: {CurrentCount}, Limit: {Limit}", key, currentCount + 1, _limit); return true; } catch (Exception ex) { _logger.LogError(ex, "Error while removing expired timestamps from Redis."); return false; } } }
|
然后注册新的服务:
builder.Services.AddSingleton<SlidingWindowRateLimiter>(provide => { var redis = provide.GetRequiredService<IConnectionMultiplexer>(); var logger = provide.GetRequiredService<ILogger<SlidingWindowRateLimiter>>(); return new SlidingWindowRateLimiter(redis, logger, 10, TimeSpan.FromMinutes(1)); });
|
并使用它:
app.Use(async (context, next) => { var rateLimiter = context.RequestServices.GetRequiredService<SlidingWindowRateLimiter>(); // Use client IP or user ID var clientKey = context.Connection.RemoteIpAddress?.ToString() ?? "unknown"; if (!await rateLimiter.IsAllowedAsync(clientKey)) { context.Response.StatusCode = 429; context.Response.Headers["Retry-After"] = "60"; // Optional await context.Response.WriteAsync("Too many requests. Try again later."); return; } await next(); });
|
此实现使用 Redis 排序集来存储和评估请求时间戳。它通过移除过期请求并添加新请求来强制执行滑动窗口,同时统计窗口内活跃请求的数量。
限制:非原子——如果多个请求同时到达,则可能会出现竞争条件。
选项 2 – 使用 Lua 实现
使用相同的速率限制器,但使用 Lua 实现。它保证窗口修剪、请求计数、插入和 TTL 设置作为单个操作完成——在高并发下不会出现竞争条件和不一致。
public class SlidingWindowLuaRateLimiter { private readonly IDatabase _redis; private readonly ILogger<SlidingWindowLuaRateLimiter> _logger; private readonly LuaScript _script; private readonly int _limit; private readonly TimeSpan _window; public SlidingWindowLuaRateLimiter( IConnectionMultiplexer connectionMultiplexer, ILogger<SlidingWindowLuaRateLimiter> logger, int limit, TimeSpan window) { _redis = connectionMultiplexer.GetDatabase(); _logger = logger; _limit = limit; _window = window; // Atomic Lua script: removes old requests, counts existing ones, // adds new one if limit not exceeded, and sets expiry _script = LuaScript.Prepare(@" local key = KEYS[1] local now = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) local expire = tonumber(ARGV[4]) redis.call('ZREMRANGEBYSCORE', key, 0, now - window) local count = redis.call('ZCARD', key) if count >= limit then return 0 else redis.call('ZADD', key, now, now .. '-' .. math.random()) redis.call('EXPIRE', key, expire) return 1 end "); } /// <summary> /// Atomically determines whether a request is allowed. /// </summary> public async Task<bool> IsAllowedAsync(string key) { var redisKey = new RedisKey[] { $"rate_limit:{key}" }; var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var args = new RedisValue[] { now, // ARGV[1] = current timestamp (long)_window.TotalMilliseconds, // ARGV[2] = window size _limit, // ARGV[3] = request limit (int)_window.TotalSeconds // ARGV[4] = TTL }; try { var result = (int)await _redis.ScriptEvaluateAsync(_script.OriginalScript, redisKey, args); return result == 1; } catch (Exception ex) { _logger.LogError(ex, "SlidingWindowLuaRateLimiter failed for key {Key}", key); return true; // Optionally fail open } } }
|
然后注册新的服务:
builder.Services.AddSingleton(provide => { var redis = provide.GetRequiredService<IConnectionMultiplexer>(); var logger = provide.GetRequiredService<ILogger<SlidingWindowLuaRateLimiter>>(); return new SlidingWindowLuaRateLimiter(redis, logger, 10, TimeSpan.FromMinutes(1)); });
|
并使用它:
app.Use(async (context, next) => { var rateLimiter = context.RequestServices.GetRequiredService<SlidingWindowLuaRateLimiter>(); // Use client IP or user ID var clientKey = context.Connection.RemoteIpAddress?.ToString() ?? "unknown"; if (!await rateLimiter.IsAllowedAsync(clientKey)) { context.Response.StatusCode = 429; context.Response.Headers["Retry-After"] = "60"; // Optional await context.Response.WriteAsync("Too many requests. Try again later."); return; } await next(); });
|
使用计数器和时间戳的令牌桶
令牌桶算法允许客户端以稳定的速率发出请求 - 但只要桶中有足够的“令牌”,也允许短时间的突发。
令牌桶的实现如下:
public class TokenBucketRateLimiter { private readonly IDatabase _redis; private readonly ILogger<TokenBucketRateLimiter> _logger; private readonly LuaScript _luaScript; private readonly int _bucketCapacity; private readonly double _refillRatePerSecond; public TokenBucketRateLimiter( IConnectionMultiplexer connectionMultiplexer, ILogger<TokenBucketRateLimiter> logger, int bucketCapacity, double refillRatePerSecond) { _redis = connectionMultiplexer.GetDatabase(); _logger = logger; _bucketCapacity = bucketCapacity; _refillRatePerSecond = refillRatePerSecond; // Lua script: // KEYS[1] = token key // KEYS[2] = timestamp key // ARGV[1] = current timestamp (ms) // ARGV[2] = bucket capacity // ARGV[3] = refill rate per second _luaScript = LuaScript.Prepare(@" local tokens_key = KEYS[1] local timestamp_key = KEYS[2] local now = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local refill_rate = tonumber(ARGV[3]) local last_tokens = tonumber(redis.call('GET', tokens_key) or capacity) local last_refill = tonumber(redis.call('GET', timestamp_key) or now) local elapsed = now - last_refill local refill = math.floor(elapsed * refill_rate / 1000) local tokens = math.min(capacity, last_tokens + refill) if tokens <= 0 then return 0 else tokens = tokens - 1 redis.call('SET', tokens_key, tokens) redis.call('SET', timestamp_key, now) redis.call('PEXPIRE', tokens_key, 60000) redis.call('PEXPIRE', timestamp_key, 60000) return 1 end "); } public async Task<bool> IsAllowedAsync(string key) { var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var redisKeys = new RedisKey[] { new RedisKey($"token_bucket:{key}:tokens"), new RedisKey($"token_bucket:{key}:timestamp") }; var redisArgs = new RedisValue[] { now, _bucketCapacity, _refillRatePerSecond }; try { var result = (int)await _redis.ScriptEvaluateAsync(_luaScript.OriginalScript, redisKeys, redisArgs); return result == 1; } catch (Exception ex) { _logger.LogError(ex, "TokenBucketRateLimiter failed for key {Key}", key); return true; // fail-open strategy } } }
|
令牌桶算法的这种实现确保了具有突发容忍度的速率控制。具体方法如下:
- 每个用户/客户端都有一个包含有限数量令牌(例如 10 个)的存储桶。
- 令牌会随着时间的推移以可配置的速率重新填充(例如每秒 1 个令牌)。
- 每个传入请求都会检查存在多少个令牌:
- 如果有可用的令牌,则会消耗一个令牌并继续执行请求。
- 如果没有剩余令牌,则请求被拒绝(429)。
我们存储:
- 当前令牌计数(Redis 字符串键)
- 上次补充时间戳(Redis 字符串键)
陷阱和需要注意的事项
- 时钟偏差:基于 Redis 的速率限制器依赖于时间戳。如果您的服务器跨多个区域或系统运行,且时钟不一致,速率执行可能会变得不可靠。请确保您的服务器已通过 NTP 同步。
- 密钥过期管理:如果密钥未正确设置过期时间(尤其是在固定/滑动窗口逻辑中),它们会累积并增加 Redis 内存占用。请务必在每次请求时或通过 Lua 设置 TTL。
- 故障开放 vs. 故障关闭当 Redis 宕机时,你的 API 应该阻止所有请求(故障关闭)还是允许所有请求(故障开放)?请根据你的风险模型进行选择——故障开放可能更方便用户使用,但存在滥用的风险。
结论
速率限制和节流不再是可选功能——它们是构建安全、公平且弹性 API 的基础。虽然它为.NET本地速率限制提供了可靠的内置支持,但要扩展到分布式环境,则需要更复杂的策略。
我们探索并实施了三种强大的技术:
- 固定窗口,简单快捷
- 滑动窗口,提高公平性和准确性
- 令牌桶,提供灵活的速率控制和突发容忍度