微服务模式:Spring Boot + Kafka的业务流程Saga模式 - vinsguru


多年来,微服务已变得非常流行。微服务是分布式系统。它们更小,模块化,易于部署和扩展等。开发单个微服务应用程序可能会很有趣!但是处理跨越多个微服务的业务交易并不好玩!MicroService体系结构具有特定的职责。为了完成应用程序工作流程/任务,可能需要多个MicroServices一起工作。
让我们看看本文中在分布式系统中处理事务/数据一致性有多困难。
假设我们的业务规则说,当用户下订单时,如果产品的价格在用户的信用限额/余额之内并且该产品的库存可用,则订单将得到满足。否则将无法实现。看起来真的很简单。这在整体应用中非常容易实现。整个工作流程可以视为1个单事务。当所有内容都在单个数据库中时,提交/回滚很容易。对于具有多个数据库的分布式系统,这将非常复杂!首先让我们看一下我们的架构,看看如何实现它。

我们在下面的微服务中拥有自己的数据库。

  • 订购服务
  • 付款服务
  • 库存服务

当订单服务收到新订单的请求时,它必须与付款服务和库存服务进行核对。我们扣除付款,库存并最终完成订单!如果我们扣除付款但没有库存,会发生什么?如何回滚?涉及多个数据库很难。

传奇Saga模式
通常,在所有微服务之间处理事务和维护数据一致性很困难。当涉及多种服务时,例如付款,库存,欺诈检查,运输检查…..etc等,如果没有协调员,将很难通过多个步骤来管理如此复杂的工作流程。通过为协调员引入单独的服务,订单服务摆脱了这些多余责任。我们也没有引入任何循环依赖。
此处检查项目源代码。
跨越多个微服务的每个业务交易都被分成特定于微服务的本地交易,并按顺序执行它们以完成业务工作流程。它被称为佐贺。它可以通过两种方式实现。

  • 编舞Choreography 方法
  • 编排Orchestration 方法

在本文中,我们将讨论基于Orchestration的传奇Saga。

在这种模式下,我们将有一个协调器,一个单独的服务,它将协调所有微服务之间的所有事务。如果一切正常,它将使订单请求完成,否则将其标记为已取消。
让我们看看如何实现这一点。我们的示例架构将或多或少像这样!

  • 在此演示中,协调器与其他服务之间的通信将是一个简单的HTTP,以一种非阻塞的异步方式来使其无状态。
  • 我们也可以使用Kafka主题进行交流。为此,我们必须使用分散/聚集模式,该模式更像是有状态的样式。

Order协调器
这是一个微服务,负责协调所有事务。它侦听订单创建的主题。当创建新订单时,它会立即为每个服务(如付款服务/库存服务等)建立单独的请求,并验证响应。如果可以,请执行订单。如果其中之一不是,则取消定单。它还尝试重置任何微服务中发生的任何本地事务。
我们将所有本地交易视为1个单一工作流程。一个工作流程将包含多个工作流程步骤。

  • 工作流程步骤

public interface WorkflowStep {

    WorkflowStepStatus getStatus();
    Mono<Boolean> process();
    Mono<Boolean> revert();

}

  • 工作流程

public interface Workflow {

    List<WorkflowStep> getSteps();

}

  • 在本例中,对于“订购”工作流,我们有2个步骤。每个实现都应该知道如何进行本地事务以及如何重置。
  • 库存步骤需要继承实现WorkflowStep接口

public class InventoryStep implements WorkflowStep {

    private final WebClient webClient;
    private final InventoryRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;

    public InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }

    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }

    @Override
    public Mono<Boolean> process() {
        return this.webClient
                .post()
                .uri("/inventory/deduct")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(InventoryResponseDTO.class)
                .map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE))
                .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }

    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                    .post()
                    .uri(
"/inventory/add")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(Void.class)
                    .map(r ->true)
                    .onErrorReturn(false);
    }
}

  • 付款步骤也是要实现接口中处理和回退两个步骤

public class PaymentStep implements WorkflowStep {

    private final WebClient webClient;
    private final PaymentRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;

    public PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }

    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }

    @Override
    public Mono<Boolean> process() {
        return this.webClient
                    .post()
                    .uri("/payment/debit")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(PaymentResponseDTO.class)
                    .map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED))
                    .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }

    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                .post()
                .uri(
"/payment/credit")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(Void.class)
                .map(r -> true)
                .onErrorReturn(false);
    }

}

  • 服务/协调员

@Service
public class OrchestratorService {

    @Autowired
    @Qualifier("payment")
    private WebClient paymentClient;

    @Autowired
    @Qualifier(
"inventory")
    private WebClient inventoryClient;

    public Mono<OrchestratorResponseDTO> orderProduct(final OrchestratorRequestDTO requestDTO){
        Workflow orderWorkflow = this.getOrderWorkflow(requestDTO);
        return Flux.fromStream(() -> orderWorkflow.getSteps().stream())
                .flatMap(WorkflowStep::process)
                .handle(((aBoolean, synchronousSink) -> {
                    if(aBoolean)
                        synchronousSink.next(true);
                    else
                        synchronousSink.error(new WorkflowException(
"create order failed!"));
                }))
                .then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED)))
                .onErrorResume(ex -> this.revertOrder(orderWorkflow, requestDTO));

    }

    private Mono<OrchestratorResponseDTO> revertOrder(final Workflow workflow, final OrchestratorRequestDTO requestDTO){
        return Flux.fromStream(() -> workflow.getSteps().stream())
                .filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE))
                .flatMap(WorkflowStep::revert)
                .retry(3)
                .then(Mono.just(this.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED)));
    }

    private Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){
        WorkflowStep paymentStep = new PaymentStep(this.paymentClient, this.getPaymentRequestDTO(requestDTO));
        WorkflowStep inventoryStep = new InventoryStep(this.inventoryClient, this.getInventoryRequestDTO(requestDTO));
        return new OrderWorkflow(List.of(paymentStep, inventoryStep));
    }

    private OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){
        OrchestratorResponseDTO responseDTO = new OrchestratorResponseDTO();
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setAmount(requestDTO.getAmount());
        responseDTO.setProductId(requestDTO.getProductId());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setStatus(status);
        return responseDTO;
    }

    private PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){
        PaymentRequestDTO paymentRequestDTO = new PaymentRequestDTO();
        paymentRequestDTO.setUserId(requestDTO.getUserId());
        paymentRequestDTO.setAmount(requestDTO.getAmount());
        paymentRequestDTO.setOrderId(requestDTO.getOrderId());
        return paymentRequestDTO;
    }

    private InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){
        InventoryRequestDTO inventoryRequestDTO = new InventoryRequestDTO();
        inventoryRequestDTO.setUserId(requestDTO.getUserId());
        inventoryRequestDTO.setProductId(requestDTO.getProductId());
        inventoryRequestDTO.setOrderId(requestDTO.getOrderId());
        return inventoryRequestDTO;
    }

}

有关完整的源码,请在此处下载。