使用Resilience4j实施反应式断路器 - Wenqi


本文将重点介绍使用 Spring Cloud 断路器库 Resilience4j 实现反应式断路器。
 
为什么选择 Resilience4j?
我们可以使用两个主要库来实现断路器。Netflix Hystrix,它采用面向对象的设计,其中对外部系统的调用必须包含在HystrixCommand提供的多种功能中。但是,在 SpringOne 2019 中,Spring 宣布 Hystrix Dashboard 将从 Spring Cloud 3.1 版本中删除,这使其正式弃用。使用已弃用的库不是一个好主意。所以选择很明确,就是 Resilience4j!
Resilience4j 是一个受 Hystrix 启发的独立库,但建立在函数式编程的原则之上。Resilience4J 提供了高阶函数(装饰器),以通过断路器、速率限制器或隔板来增强任何功能接口、lambda 表达式或方法引用。
Resilience4J 的其他优点包括更精细的配置选项(例如关闭断路器模式所需的成功执行次数)和更轻的依赖项占用空间。
我们将使用两个 Spring Boot 微服务来演示如何实现响应式断路器:

  • 客户服务,充当 REST API 提供者,提供客户 CRUD 端点。
  • customer-service-client,它WebClient通过 Spring Boot Starter Webflux 库来调用 REST API。

 
步骤 1. 添加 POM 依赖项
由于我们选择WebClient使用 REST API,因此我们需要将 Spring Cloud Circuit Breaker Reactor Resilience4J 依赖项添加到我们的 REST 客户端应用程序中。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>
 

第2步。添加断路器配置bean
CircuitBreakerConfig类带有一组默认的断路器配置值,如果我们选择对所有的断路器使用默认的配置值,我们可以创建一个Customize bean,它被传递给ReactiveResilience4JCircuitBreakerFactory。该工厂的configureDefault方法可用于提供默认配置。示例片段如下。

@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build()).build());
}


如果我们选择使用自定义的配置值,我们将需要如下定义我们的bean("customer-service "只是一个REST客户端实例的样本,你可以使用你给你的REST客户端应用程序的任何实例名称)。

@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> customerServiceCusomtizer() {
  return factory -> {
    factory.configure(builder -> builder
      .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build())
      .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()), "customer-service");
  };
}

 
第3步. 为断路器属性添加配置
如果我们定义了我们的自定义配置豆,我们还需要在application.yml中添加断路器的配置,例如(仅是示例值,数字应根据应用的使用场景进行调整)。

resilience4j.circuitbreaker:
  instances:
    customer-service:
      failureRateThreshold: 50
      minimumNumberOfCalls: 10
      slidingWindowType: TIME_BASED
      slidingWindowSize: 10
      waitDurationInOpenState: 50s
      permittedNumberOfCallsInHalfOpenState: 3

  • failureRateThreshold。当故障率等于或大于阈值时,断路器过渡到开放状态并开始短路调用。在我们的例子中,这个值是50%,这意味着如果2个请求中有1个失败,就会达到阈值,这将使断路器进入开放状态。
  • MinimumNumberOfCalls: 这个属性确保一旦执行了最低数量的调用,就能计算出故障率。在我们的例子中,在开始计算故障率之前必须执行10个请求。
  • slidingWindowType(滑动窗口类型)。配置滑动窗口的类型,用于记录断路器关闭时的呼叫结果。滑动窗口可以是基于计数的,也可以是基于时间的。
  • slidingWindowSize(滑动窗口尺寸)。配置滑动窗口的大小,用于记录断路器关闭时的呼叫结果。
  • waitDurationInOpenState: 断路器在从开放状态过渡到半开放状态之前应该等待的时间。在我们的例子中,它是50秒。
  • permittedNumberOfCallsInHalfOpenState: 配置断路器处于半开状态时允许的呼叫数量。在我们的例子中,限制是3,这意味着在10秒的窗口中只处理3个请求。

 
第4步。实施Circuit Breaker
现在所有的配置都到位了,我们可以开始使用Circuit Breaker从客户端装饰我们的REST API调用。在下面的例子中,我们通过构造函数注入WebClient和ReactiveCircuitBreakerFactory到CustomerCientController。然后,我们使用webClient来触发对传递进来的CustomerVO和/或customerId的CRUD调用。注意 "转换 "部分,我们在ReactiveCircuitBreakerFactory的帮助下为 "customer-service "创建ReactiveCircuitBreaker实例(Rcb为ReactiveCircuitBreaker类型)。执行断路器的行是rcb.run(...)。在下面的示例控制器中,当异常被抛出时,我们为POST/GET/PUT调用返回一个空白的CustomerVO对象作为后退响应。对于DELETE调用,我们将返回传入的customerId作为回退。因此,在REST API供应商停机的情况下,我们不会得到500内部服务器错误,而是通过正确的实施Circuit Breaker,收到后备响应。

@RestController
@Slf4j
@RequiredArgsConstructor
public class CustomerClientController {

