使用Bucket4j限制Spring API的访问速率 - Baeldung

20-06-11 banq

在本教程中,我们将学习如何使用  Bucket4j对Spring REST API进行速率限制。我们将探索API速率限制,了解Bucket4j,并通过一些在Spring应用程序中限制REST API速率的方法进行工作。

速率限制是一种限制对API的访问的策略。它限制了客户端可以在特定时间范围内进行的API调用次数。这有助于防止API遭到无意​​和恶意的过度使用。

速率限制通常通过跟踪IP地址或以更特定于业务的方式(例如API密钥或访问令牌)应用于API。作为API开发人员,我们可以选择在客户端达到最大限制时以几种不同的方式做出响应:

  • 排队请求,直到剩余时间段已过
  • 立即允许该请求,但为此请求收取额外费用
  • 或者,最常见的是拒绝请求(HTTP 429太多请求)

Bucket4j是一个基于令牌桶算法的Java速率限制库。Bucket4j是一个线程安全的库,可以在独立的JVM应用程序或集群环境中使用。它还通过JCache(JSR107)规范支持内存或分布式缓存。

令牌桶算法

让我们在API速率限制的情况下直观地查看算法。

假设我们有一个存储桶,其容量定义为它可以容纳的令牌数量。每当消费者想要访问API端点时,它都必须从bucket中获取令牌。我们会从存储桶中删除令牌(如果有)并接受请求。另一方面,如果存储桶没有任何令牌,我们将拒绝请求。

当请求正在消耗令牌时,我们还将以一定的固定速率对其进行补充,以使我们永远不会超过存储桶的容量。

让我们考虑一个速率限制为每分钟100个请求的API。我们可以创建一个容量为100的存储桶,每分钟的重新填充速率为100个令牌。

如果我们收到70个请求,少于给定分钟内可用令牌的数量,则我们将在下一分钟开始时仅添加30个令牌以使存储桶达到最大容量。另一方面,如果我们在40秒内用完所有令牌,我们将等待20秒以重新填充存储桶。

在介绍如何使用Bucket4j之前,让我们简要讨论一些核心类,以及它们如何表示令牌桶算法形式模型中的不同元素。

  • Bucket接口表示与最大容量令牌桶。它提供了诸如tryConsume和  tryConsumeAndReturnRemaining之类的方法来使用令牌。如果请求符合限制,并且令牌已使用,则这些方法将使用的结果返回为  true。
  • Bandwidth 类是存储桶的关键构建块:它定义了桶的极限。我们使用 “ 带宽”来配置存储桶的容量和填充速度。
  •  Refill 类用于定义在该令牌添加到桶中的固定速率。我们可以将速率配置为在给定时间段内要添加的令牌数量。例如,每秒10个存储桶或每5分钟200个令牌,依此类推。
  • Bucket中的tryConsumeAndReturnRemaining方法返回ConsumptionProbe。ConsumptionProbe包含消耗的结果以及存储区的状态,例如剩余的令牌,或者直到请求的令牌再次在存储区中可用之前的剩余时间。

Bucket4j入门

首先,将bucket4j 依赖项添加到我们的pom.xml中:

<dependency>
    <groupId>com.github.vladimir-bukhtoyarov</groupId>
    <artifactId>bucket4j-core</artifactId>
    <version>4.10.0</version>
</dependency>

让我们测试一些基本的速率限制模式。

对于每分钟10个请求的速率限制,我们将创建一个容量为10的存储桶,每分钟的重新填充速率为10个令牌:

