多年来,微服务已变得非常流行。微服务是分布式系统。它们更小,模块化,易于部署和扩展等。开发单个微服务应用程序可能会很有趣!但是处理跨越多个微服务的业务交易并不好玩!MicroService体系结构具有特定的职责。为了完成应用程序工作流程/任务,可能需要多个MicroServices一起工作。
让我们看看本文中在分布式系统中处理事务/数据一致性有多困难。
假设我们的业务规则说,当用户下订单时,如果产品的价格在用户的信用限额/余额之内并且该产品的库存可用,则订单将得到满足。否则将无法实现。看起来真的很简单。这在整体应用中非常容易实现。整个工作流程可以视为1个单事务。当所有内容都在单个数据库中时,提交/回滚很容易。对于具有多个数据库的分布式系统,这将非常复杂!首先让我们看一下我们的架构,看看如何实现它。
我们在下面的微服务中拥有自己的数据库。
当订单服务收到新订单的请求时,它必须与付款服务和库存服务进行核对。我们扣除付款,库存并最终完成订单!如果我们扣除付款但没有库存,会发生什么?如何回滚?涉及多个数据库很难。
传奇Saga模式
通常,在所有微服务之间处理事务和维护数据一致性很困难。当涉及多种服务时,例如付款,库存,欺诈检查,运输检查…..etc等,如果没有协调员,将很难通过多个步骤来管理如此复杂的工作流程。通过为协调员引入单独的服务,订单服务摆脱了这些多余责任。我们也没有引入任何循环依赖。
在此处检查项目源代码。
跨越多个微服务的每个业务交易都被分成特定于微服务的本地交易,并按顺序执行它们以完成业务工作流程。它被称为佐贺。它可以通过两种方式实现。
- 编舞Choreography 方法
- 编排Orchestration 方法
在本文中,我们将讨论基于Orchestration的传奇Saga。在这种模式下,我们将有一个协调器,一个单独的服务,它将协调所有微服务之间的所有事务。如果一切正常,它将使订单请求完成,否则将其标记为已取消。
让我们看看如何实现这一点。我们的示例架构将或多或少像这样!
- 在此演示中,协调器与其他服务之间的通信将是一个简单的HTTP,以一种非阻塞的异步方式来使其无状态。
- 我们也可以使用Kafka主题进行交流。为此,我们必须使用分散/聚集模式,该模式更像是有状态的样式。
Order协调器
这是一个微服务,负责协调所有事务。它侦听订单创建的主题。当创建新订单时,它会立即为每个服务(如付款服务/库存服务等)建立单独的请求,并验证响应。如果可以,请执行订单。如果其中之一不是,则取消定单。它还尝试重置任何微服务中发生的任何本地事务。
我们将所有本地交易视为1个单一工作流程。一个工作流程将包含多个工作流程步骤。
public interface WorkflowStep {
WorkflowStepStatus getStatus(); Mono<Boolean> process(); Mono<Boolean> revert();
}
|
public interface Workflow {
List<WorkflowStep> getSteps();
}
|
- 在本例中,对于“订购”工作流,我们有2个步骤。每个实现都应该知道如何进行本地事务以及如何重置。
- 库存步骤需要继承实现WorkflowStep接口
public class InventoryStep implements WorkflowStep {
private final WebClient webClient; private final InventoryRequestDTO requestDTO; private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
public InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) { this.webClient = webClient; this.requestDTO = requestDTO; }
@Override public WorkflowStepStatus getStatus() { return this.stepStatus; }
@Override public Mono<Boolean> process() { return this.webClient .post() .uri("/inventory/deduct") .body(BodyInserters.fromValue(this.requestDTO)) .retrieve() .bodyToMono(InventoryResponseDTO.class) .map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE)) .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED); }
@Override public Mono<Boolean> revert() { return this.webClient .post() .uri("/inventory/add") .body(BodyInserters.fromValue(this.requestDTO)) .retrieve() .bodyToMono(Void.class) .map(r ->true) .onErrorReturn(false); } }
|
public class PaymentStep implements WorkflowStep {
private final WebClient webClient; private final PaymentRequestDTO requestDTO; private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
public PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) { this.webClient = webClient; this.requestDTO = requestDTO; }
@Override public WorkflowStepStatus getStatus() { return this.stepStatus; }
@Override public Mono<Boolean> process() { return this.webClient .post() .uri("/payment/debit") .body(BodyInserters.fromValue(this.requestDTO)) .retrieve() .bodyToMono(PaymentResponseDTO.class) .map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED)) .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED); }
@Override public Mono<Boolean> revert() { return this.webClient .post() .uri("/payment/credit") .body(BodyInserters.fromValue(this.requestDTO)) .retrieve() .bodyToMono(Void.class) .map(r -> true) .onErrorReturn(false); }
}
|
@Service public class OrchestratorService {
@Autowired @Qualifier("payment") private WebClient paymentClient;
@Autowired @Qualifier("inventory") private WebClient inventoryClient;
public Mono<OrchestratorResponseDTO> orderProduct(final OrchestratorRequestDTO requestDTO){ Workflow orderWorkflow = this.getOrderWorkflow(requestDTO); return Flux.fromStream(() -> orderWorkflow.getSteps().stream()) .flatMap(WorkflowStep::process) .handle(((aBoolean, synchronousSink) -> { if(aBoolean) synchronousSink.next(true); else synchronousSink.error(new WorkflowException("create order failed!")); })) .then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED))) .onErrorResume(ex -> this.revertOrder(orderWorkflow, requestDTO));
}
private Mono<OrchestratorResponseDTO> revertOrder(final Workflow workflow, final OrchestratorRequestDTO requestDTO){ return Flux.fromStream(() -> workflow.getSteps().stream()) .filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE)) .flatMap(WorkflowStep::revert) .retry(3) .then(Mono.just(this.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED))); }
private Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){ WorkflowStep paymentStep = new PaymentStep(this.paymentClient, this.getPaymentRequestDTO(requestDTO)); WorkflowStep inventoryStep = new InventoryStep(this.inventoryClient, this.getInventoryRequestDTO(requestDTO)); return new OrderWorkflow(List.of(paymentStep, inventoryStep)); }
private OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){ OrchestratorResponseDTO responseDTO = new OrchestratorResponseDTO(); responseDTO.setOrderId(requestDTO.getOrderId()); responseDTO.setAmount(requestDTO.getAmount()); responseDTO.setProductId(requestDTO.getProductId()); responseDTO.setUserId(requestDTO.getUserId()); responseDTO.setStatus(status); return responseDTO; }
private PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){ PaymentRequestDTO paymentRequestDTO = new PaymentRequestDTO(); paymentRequestDTO.setUserId(requestDTO.getUserId()); paymentRequestDTO.setAmount(requestDTO.getAmount()); paymentRequestDTO.setOrderId(requestDTO.getOrderId()); return paymentRequestDTO; }
private InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){ InventoryRequestDTO inventoryRequestDTO = new InventoryRequestDTO(); inventoryRequestDTO.setUserId(requestDTO.getUserId()); inventoryRequestDTO.setProductId(requestDTO.getProductId()); inventoryRequestDTO.setOrderId(requestDTO.getOrderId()); return inventoryRequestDTO; }
}
|
有关完整的源码,请在此处下载。