    private final WebClient webClient;
    private final ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory;

    @PostMapping("/customers")
    public Mono<CustomerVO> createCustomer(CustomerVO customerVO){
        return webClient.post()
                .uri(
"/customers")
               
//.header("Authorization", "Bearer MY_SECRET_TOKEN")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(Mono.just(customerVO), CustomerVO.class)
                .retrieve()
                .bodyToMono(CustomerVO.class)
                .timeout(Duration.ofMillis(10_000))
                .transform(it -> {
                    ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create(
"customer-service");
                    return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build()));
                });
    }

    @GetMapping(
"/customers/{customerId}")
    public Mono<CustomerVO> getCustomer(@PathVariable String customerId) {
        return webClient
                .get().uri(
"/customers/" + customerId)
                .retrieve()
                .bodyToMono(CustomerVO.class)
                .transform(it -> {
                    ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create(
"customer-service");
                    return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build()));
                });
    }

    @PutMapping(
"/customers/{customerId}")
    public Mono<CustomerVO> updateCustomer(@PathVariable String customerId, CustomerVO customerVO){
        return webClient.put()
                .uri(
"/customers/" + customerVO.getCustomerId())
               
//.header("Authorization", "Bearer MY_SECRET_TOKEN")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(Mono.just(customerVO), CustomerVO.class)
                .retrieve()
                .bodyToMono(CustomerVO.class)
                .transform(it -> {
                    ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create(
"customer-service");
                    return rcb.run(it, throwable -> Mono.just(CustomerVO.builder().build()));
                });
    }

    @DeleteMapping(
"/customers/{customerId}")
    public Mono<String> deleteCustomer(@PathVariable String customerId){
        return webClient.delete()
                .uri(
"/customers/" + customerId)
                .retrieve()
                .bodyToMono(String.class)
                .transform(it -> {
                    ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create(
"customer-service");
                    return rcb.run(it, throwable -> Mono.just(customerId));
                });
    }
}

 
 问题1:Swagger用户界面在Webflux中不工作
由于我们引入了Webflux库来使用WebClient,你可能会注意到你的swagger UI最初并不工作。为了让它工作,请确保以下步骤得到执行。
在pom中添加以下依赖项。

<dependency>
  <groupId>io.springfox</groupId>
  <artifactId>springfox-boot-starter</artifactId>
  <version>${springfox.version}</version>
</dependency>

如果你已经实现了注释@EnableSwagger2WebFlux,请删除该注释。
现在访问swagger的URL应该是:http://<YOUR_APP_SERVER>:<YOUR_APP_PORT>/swagger-ui/,一定要加上结尾"/"。例如,http://localhost:8100/swagger-ui/。

 
如何为没有返回类型的端点实现断路?
对于在其响应体中没有返回内容的端点,如REST API提供商中的以下端点,在REST客户端,如果我们将调用该端点的相应方法标记为返回Mono<Void>,ReactiveCircuitBreaker将无法工作。在REST API提供者关闭的情况下,你会看到500服务器错误,这完全违背了拥有Circuit Breaker的目的。

@DeleteMapping(value = "/{customerId}")
public ResponseEntity deleteCustomer(@PathVariable String customerId) throws Exception {
    customerService.deleteCustomer(customerId);
    return ResponseEntity.noContent().build();
}

在非反应式断路器的实现中,对于没有返回类型的方法,我们可以使用 "CheckedRunnable",做如下工作(示例)。

CircuitBreaker circuitBreaker = circuitBreakerFactory.create("customer-service");
CheckedRunnable runnable = () -> customerClient.deleteCustomer(customerId);
Try.run(circuitBreaker.decorateCheckedRunnable(runnable)).get();

但是,在反应式断路器中,ReactiveCircuitBreaker没有这样的接口来装饰CheckedRunnable,那么我们该怎么做?经过一些调查和实验,我注意到我们可以操纵这种端点的返回类型,使其返回一个一般的类型,如String。简单地说,如果一个端点,如DELETE调用在服务器端返回Void,我们仍然可以在客户端操纵该DELETE调用的返回类型,以返回一个简单的类型,如String,只是传回输入到该端点的String。例如,在客户端我们可以这样实现DELETE调用。

public Mono<String> deleteCustomer(@PathVariable String customerId){
    return webClient.delete()
            .uri("/customers/" + customerId)
            .retrieve()
            .bodyToMono(String.class)
            .transform(it -> {
                ReactiveCircuitBreaker rcb = reactiveCircuitBreakerFactory.create(
"customer-service");
                return rcb.run(it, throwable -> Mono.just(customerId));
            });
}


注意我们正在返回Mono<String>而不是Mono<Void>,而且我们在.bodyToMono(String.class)行指定了返回类型为String,这就是为什么我们可以简单地调用ReactiveCircuitBreaker的run方法来调用装饰过的反应式断路器函数。这是我能想到的唯一方法,可以解决ReactiveCircuitBreaker没有decorateCheckedRunnable方法来处理没有返回类型的方法。
完整源码:GitHub repository