本文将重点介绍使用 Spring Cloud 断路器库 Resilience4j 实现反应式断路器。
为什么选择 Resilience4j?
我们可以使用两个主要库来实现断路器。Netflix Hystrix,它采用面向对象的设计,其中对外部系统的调用必须包含在HystrixCommand提供的多种功能中。但是,在 SpringOne 2019 中,Spring 宣布 Hystrix Dashboard 将从 Spring Cloud 3.1 版本中删除,这使其正式弃用。使用已弃用的库不是一个好主意。所以选择很明确,就是 Resilience4j!
Resilience4j 是一个受 Hystrix 启发的独立库,但建立在函数式编程的原则之上。Resilience4J 提供了高阶函数(装饰器),以通过断路器、速率限制器或隔板来增强任何功能接口、lambda 表达式或方法引用。
Resilience4J 的其他优点包括更精细的配置选项(例如关闭断路器模式所需的成功执行次数)和更轻的依赖项占用空间。
我们将使用两个 Spring Boot 微服务来演示如何实现响应式断路器:
- 客户服务,充当 REST API 提供者,提供客户 CRUD 端点。
- customer-service-client,它WebClient通过 Spring Boot Starter Webflux 库来调用 REST API。
步骤 1. 添加 POM 依赖项
由于我们选择WebClient使用 REST API,因此我们需要将 Spring Cloud Circuit Breaker Reactor Resilience4J 依赖项添加到我们的 REST 客户端应用程序中。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId> </dependency>
|
第2步。添加断路器配置bean
CircuitBreakerConfig类带有一组默认的断路器配置值,如果我们选择对所有的断路器使用默认的配置值,我们可以创建一个Customize bean,它被传递给ReactiveResilience4JCircuitBreakerFactory。该工厂的configureDefault方法可用于提供默认配置。示例片段如下。
@Bean public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() { return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id) .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build()).build()); }
|
如果我们选择使用自定义的配置值,我们将需要如下定义我们的bean("customer-service "只是一个REST客户端实例的样本,你可以使用你给你的REST客户端应用程序的任何实例名称)。
@Bean public Customizer<ReactiveResilience4JCircuitBreakerFactory> customerServiceCusomtizer() { return factory -> { factory.configure(builder -> builder .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build()) .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()), "customer-service"); }; }
|
第3步. 为断路器属性添加配置
如果我们定义了我们的自定义配置豆,我们还需要在application.yml中添加断路器的配置,例如(仅是示例值,数字应根据应用的使用场景进行调整)。
resilience4j.circuitbreaker: instances: customer-service: failureRateThreshold: 50 minimumNumberOfCalls: 10 slidingWindowType: TIME_BASED slidingWindowSize: 10 waitDurationInOpenState: 50s permittedNumberOfCallsInHalfOpenState: 3
|
- failureRateThreshold。当故障率等于或大于阈值时,断路器过渡到开放状态并开始短路调用。在我们的例子中,这个值是50%,这意味着如果2个请求中有1个失败,就会达到阈值,这将使断路器进入开放状态。
- MinimumNumberOfCalls: 这个属性确保一旦执行了最低数量的调用,就能计算出故障率。在我们的例子中,在开始计算故障率之前必须执行10个请求。
- slidingWindowType(滑动窗口类型)。配置滑动窗口的类型,用于记录断路器关闭时的呼叫结果。滑动窗口可以是基于计数的,也可以是基于时间的。
- slidingWindowSize(滑动窗口尺寸)。配置滑动窗口的大小,用于记录断路器关闭时的呼叫结果。
- waitDurationInOpenState: 断路器在从开放状态过渡到半开放状态之前应该等待的时间。在我们的例子中,它是50秒。
- permittedNumberOfCallsInHalfOpenState: 配置断路器处于半开状态时允许的呼叫数量。在我们的例子中,限制是3,这意味着在10秒的窗口中只处理3个请求。
第4步。实施Circuit Breaker
现在所有的配置都到位了,我们可以开始使用Circuit Breaker从客户端装饰我们的REST API调用。在下面的例子中,我们通过构造函数注入WebClient和ReactiveCircuitBreakerFactory到CustomerCientController。然后,我们使用webClient来触发对传递进来的CustomerVO和/或customerId的CRUD调用。注意 "转换 "部分,我们在ReactiveCircuitBreakerFactory的帮助下为 "customer-service "创建ReactiveCircuitBreaker实例(Rcb为ReactiveCircuitBreaker类型)。执行断路器的行是rcb.run(...)。在下面的示例控制器中,当异常被抛出时,我们为POST/GET/PUT调用返回一个空白的CustomerVO对象作为后退响应。对于DELETE调用,我们将返回传入的customerId作为回退。因此,在REST API供应商停机的情况下,我们不会得到500内部服务器错误,而是通过正确的实施Circuit Breaker,收到后备响应。
@RestController @Slf4j @RequiredArgsConstructor public class CustomerClientController {
private final WebClient webClient; private final ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory;
@PostMapping("/customers") public Mono<CustomerVO> createCustomer(CustomerVO customerVO){ return webClient.post() .uri("/customers") //.header("Authorization", "Bearer MY_SECRET_TOKEN") .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON) .body(Mono.just(customerVO), CustomerVO.class) .retrieve() .bodyToMono(CustomerVO.class) .timeout(Duration.ofMillis(10_000)) .transform(it -> { ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create("customer-service"); return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build())); }); }
@GetMapping("/customers/{customerId}") public Mono<CustomerVO> getCustomer(@PathVariable String customerId) { return webClient .get().uri("/customers/" + customerId) .retrieve() .bodyToMono(CustomerVO.class) .transform(it -> { ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create("customer-service"); return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build())); }); }
@PutMapping("/customers/{customerId}") public Mono<CustomerVO> updateCustomer(@PathVariable String customerId, CustomerVO customerVO){ return webClient.put() .uri("/customers/" + customerVO.getCustomerId()) //.header("Authorization", "Bearer MY_SECRET_TOKEN") .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON) .body(Mono.just(customerVO), CustomerVO.class) .retrieve() .bodyToMono(CustomerVO.class) .transform(it -> { ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create("customer-service"); return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build())); }); }
@DeleteMapping("/customers/{customerId}") public Mono<String> deleteCustomer(@PathVariable String customerId){ return webClient.delete() .uri("/customers/" + customerId) .retrieve() .bodyToMono(String.class) .transform(it -> { ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create("customer-service"); return rcb.run(it, throwable -> Mono.just(customerId)); }); } }
|
问题1:Swagger用户界面在Webflux中不工作
由于我们引入了Webflux库来使用WebClient,你可能会注意到你的swagger UI最初并不工作。为了让它工作,请确保以下步骤得到执行。
在pom中添加以下依赖项。
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-boot-starter</artifactId> <version>${springfox.version}</version> </dependency>
|
如果你已经实现了注释@EnableSwagger2WebFlux,请删除该注释。
现在访问swagger的URL应该是:http://<YOUR_APP_SERVER>:<YOUR_APP_PORT>/swagger-ui/,一定要加上结尾"/"。例如,http://localhost:8100/swagger-ui/。
如何为没有返回类型的端点实现断路?
对于在其响应体中没有返回内容的端点,如REST API提供商中的以下端点,在REST客户端,如果我们将调用该端点的相应方法标记为返回Mono<Void>,ReactiveCircuitBreaker将无法工作。在REST API提供者关闭的情况下,你会看到500服务器错误,这完全违背了拥有Circuit Breaker的目的。
@DeleteMapping(value = "/{customerId}") public ResponseEntity deleteCustomer(@PathVariable String customerId) throws Exception { customerService.deleteCustomer(customerId); return ResponseEntity.noContent().build(); }
|
在非反应式断路器的实现中,对于没有返回类型的方法,我们可以使用 "CheckedRunnable",做如下工作(示例)。
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("customer-service"); CheckedRunnable runnable = () -> customerClient.deleteCustomer(customerId); Try.run(circuitBreaker.decorateCheckedRunnable(runnable)).get();
|
但是,在反应式断路器中,ReactiveCircuitBreaker没有这样的接口来装饰CheckedRunnable,那么我们该怎么做?经过一些调查和实验,我注意到我们可以操纵这种端点的返回类型,使其返回一个一般的类型,如String。简单地说,如果一个端点,如DELETE调用在服务器端返回Void,我们仍然可以在客户端操纵该DELETE调用的返回类型,以返回一个简单的类型,如String,只是传回输入到该端点的String。例如,在客户端我们可以这样实现DELETE调用。
public Mono<String> deleteCustomer(@PathVariable String customerId){ return webClient.delete() .uri("/customers/" + customerId) .retrieve() .bodyToMono(String.class) .transform(it -> { ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create("customer-service"); return rcb.run(it, throwable -> Mono.just(customerId)); }); }
|
注意我们正在返回Mono<String>而不是Mono<Void>,而且我们在.bodyToMono(String.class)行指定了返回类型为String,这就是为什么我们可以简单地调用ReactiveCircuitBreaker的run方法来调用装饰过的反应式断路器函数。这是我能想到的唯一方法,可以解决ReactiveCircuitBreaker没有decorateCheckedRunnable方法来处理没有返回类型的方法。
完整源码:GitHub repository