Refill refill = Refill.intervally(10, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(10, refill);
Bucket bucket = Bucket4j.builder()
    .addLimit(limit)
    .build();
 
for (int i = 1; i <= 10; i++) {
    assertTrue(bucket.tryConsume(1));
}
assertFalse(bucket.tryConsume(1));

Refill.interval在时间窗口的开始间隔地重新填充存储桶,每分钟开始时填充 10个令牌。

接下来,让我们看看重新填充的动作。

我们将重新填充速率设置为每2秒1个令牌,并限制我们的请求以遵守速率限制:

bandwidth limit = Bandwidth.classic(1, Refill.intervally(1, Duration.ofSeconds(2)));
Bucket bucket = Bucket4j.builder()
    .addLimit(limit)
    .build();
assertTrue(bucket.tryConsume(1));     // first request
Executors.newScheduledThreadPool(1)   // schedule another request for 2 seconds later
    .schedule(() -> assertTrue(bucket.tryConsume(1)), 2, TimeUnit.SECONDS); 

假设我们的速率限制为每分钟10个请求。同时,我们可能希望避免在前5秒内耗尽所有令牌的峰值。Bucket4j允许我们在同一个存储桶上设置多个限制(带宽)。让我们添加另一个限制,该限制在20秒的时间窗口内仅允许5个请求:

Bucket bucket = Bucket4j.builder()
    .addLimit(Bandwidth.classic(10, Refill.intervally(10, Duration.ofMinutes(1))))
    .addLimit(Bandwidth.classic(5, Refill.intervally(5, Duration.ofSeconds(20))))
    .build();
 
for (int i = 1; i <= 5; i++) {
    assertTrue(bucket.tryConsume(1));
}
assertFalse(bucket.tryConsume(1));

使用Bucket4j对Spring API进行速率限制

有一个简单但非常流行的面积计算器REST API。当前,它根据给定的尺寸计算并返回矩形的面积。现在,我们将引入一个朴素的速率限制-该API允许每分钟20个请求。换句话说,如果API在1分钟的时间窗口内已收到20个请求,则拒绝该请求。

创建一个存储桶并添加限制(带宽):

@RestController
class AreaCalculationController {
 
    private final Bucket bucket;
 
    public AreaCalculationController() {
        Bandwidth limit = Bandwidth.classic(20, Refill.greedy(20, Duration.ofMinutes(1)));
        this.bucket = Bucket4j.builder()
            .addLimit(limit)
            .build();
    }
    //..
}

在此API中,我们可以使用tryConsume方法,通过使用存储桶中的令牌来检查是否允许该请求。如果达到限制,我们可以通过以HTTP 429太多请求状态进行响应来拒绝请求:

public ResponseEntity<AreaV1> rectangle(@RequestBody RectangleDimensionsV1 dimensions) {
    if (bucket.tryConsume(1)) {
        return ResponseEntity.ok(new AreaV1("rectangle", dimensions.getLength() * dimensions.getWidth()));
    }
 
    return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).build();
}

测试:# 21st request within 1 minute

$ curl -v -X POST http://localhost:9001/api/v1/area/rectangle \
    -H "Content-Type: application/json" \
    -d '{ "length": 10, "width": 12 }'
 
< HTTP/1.1 429

API客户端和定价计划

现在我们有了一个可以限制API请求的简单速率限制。接下来,让我们为更多以业务为中心的价格限制引入定价计划。

定价计划可帮助我们通过API获利。假设我们对API客户端有以下计划:

  • 免费:每个API客户端每小时20个请求
  • 基本:每个API客户端每小时40个请求
  • 专业:每个API客户端每小时100个请求

每个API客户端都有一个唯一的API密钥,必须随每个请求一起发送。这将有助于我们确定与API客户端链接的定价计划。

让我们为每个定价计划定义速率限制(带宽):

enum PricingPlan {
    FREE {
        Bandwidth getLimit() {
            return Bandwidth.classic(20, Refill.intervally(20, Duration.ofHours(1)));
        }
    },
    BASIC {
        Bandwidth getLimit() {
            return Bandwidth.classic(40, Refill.intervally(40, Duration.ofHours(1)));
        }
    },
    PROFESSIONAL {
        Bandwidth getLimit() {
            return Bandwidth.classic(100, Refill.intervally(100, Duration.ofHours(1)));
        }
    };
    //..
}

接下来,让我们添加一种方法来根据给定的API密钥解析定价计划:

enum PricingPlan {
     
    static PricingPlan resolvePlanFromApiKey(String apiKey) {
        if (apiKey == null || apiKey.isEmpty()) {
            return FREE;
        } else if (apiKey.startsWith("PX001-")) {
            return PROFESSIONAL;
        } else if (apiKey.startsWith("BX001-")) {
            return BASIC;
        }
        return FREE;
    }
    //..
}

接下来,我们需要为每个API密钥存储存储桶,并检索存储桶以进行速率限制:

class PricingPlanService {
 
    private final Map<String, Bucket> cache = new ConcurrentHashMap<>();
 
