在本文中,我们使用 Spring Boot 和 Dapr 构建了一个松耦合的发布/订阅消息系统。通过利用 Dapr 对消息代理的抽象及其与 Spring Boot 的集成,我们简化了消息传递逻辑,而无需依赖特定的基础架构。我们还演示了如何使用 Testcontainers 在本地运行和测试整个设置,从而在开发过程中实现快速反馈循环。
我们将了解 Dapr 是什么、它如何与 Spring Boot 集成,以及如何创建无需与特定代理耦合的发布/订阅系统。我们将演示一个网约车场景,其中用户请求乘车,司机订阅这些请求。最终,我们将实现无需 Dapr CLI 或外部基础设施即可运行的测试。
使用 Dapr 实现不可知基础设施
分布式系统常常面临一些常见而复杂的挑战。我们通常会结合使用特定供应商的库、基础架构工具以及手动集成工作来解决这些问题。
Dapr(分布式应用程序运行时)提供了一组 API 和构建块来应对这些挑战,抽象出基础架构,以便我们专注于业务逻辑。这些原则也适用于其他方面,例如调用其他服务(通过服务调用 API)、持久化状态(通过状态管理 API)或检索机密(通过机密 API)。
这种解耦使应用程序更易于测试、跨环境移植性更强,并且更能适应基础架构的变化。在本文中,我们将重点介绍发布/订阅 API,以实践方式说明这些优势。
使用 Dapr 桥接 Spring Messaging
Spring Boot 拥有一套非常严格的集成模型,尤其是在消息传递方面。许多开发人员已经熟悉 Spring 的抽象,例如KafkaTemplate、RabbitTemplate及其对应的监听器。虽然这些抽象简化了 Broker 集成,但它们仍然与特定技术紧密耦合。
dapr-spring-boot-starter项目不仅仅是一个 API,它还提供了无缝集成。它使用一些熟悉的接口命名,例如DaprMessagingTemplate和@Topic。这使得用户可以轻松使用 Dapr 的分布式消息传递功能,而无需了解底层基础设施的细节。
更具体地说,通过引入 Dapr Spring Boot Starter,我们无需添加任何特定的 Broker 依赖项。这样一来,无需更改任何代码即可切换 Broker 。此外,无需更改应用程序代码,即可在组件级别配置特定于提供程序的功能。例如,我们可以引入Kafka 特定的设置,以利用消费者组等原生功能。
拥有不受锁定的基础设施灵活性
Dapr 将应用程序代码与基础架构解耦。例如,无论我们在底层使用 Kafka、RabbitMQ、Redis Streams 还是 Azure Service Bus,我们的 Spring Boot 应用程序都可以通过 HTTP 或 gRPC 与 Dapr Sidecar 进行通信,而 Dapr 则负责与实际代理的集成。
最重要的是,我们可以在没有完整基础设施的情况下在本地进行测试,正如我们将使用Testcontainers所看到的那样。dapr -spring-boot-starter-test模块将 Dapr Sidecar 作为测试生命周期的一部分启动,从而无需学习Dapr CLI或 Kubernetes。
设置 Spring Boot 项目
我们将模拟一个叫车应用,以演示 Dapr 如何与 Spring Boot 集成。用户将向我们的 API 端点发送乘车请求,该端点会向已订阅的司机发布消息。然后,司机可以选择是否接受该行程。
首先添加所需的依赖项。我们需要spring-boot-starter-web用于 REST 端点,以及dapr-spring-boot-starter用于 Spring Boot 集成:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>io.dapr.spring</groupId> <artifactId>dapr-spring-boot-starter</artifactId> </dependency>
|
为了进行测试,我们还将添加dapr-spring-boot-starter-test以支持 Testcontainers,并添加RabbitMQ容器作为我们的消息代理:
<dependency> <groupId>io.dapr.spring</groupId> <artifactId>dapr-spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>rabbitmq</artifactId> <version>1.20.6</version> <scope>test</scope> </dependency>
|
创建模型
这个 POJO 代表乘车请求:
public class RideRequest { private String passengerId; private String location; private String destination; // default getters and setters }
|
它不需要对消息进行特殊的注释。
使用DaprMessagingTemplate实现发布器
DaprMessagingTemplate与 Spring 的其他消息传递模板类似,但不需要特定的代理作为依赖项。首先,在application.properties中定义消息传递组件的名称:
dapr.pubsub.name=ride-hailing
然后,我们将使用DaprPubSubProperties类引用此属性,并使用RideRequest作为消息类型。这样就完成了发送消息所需的配置:
@Configuration @EnableConfigurationProperties({ DaprPubSubProperties.class }) public class DaprMessagingConfig { @Bean public DaprMessagingTemplate<RideRequest> messagingTemplate( DaprClient client, DaprPubSubProperties config) { return new DaprMessagingTemplate<>(client, config.getName(), false); } }
|
使用端点接收消息
接下来,我们将创建一个控制器来接收乘车请求,并使用 Dapr 模板将其转发到“ride-requests”主题。我们可以将控制器映射到任何我们想要的路径:
@RestController @RequestMapping("/passenger") public class PassengerRestController { @Autowired private DaprMessagingTemplate<RideRequest> messaging; @PostMapping("/request-ride") public String requestRide(@RequestBody RideRequest request) { messaging.send("ride-requests", request); return "waiting for drivers"; } }
|
请注意,我们的消息正文不需要任何转换或配置,因为 Dapr 会自动处理它。
创建和配置订阅者
在我们的示例中,司机充当订阅者,接收乘车请求并决定是否接受。我们将使用 Dapr 的@Topic注解来实现这一点,将传入的消息绑定到控制器方法。
使用@Topic实现控制器
使用@Topic注解时,必须同时包含组件名称和主题名称。Dapr Sidecar (由测试容器自动处理)每次从 Broker 转发消息时都会调用此端点:
@RestController @RequestMapping("driver") public class DriverRestController { // ... @PostMapping("ride-request") @Topic(pubsubName = "ride-hailing", name = "ride-requests") public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) { // ... } }
|
请注意,有效负载被包装在Dapr 自动创建的CloudEvent对象中。这对于基于CloudEvent 元数据进行路由或过滤等高级场景很有帮助,但对于基本的发布/订阅模式来说并非必需。
配置订阅者行为
我们的订阅者代表接受或拒绝行程的司机。为了说明这一点,我们将使用简单的模式逻辑来判断行程是否可接受。让我们将其添加到application.properties中,以便在启动应用程序时轻松更改其值:
driver.acceptance.criteria=East Side
接下来,我们将这个值注入到控制器中的一个变量中,同时注入用于计数接受/拒绝的驱动器的变量:
int drivesAccepted; int drivesRejected; @Value("${driver.acceptance.criteria}") String criteria; public int getDrivesAccepted() { return drivesAccepted; } public int getDrivesRejected() { return drivesRejected; }
|
我们将在编写测试时使用这些来检查控制器的行为。
处理CloudEvent
最后,我们将从CloudEvent中检索有效负载并决定驱动器是否可以接受:
@Topic(pubsubName = "ride-hailing", name = "ride-requests") public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) { RideRequest request = event.getData(); if (request.getDestination().contains(criteria)) { drivesAccepted++; } else { drivesRejected++; throw new UnsupportedOperationException("drive rejected"); } }
|
由于我们无法直接拒绝消息,因此我们会抛出异常来触发消息的重新排队。对于 RabbitMQ,这需要requeueInFailure配置,我们将在创建测试容器时设置该配置。
使用 Testcontainers 测试发布者
为了验证我们的发布者是否正确发送消息,我们将使用Testcontainers编写集成测试。这使我们能够启动 Dapr Sidecar 和 RabbitMQ 实例,而无需依赖外部工具或 Dapr CLI。
1. 设置测试配置
对于我们的测试属性,除了验收标准之外,我们还将包括消息传递组件名称和 Dapr 容器的专用服务器端口。
此外,我们需要选择一个固定端口,以便我们的组件可以在同一个网络中相互定位:
driver.acceptance.criteria=East Side dapr.pubsub.name=ride-hailing server.port=60601
|
我们将通过设置服务器端口号并指定组件之间共享的网络来开始配置。我们还将包含DaprPubSubProperties以便稍后获取 RabbitMQ 组件的名称:
@TestConfiguration(proxyBeanMethods = false) @EnableConfigurationProperties({ DaprPubSubProperties.class }) public class DaprTestContainersConfig { @Value("${server.port}") private int serverPort; @Bean public Network daprNetwork() { return Network.newNetwork(); } // ... }
|
配置容器
让我们创建暴露默认端口 5672 的 RabbitMQ 容器:
@Bean public RabbitMQContainer rabbitMQContainer(Network daprNetwork) { return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine")) .withExposedPorts(5672) .withNetworkAliases("rabbitmq") .withNetwork(daprNetwork); }
|
最后,我们将添加一个 Dapr 容器来包装所有内容,并使用@ServiceConnection注释来简化配置:
@Bean @ServiceConnection public DaprContainer daprContainer( Network daprNetwork, RabbitMQContainer rabbitMQ, DaprPubSubProperties pubSub) { Map<String, String> rabbitMqConfig = new HashMap<>(); rabbitMqConfig.put("connectionString", "amqp://guest:guest@rabbitmq:5672"); rabbitMqConfig.put("user", "guest"); rabbitMqConfig.put("password", "guest"); rabbitMqConfig.put("requeueInFailure", "true"); return new DaprContainer("daprio/daprd:1.14.4") .withAppName("dapr-pubsub") .withNetwork(daprNetwork) .withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig)) .withAppPort(serverPort) .withAppChannelAddress("host.testcontainers.internal") .dependsOn(rabbitMQ); }
|
除了样板之外,关键配置还包括:
- requeueInFailure :由于我们无法直接NACK消息,因此我们启用此选项。当我们有多个订阅者实例时,这允许其他客户端接收被其他客户端拒绝的消息。
- withComponent(…”pubsub.rabbitmq”):我们将使用 RabbitMQ 实现,因此我们在此处指定它。Dapr 支持许多消息代理,包括云提供商管理的服务,例如Google PubSub、Amazon SQS/SNS和Azure Event Hub。
- withAppChannelAddress:我们将添加此项以启用主机对容器的访问。如果没有它,测试可能会在等待 Dapr 响应时挂起。
我们还可以使用日志记录配置来启动 Dapr 容器,以便于调试。为此,我们设置了withDaprLogLevel和withLogConsumer选项:
.withDaprLogLevel(DaprLogLevel.INFO) .withLogConsumer(outputFrame -> logger.info(outputFrame.getUtf8String()))
|
创建测试应用程序
现在我们准备在测试包中创建测试应用程序:
@SpringBootApplication public class DaprPublisherTestApp { public static void main(String[] args) { SpringApplication.from(DaprPublisherApp::main) .with(DaprTestContainersConfig.class) .run(args); } }
|
我们将引用主应用程序类,以避免重复任何配置,例如DaprMessagingConfig 类。我们还需要将DriverRestController复制到测试文件夹以进行集成测试。
创建集成测试
我们需要引用测试应用、配置和DaprAutoConfiguration类。然后,注入控制器来检查控制变量,并注入 Dapr 容器以了解应用何时准备好接收消息:
@SpringBootTest( classes = { DaprPublisherTestApp.class, DaprTestContainersConfig.class, DaprAutoConfiguration.class }, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) class DaprPublisherIntegrationTest { @Autowired DriverRestController controller; @Autowired DaprContainer daprContainer; @Value("${server.port}") int serverPort; @Value("${driver.acceptance.criteria}") String criteria; // ... }
|
由于我们需要验证容器是否已正确启动,因此我们可以等待“应用程序已订阅以下主题”消息。这有助于确保测试仅在容器准备好接受消息时才开始。我们还将定义 API 的基本 URI,以便使用RestAssured进行调用:
@BeforeEach void setUp() { RestAssured.baseURI = "http://localhost:" + serverPort; org.testcontainers.Testcontainers.exposeHostPorts(serverPort); Wait.forLogMessage(".*app is subscribed to the following topics.*", 1) .waitUntilReady(daprContainer); }
|
我们的第一个测试是发布一个符合驾驶员接受标准的驾驶请求,并检查已接受的驾驶数量。当这个数字增加时,我们可以断言订阅者已处理该消息:
@Test void whenDriveAcceptable_thenDrivesAcceptedIncrease() { int drivesAccepted = controller.getDrivesAccepted(); given() .contentType(ContentType.JSON) .body(""" { "passengerId": "1", "location": "Point A", "destination": "%s Point B" } """.formatted(criteria)) .when() .post("/passenger/request-ride") .then() .statusCode(200); await() .atMost(Duration.ofSeconds(5)) .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1)); }
|
相反,我们的第二个测试涉及发布我们的驱动程序应该拒绝的驱动请求:
@Test void whenDriveUnacceptable_thenDrivesRejectedIncrease() { int drivesRejected = controller.getDrivesRejected(); given().contentType(ContentType.JSON) .body(""" { "passengerId": "2", "location": "Point B", "destination": "West Side A" } """) .when() .post("/passenger/request-ride") .then() .statusCode(200); await() .atMost(Duration.ofSeconds(5)) .until(controller::getDrivesRejected, greaterThan(drivesRejected)); }
|
这次,我们测试被拒绝的驱动器数量是否增加了。此外,由于消息在发生错误时会重新排队,因此我们验证变量是否大于其初始值,因为我们无法确定它已被处理了多少次。
使用 Testcontainers 测试订阅服务器
现在让我们测试一下订阅者的行为。我们将创建一个类似于发布者的设置,重点验证订阅者如何处理传入的消息。
设置环境
首先,我们将包含类似的测试属性,仅更改服务器端口:
driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60602
我们将DaprMessagingConfig类复制到测试包中,以便在集成测试中使用它。我们还需要将 DaprTestContainersConfig 复制 到测试文件夹,因为我们需要相同的容器。
创建集成测试
与之前的集成测试类似,我们需要连接容器、控制器、服务器端口、驱动程序验收标准,并在@Setup期间等待容器准备就绪。我们还需要包含 Dapr 消息模板,以便向订阅者发送消息:
@SpringBootTest( classes = { DaprSubscriberTestApp.class, DaprTestContainersConfig.class, DaprMessagingConfig.class, DaprAutoConfiguration.class }, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) class DaprSubscriberIntegrationTest { @Autowired DaprMessagingTemplate<RideRequest> messaging; @Autowired DriverRestController controller; @Autowired DaprContainer daprContainer; @Value("${server.port}") int serverPort; @Value("${driver.acceptance.criteria}") String criteria; // test setup... }
|
实施测试场景
我们的第一个测试涉及发送可接受的驱动器并检查我们的控制器是否正确接收它:
@Test void whenDriveAcceptable_thenDrivesAcceptedIncrease() { int drivesAccepted = controller.getDrivesAccepted(); RideRequest ride = new RideRequest( "1", "Point A", String.format("%s Point B", criteria)); messaging.send("ride-requests", ride); await().atMost(Duration.ofSeconds(5)) .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1)); }
|
我们的第二项测试包括发送不可接受的驱动器并检查我们的控制器是否正确拒绝它:
@Test void whenDriveUnacceptable_thenDrivesRejectedIncrease() { int drivesRejected = controller.getDrivesRejected(); RideRequest request = new RideRequest("2", "Point B", "West Side Point A"); messaging.send("ride-requests", request); await().atMost(Duration.ofSeconds(5)) .until(controller::getDrivesRejected, greaterThan(drivesRejected)); }
|
通过我们的订阅者测试,我们已经验证了 Dapr 正确地将消息从代理路由到我们的 Spring Boot 应用程序,并且订阅者的行为按预期工作。