    public Bucket resolveBucket(String apiKey) {
        return cache.computeIfAbsent(apiKey, this::newBucket);
    }
 
    private Bucket newBucket(String apiKey) {
        PricingPlan pricingPlan = PricingPlan.resolvePlanFromApiKey(apiKey);
        return Bucket4j.builder()
            .addLimit(pricingPlan.getLimit())
            .build();
    }
}

因此,我们现在在每个API密钥的内存中都有一个存储区。让我们修改控制器以使用PricingPlanService:

@RestController
class AreaCalculationController {
 
    private PricingPlanService pricingPlanService;
 
    public ResponseEntity<AreaV1> rectangle(@RequestHeader(value = "X-api-key") String apiKey,
        @RequestBody RectangleDimensionsV1 dimensions) {
 
        Bucket bucket = pricingPlanService.resolveBucket(apiKey);
        ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
        if (probe.isConsumed()) {
            return ResponseEntity.ok()
                .header("X-Rate-Limit-Remaining", Long.toString(probe.getRemainingTokens()))
                .body(new AreaV1("rectangle", dimensions.getLength() * dimensions.getWidth()));
        }
         
        long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
            .header("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill))
            .build();
    }
}

让我们来看一下这些变化。API客户端发送带有X-api-key请求标头的API密钥。我们使用  PricingPlanService获取此API密钥的存储桶,并通过使用存储桶中的令牌来检查是否允许该请求。

为了增强API的客户端体验,我们将使用以下附加响应标头发送有关速率限制的信息:

  • X-Rate-Limit-Remaining:当前时间窗口中剩余的令牌数量
  • X速率限制重试秒后:剩余时间(以秒为单位),直到重新填充存储桶为止

我们可以调用ConsumptionProbe  方法的getRemainingTokens和getNanosToWaitForRefill来分别获取存储桶中剩余令牌的数量和到下一次重新填充之前的剩余时间。该getNanosToWaitForRefill如果我们能够成功地消耗令牌方法返回0。

让我们调用API:

## successful request
$ curl -v -X POST http://localhost:9001/api/v1/area/rectangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "length": 10, "width": 12 }'
 
< HTTP/1.1 200
< X-Rate-Limit-Remaining: 11
{"shape":"rectangle","area":120.0}
 
## rejected request
$ curl -v -X POST http://localhost:9001/api/v1/area/rectangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "length": 10, "width": 12 }'
 
< HTTP/1.1 429
< X-Rate-Limit-Retry-After-Seconds: 583

使用Spring MVC拦截器

假设我们现在必须添加一个新的API端点,我们还需要对新端点进行速率限制。我们可以简单地从先前的端点复制并粘贴速率限制代码。或者,我们可以使用Spring MVC的HandlerInterceptor将速率限制代码与业务代码分离。让我们创建一个RateLimitInterceptor并在preHandle方法中实现速率限制代码:

public class RateLimitInterceptor implements HandlerInterceptor {
 
    private PricingPlanService pricingPlanService;
 
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) 
      throws Exception {
        String apiKey = request.getHeader("X-api-key");
        if (apiKey == null || apiKey.isEmpty()) {
            response.sendError(HttpStatus.BAD_REQUEST.value(), "Missing Header: X-api-key");
            return false;
        }
 
        Bucket tokenBucket = pricingPlanService.resolveBucket(apiKey);
        ConsumptionProbe probe = tokenBucket.tryConsumeAndReturnRemaining(1);
        if (probe.isConsumed()) {
            response.addHeader("X-Rate-Limit-Remaining", String.valueOf(probe.getRemainingTokens()));
            return true;
        } else {
            long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;
            response.addHeader("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill));
            response.sendError(HttpStatus.TOO_MANY_REQUESTS.value(),
              "You have exhausted your API Request Quota"); 
            return false;
        }
    }
}

最后,我们必须将拦截器添加到InterceptorRegistry中:

public class AppConfig implements WebMvcConfigurer {
     
    private RateLimitInterceptor interceptor;
 
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(interceptor)
            .addPathPatterns("/api/v1/area/**");
    }
}

Bucket4j Spring Starter

让我们看看在Spring应用程序中使用Bucket4j的另一种方法。Bucket4j Spring Boot Starter 提供Bucket4j自动配置,可以帮助我们实现API率通过Spring启动应用程序性能或配置限制。

一旦将Bucket4j入门程序集成到我们的应用程序中,我们将获得一个完全声明性的API速率限制实现,而无需任何应用程序代码。

首先,将bucket4j-spring-boot-starter依赖项添加到我们的pom.xml中:

<dependency>
    <groupId>com.giffing.bucket4j.spring.boot.starter</groupId>
    <artifactId>bucket4j-spring-boot-starter</artifactId>
    <version>0.2.0</version>
</dependency>

我们在早期的实现中使用内存映射来存储每个API密钥(使用者)的存储桶。在这里,我们可以使用Spring的缓存抽象来配置内存存储,例如CaffeineGuava

让我们添加缓存依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
    <groupId>javax.cache</groupId>
    <artifactId>cache-api</artifactId>
</dependency>
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>jcache</artifactId>
    <version>2.8.2</version>
</dependency>

注意:我们还添加了jcache 依赖项,以符合Bucket4j的缓存支持。

我们将配置 Caffeine缓存以在内存中存储API密钥和存储桶:

spring:
  cache:
    cache-names:
    - rate-limit-buckets
    caffeine:
      spec: maximumSize=100000,expireAfterAccess=3600s

在我们的示例中,我们使用了请求标头X-api-key的值作为标识和应用速率限制的键。

Bucket4j Spring Boot Starter提供了几种预定义的配置来定义我们的速率限制密钥:

  • 简单的速率限制过滤器,这是默认设置
  • 按IP地址过滤
  • 基于表达式的过滤器

基于表达式的过滤器使用Spring Expression Language(SpEL)。SpEL提供对根对象的访问,例如HttpServletRequest,可用于在IP地址(getRemoteAddr()),请求标头(getHeader('X-api-key'))上构建过滤器表达式,等等。

该库还在过滤器表达式中支持自定义类,在文档中对此进行了讨论。

让我们配置 Bucket4j:

bucket4j:
  enabled: true
  filters:
  - cache-name: rate-limit-buckets
    url: /api/v1/area.*
    strategy: first
    http-response-body: "{ \"status\": 429, \"error\": \"Too Many Requests\", \"message\": \"You have exhausted your API Request Quota\" }"
    rate-limits:
    - expression: "getHeader('X-api-key')"
      execute-condition: "getHeader('X-api-key').startsWith('PX001-')"
      bandwidths:
      - capacity: 100
        time: 1
        unit: hours
    - expression: "getHeader('X-api-key')"
      execute-condition: "getHeader('X-api-key').startsWith('BX001-')"
      bandwidths:
      - capacity: 40
        time: 1
        unit: hours
    - expression: "getHeader('X-api-key')"
      bandwidths:
      - capacity: 20
        time: 1
        unit: hours

那么,我们刚刚配置了什么?

  • bucket4j.enabled = true –启用Bucket4j自动配置
  • bucket4j.filters.cache-name – 从缓存中获取  API密钥的存储桶
  • bucket4j.filters.url –指示应用速率限制的路径表达式
  • bucket4j.filters.strategy = first –在第一个匹配速率限制配置处停止
  • bucket4j.filters.rate-limits.expression –使用Spring Expression Language(SpEL)检索密钥
  • bucket4j.filters.rate-limits.execute-condition –使用SpEL决定是否执行速率限制
  • bucket4j.filters.rate-limits.bandwidths –定义Bucket4j速率限制参数

我们将PricingPlanService和RateLimitInterceptor替换为顺序评估的速率限制配置列表。

测试:

## successful request
$ curl -v -X POST http://localhost:9000/api/v1/area/triangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "height": 20, "base": 7 }'
 
< HTTP/1.1 200
< X-Rate-Limit-Remaining: 7
{"shape":"triangle","area":70.0}
 
## rejected request
$ curl -v -X POST http://localhost:9000/api/v1/area/triangle \
    -H "Content-Type: application/json" -H "X-api-key:FX001-99999" \
    -d '{ "height": 7, "base": 20 }'
 
< HTTP/1.1 429
< X-Rate-Limit-Retry-After-Seconds: 212
{ "status": 429, "error": "Too Many Requests", "message": "You have exhausted your API Request Quota" }

所有示例的源代码都可以在GitHub上获得

                   

1
猜你喜欢