本文是Spring Boot 3.2 与 Java 21、虚拟线程、Spring Security、PostgreSQL、Flyway、缓存、Micrometer、Opentelemetry、JUnit 5、RabbitMQ、Keycloak 集成等的综合指南!
包模块
这个源码project 层次结构遵循标准 Java 包约定,按包类型组织。就我个人而言,我发现从完全模块化的方法开始具有挑战性,因为最初,您可能无法完全理解您的应用程序。我建议从一个简单的结构开始,随着您的理解的成熟,使其适应您的要求。控制器、请求和响应按产品进行组织,以增强代码分离和安全性:
├── postman ├── scripts └── src └── main └── java │ └── com │ └── mycompany │ └── microservice │ └── api │ ├── clients │ │ ├── http │ │ └── slack │ ├── controllers │ │ ├── backoffice │ │ ├── internal │ │ │ ├── actuator │ │ │ ├── cloudfunctions │ │ │ ├── cloudschedulers │ │ │ └── integrations │ │ ├── management │ │ │ └── base │ │ ├── platform │ │ │ ├── api │ │ │ ├── mobile │ │ │ └── web │ │ └── pubic │ ├── entities │ │ └── base │ ├── enums │ ├── exceptions │ ├── facades │ ├── infra │ │ ├── advice │ │ ├── auditors │ │ ├── auth │ │ │ ├── converters │ │ │ ├── jwt │ │ │ └── providers │ │ ├── executors │ │ ├── filters │ │ ├── interceptors │ │ ├── otlp │ │ ├── ratelimit │ │ └── security │ ├── listeners │ ├── mappers │ │ ├── annotations │ │ └── base │ ├── rabbitmq │ │ ├── configs │ │ ├── listeners │ │ └── publishers │ ├── repositories │ ├── requests │ │ ├── management │ │ └── shared │ ├── responses │ │ ├── management │ │ └── shared │ ├── services │ │ └── base │ └── utils └── resources └── db └── migration
|
依赖:
实体
它只有两个实体,Company这ApiKey对于展示 MVP 是绝对必要的。它们扩展BaseEntity到提供审计:
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @SuperBuilder @MappedSuperclass public abstract class BaseEntity implements Serializable {
@Serial private static final long serialVersionUID = 7677353645504602647L;
@CreatedBy @Column private String createdBy; @LastModifiedBy @Column private String updatedBy;
@CreatedDate @Column(nullable = false, updatable = false) private LocalDateTime createdAt;
@LastModifiedDate @Column(nullable = false) private LocalDateTime updatedAt;
public abstract Long getId(); } @Entity @EntityListeners(AuditingEntityListener.class) @Getter @Setter @NoArgsConstructor @AllArgsConstructor @SuperBuilder @Table(name = TABLE_NAME, schema = "public") public class Company extends BaseEntity { public static final String TABLE_NAME = "company";
@Serial private static final long serialVersionUID = 2137607105409362080L;
@Id @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = TABLE_NAME) @GenericGenerator( name = TABLE_NAME, type = BatchSequenceGenerator.class, parameters = { @Parameter(name = "sequence", value = TABLE_NAME + "_id_seq"), @Parameter(name = "fetch_size", value = "1") }) private Long id;
[...]
}
|
注意:如果使用 GenerationType.IDENTITY,Hibernate 将禁用插入批处理。通过使用 BatchSequenceGenerator,Hibernate 将要求一个 ID 数量(fetch_size),以防止任何额外的数据库往返。例如,在默认配置下,如果要插入 5 条记录,Hibernate 将进行 5 次往返以获取 5 个增量 ID,然后再进行 1 次往返以插入记录。如果使用 UUID,请务必小心,因为它会降低应用程序的运行速度。
使用 Postgres 15 进行持久化,并使用 Flyway 来管理迁移,见db.migration目录:
- 1.1.1 创建表和一些初始记录以简化 API 测试。
- 2.1.1 仅用于本地测试,它支持逻辑解码以使用 Debezium 等 CDC 工具进行实验。
Spring Exception
Spring 6 实现了HTTP API规范的问题详细信息RFC 7807,现已弃用,取而代之的是RFC 9457。
通过使用@ControllerAdvice和扩展,ResponseEntityExceptionHandler它很容易实现。
例如,@Valid处理程序:
@Slf4j @ControllerAdvice @RequiredArgsConstructor public class GlobalExceptionHandler extends ResponseEntityExceptionHandler {
// Process @Valid @Override protected ResponseEntity<Object> handleMethodArgumentNotValid( @NonNull final MethodArgumentNotValidException ex, @NonNull final HttpHeaders headers, @NonNull final HttpStatusCode status, @NonNull final WebRequest request) { log.info(ex.getMessage(), ex);
final List<ApiErrorDetails> errors = new ArrayList<>();
for (final ObjectError err : ex.getBindingResult().getAllErrors()) { errors.add( ApiErrorDetails.builder() .pointer(((FieldError) err).getField()) .reason(err.getDefaultMessage()) .build()); }
return ResponseEntity.status(BAD_REQUEST) .body(this.buildProblemDetail(BAD_REQUEST, "Validation failed.", errors)); }
private ProblemDetail buildProblemDetail( final HttpStatus status, final String detail, final List<ApiErrorDetails> errors) { final ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(status, StringUtils.normalizeSpace(detail)); // Adds errors fields on validation errors, following RFC 9457 best practices. if (CollectionUtils.isNotEmpty(errors)) { problemDetail.setProperty("errors", errors); } return problemDetail; } JSON样本: { "type": "about:blank", "title": "Bad Request", "status": 400, "detail": "Validation failed.", "instance": "/management/companies", "errors": [ { "pointer": "name", "reason": "must not be blank" }, { "pointer": "slug", "reason": "must not be blank" } ] }
|
重写控制器方法验证,例如 @RequestParam、@Pathvariable:
// Process controller method parameter validations e.g. @RequestParam, @PathVariable etc. @Override protected ResponseEntity<Object> handleHandlerMethodValidationException( final @NotNull HandlerMethodValidationException ex, final @NotNull HttpHeaders headers, final @NotNull HttpStatusCode status, final @NotNull WebRequest request) { log.info(ex.getMessage(), ex);
final List<ApiErrorDetails> errors = new ArrayList<>(); for (final var validation : ex.getAllValidationResults()) { final String parameterName = validation.getMethodParameter().getParameterName(); validation .getResolvableErrors() .forEach( error -> { errors.add( ApiErrorDetails.builder() .pointer(parameterName) .reason(error.getDefaultMessage()) .build()); }); }
return ResponseEntity.status(BAD_REQUEST) .body(this.buildProblemDetail(BAD_REQUEST, "Validation failed.", errors)); }
|
JSON:
{ "type": "about:blank", "title": "Bad Request", "status": 400, "detail": "Validation failed.", "instance": "/back-office/hello-world", "errors": [ { "pointer": "email", "reason": "must be a well-formed email address" } ] }
|
应用Exception
所有应用程序异常都会扩展 RootException:
@Getter public class RootException extends RuntimeException {
@Serial private static final long serialVersionUID = 6378336966214073013L;
private final HttpStatus httpStatus; private final List<ApiErrorDetails> errors = new ArrayList<>();
public RootException(@NonNull final HttpStatus httpStatus) { super(); this.httpStatus = httpStatus; }
public RootException(@NonNull final HttpStatus httpStatus, final String message) { super(message); this.httpStatus = httpStatus; } }
|
同样,在 @ControllerAdvice 中,它实现了一个全局错误处理程序:
@ExceptionHandler(RootException.class) public ResponseEntity<ProblemDetail> rootException(final RootException ex) { log.info(ex.getMessage(), ex); // Uses default message, can be adapted to use ex.getMessage(). final ProblemDetail problemDetail = this.buildProblemDetail( ex.getHttpStatus(), API_DEFAULT_REQUEST_FAILED_MESSAGE, ex.getErrors()); return ResponseEntity.status(ex.getHttpStatus()).body(problemDetail); } { "type": "about:blank", "title": "Internal Server Error", "status": 500, "detail": "Request failed.", "instance": "/back-office/hello-world" }
|
Fallback Exception
所有未捕获的异常都将回退到该处理程序:
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) @ExceptionHandler(Throwable.class) public ProblemDetail handleAllExceptions(final Throwable ex, final WebRequest request) { log.warn(ex.getMessage(), ex);
this.slack.notify(format("[API] InternalServerError: %s", ex.getMessage()));
return this.buildProblemDetail(HttpStatus.INTERNAL_SERVER_ERROR, API_DEFAULT_ERROR_MESSAGE); } { "type": "about:blank", "title": "Internal Server Error", "status": 500, "detail": "Something went wrong. Please try again later or enter in contact with our service.", "instance": "/back-office/hello-world" }
|
身份验证与授权
身份和访问管理(IAM)是一个非常广泛和复杂的话题。我将尽力做到简明扼要。
应用程序接口有 4 个产品、6 个应用程序接口和 5 个角色(为简单起见,跳过管理员应用程序接口)。
4 个应用程序接口使用 JWT,2 个使用 API key。
JWT 授权
JWT 授权是通过将 Keycloak 角色映射到 Spring GrantedAuthority来完成的:
private Collection<GrantedAuthority> extractRealmAccessRoles(final Jwt jwt) { final Map<String, Collection<String>> realmAccess = jwt.getClaim(CLAIM_REALM_ACCESS);
if (realmAccess == null) { return Collections.emptyList(); }
final Collection<String> realmAccessRoles = realmAccess.get(CLAIM_ROLES);
if (realmAccessRoles == null) return Collections.emptyList(); }
return realmAccessRoles.stream() .map(role -> new SimpleGrantedAuthority("ROLE_" + role)) .collect(Collectors.toSet()); }
|
keycloak 中的每个用户都被分配到一个组和一个子组。子组自动分配相应的角色。例如,后台组有一个子组用户,其角色是自动分配的 back_office_user
然后,Spring 安全过滤链会根据 Keycloak 角色授权 API。
http. [...] .authorizeHttpRequests( authorize -> authorize [...] .requestMatchers(AppUrls.BACK_OFFICE + "/**") .hasAnyRole(BACK_OFFICE_USER.getName()) [...]
|
API 授权
API 授权是通过使用 ApiKeyAuthenticationFilter 和 ApiKeyAuthenticationProvider 将 Api-Key 标头映射到 api_key.key 表来实现的。
@Slf4j public class ApiKeyAuthenticationFilter extends AbstractAuthenticationProcessingFilter {
public ApiKeyAuthenticationFilter( final String defaultFilterProcessesUrl, final AuthenticationManager authenticationManager) { super(defaultFilterProcessesUrl); this.setAuthenticationManager(authenticationManager); }
@Override public Authentication attemptAuthentication( final HttpServletRequest request, final HttpServletResponse response) {
final String apiKeyHeader = request.getHeader(AppHeaders.API_KEY_HEADER);
final Optional<String> apiKeyOptional = StringUtils.isNotBlank(apiKeyHeader) ? Optional.of(apiKeyHeader) : Optional.empty();
final ApiKeyAuthentication apiKey = apiKeyOptional.map(ApiKeyAuthentication::new).orElse(new ApiKeyAuthentication());
return this.getAuthenticationManager().authenticate(apiKey); }
|
@Slf4j public class ApiKeyAuthenticationProvider implements AuthenticationProvider {
@Autowired private ApiKeyService apiKeyService; @Autowired private CompanyService companyService;
@Override public Authentication authenticate(final Authentication authentication) throws AuthenticationException {
final String apiKeyInRequest = (String) authentication.getPrincipal();
if (StringUtils.isBlank(apiKeyInRequest)) { throw new InsufficientAuthenticationException("api-key is not defined on request"); } else {
final Optional<ApiKey> apiKeyOptional = this.apiKeyService.findByKeyOptional(apiKeyInRequest);
if (apiKeyOptional.isPresent()) { final ApiKey apiKey = apiKeyOptional.get(); final Company company = this.companyService.findById(apiKey.getCompanyId()); final ApiKeyDetails apiKeyDetails = ApiKeyDetails.builder() .id(apiKey.getId()) .companySlug(company.getSlug()) .email(company.getEmail()) .isInternal(Boolean.TRUE.equals(company.getIsInternal())) .isPlatform(Boolean.TRUE.equals(company.getIsPlatform())) .build();
return new ApiKeyAuthentication( apiKey.getKey(), true, apiKeyDetails, company.getGrantedAuthoritiesFromCompanyType()); }
throw new BadCredentialsException("invalid api-key"); } }
|
根据公司的 is_internal 和 is_platform 列设置授予权限。公司身份
每个用户都与一个公司标签(company.slug)相关联,以识别其所属的公司。
JWT 使用 Keycloak 用户属性。例如,标头为 "back-office "的公司 back-office
这并不理想,因为用户属性无法在领域级别强制执行。不过,有一个实验性的 --features=declarative-user-profile 可以解决这个问题。它应该会在 Keycloak 24 中发布。
ApiKeyDetail 有一个名为 CompanySlug 的字段,可直接保存值。
Auth 交互隐藏在 AuthFacade 之后,以简化数据检索:
@Slf4j @UtilityClass public class AuthFacade {
public static Optional<String> getCompanySlugOptional() { try {
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (isJWT(authentication)) { return getCompanySlugFromJwt((Jwt) authentication.getPrincipal()); } else if (isApiKey(authentication)) { return getCompanySlugFromApikey(authentication); }
return Optional.empty();
} catch (final Exception ex) { throw new InternalServerErrorException(); } }
|
高级授权
该系统采用基于角色的访问控制(RBAC),使用两种不同的授权机制,虽然远非理想,但相对简单高效。
要获得更精细的授权,可以使用 Keycloak 基于属性的访问控制 (ABAC)。
如果想在 Keycloak 中集中 API 密钥授权,我建议使用资源所有者密码凭据(更容易设置,但在 OAuth 2 中已弃用)或客户端凭据授权(推荐使用)。不过,客户端凭据授权(尚未)完全可扩展。
缓存
缓存对于通过存储和重复使用经常请求的数据来提高性能和缩短响应时间至关重要。它有助于减轻后端服务器的负荷,提高可扩展性,并有助于提供反应更快、更高效的用户体验。
Spring Boot 可让您轻松开始使用缓存抽象;您只需添加以下依赖项即可:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>
|
它将在应用程序内存中创建一个 ConcurrentMapCacheManager。随着应用程序的增长,强烈建议迁移到像 Redis 这样的专用系统。最重要的是,你不需要修改任何代码!
public interface CompanyRepository extends JpaRepository<Company, Long> {
String CACHE_NAME = "company";
@NonNull @Cacheable(value = CACHE_NAME, key = "{'byId', id}") @Override Optional<Company> findById(@NonNull Long id);
@Cacheable(value = CACHE_NAME, key = "{'bySlug', slug}") Optional<Company> findBySlug(String slug);
@Caching( evict = { @CacheEvict(value = CACHE_NAME, key = "{'byId', entity.id}"), @CacheEvict(value = CACHE_NAME, key = "{'bySlug', entity.slug}"), }) @Override <S extends Company> @NonNull S save(@NonNull S entity);
/* * This cache implementation is only valid if the table is not * frequently updated since it will clear the cache at every update operation * If you want to be more performant you can use something like https://github.com/ms100/cache-as-multi * */ @NonNull @CacheEvict(cacheNames = CACHE_NAME, allEntries = true) @Override <S extends Company> List<S> saveAll(@NonNull Iterable<S> entities);
@Caching( evict = { @CacheEvict(value = CACHE_NAME, key = "{'byId', entity.id}"), @CacheEvict(value = CACHE_NAME, key = "{'bySlug', entity.slug}"), }) @Override void delete(@NonNull Company entity);
/* * This cache implementation is only valid if the table is not * frequently updated since it will clear the cache at every delete operation * If you want to be more performant you can use something like https://github.com/ms100/cache-as-multi * */ @CacheEvict(cacheNames = CACHE_NAME, allEntries = true) @Override void deleteAll(@NonNull Iterable<? extends Company> entities); }
|
它使用 @Cacheable 注解。我个人喜欢使用键的作用域来防止任何碰撞。使用 xAll 操作(如 saveAll 或 deleteAll)时要小心,因为每次调用它都会清除缓存。这可能不适合您的使用情况。
注意:如果您的生产环境有多个实例,您可以添加一个 @Scheduled 函数,以在任何时间间隔清理缓存。如果这不适合您的使用情况,请考虑迁移到集中式缓存实例。
速率限制
通常建议在负载平衡器(和/或 WAF)上使用速率限制器。为安全起见,API 使用 OncePerRequestFilter 实现了默认请求限制器,默认值为每个 IP 地址 50 个请求/秒:
@Slf4j @Component @RequiredArgsConstructor public class RateLimitFilter extends OncePerRequestFilter {
public static final String HEADER_RATE_LIMIT_REMAINING = "X-Rate-Limit-Remaining"; public static final String HEADER_RATE_LIMIT_RETRY_AFTER_SECONDS = "X-Rate-Limit-Retry-After-Milliseconds";
private final DefaultRateLimit defaultRateLimit; private final Map<String, Bucket> cache = new ConcurrentHashMap<>();
@Override protected void doFilterInternal( @NonNull final HttpServletRequest request, @NonNull final HttpServletResponse response, @NonNull final FilterChain filterChain) throws ServletException, IOException {
final Bucket bucket = this.resolveBucket(request); final ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) { // Comment if you want to hide remaining request. response.addHeader(HEADER_RATE_LIMIT_REMAINING, String.valueOf(probe.getRemainingTokens())); filterChain.doFilter(request, response); } else {
final long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000;
response.reset(); // Comment if you want to hide remaining time before refill. response.addHeader(HEADER_RATE_LIMIT_RETRY_AFTER_SECONDS, String.valueOf(waitForRefill)); response.setContentType(MediaType.APPLICATION_JSON_VALUE); response.setStatus(TOO_MANY_REQUESTS.value()); } }
private Bucket resolveBucket(final HttpServletRequest request) { final BaseRateLimit rateLimit = this.getRateLimitFor(request.getRequestURI()); // Compute cache based on remote address = IP address return this.cache.computeIfAbsent( request.getRemoteAddr(), s -> Bucket.builder().addLimit(rateLimit.getLimit()).build()); }
private BaseRateLimit getRateLimitFor(final String requestedUri) { // Use a switch case if you want to have different rate limit per uri. return this.defaultRateLimit; } }
|
断路器
断路器是一种软件模式,有助于增强分布式应用程序的系统恢复能力。它能监控操作,并在达到预定义的故障阈值时暂时中断其执行,从而防止潜在的连锁故障,使系统得以恢复。面对瞬时故障,这种模式对于保持系统整体稳定性至关重要。
在 Java 中,您需要非常小心地使用可用线程,因为它们会很快耗尽。阻塞代码(如 HTTP 客户端)会造成很大的负担,尤其是在服务不可用的情况下。因此,您需要某种机制来保护它们。
集成服务就是一个很好的例子--它们接收请求、转换/充实请求,然后等待下游服务的响应。如果下游服务不可用,应用程序就会迅速臃肿并失去响应。
在使用阻塞代码时,反应式编程有助于释放线程,从而大大减少资源使用。然而,实现反应式代码是一项挑战。要生成阻塞式反应代码并不容易。我强烈推荐你使用 BlockHound 测试你的反应式代码,说不定会有意外收获!您可以在该项目中找到相关示例。
Java 21 中的虚拟线程改变了一切。您的线程几乎是无限的,等待时间很长的阻塞代码(什么都不做)不再是问题。
要在 Spring Boot 3.2 中使用虚拟线程,只需设置 spring.threads.virtual.enabled。
不过,如果您的应用程序没有这些选项,您可以在项目中的此链接中找到使用示例。
注:配置断路器可能非常棘手,需要仔细考虑参数和阈值。准确预测其使用情况可能具有挑战性,因为这取决于流量模式和系统动态等各种因素。关键是要持续监控和微调配置,以确保在处理故障时达到最佳性能。
可观测性--度量和跟踪
.度量
Spring Boot 会为您处理所有繁重的工作;您只需添加以下依赖项:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency>
|
配置
spring: endpoints: web: exposure: include: info, health, prometheus, metrics
|
在生产过程中,请注意不要暴露您的管理 API(执行器)。建议为应用程序和管理 API 使用不同的端口。默认情况下,项目会为应用程序配置 8080 端口,为管理 API 配置 8081 端口。
跟踪
同样,Spring Boot 会为您处理所有繁重的工作;您只需添加以下依赖项即可:
<dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-otel</artifactId> </dependency> <dependency> <groupId>net.ttddyy.observation</groupId> <artifactId>datasource-micrometer-spring-boot</artifactId> <version>${datasource-micrometer.version}</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-otlp</artifactId> <version>${opentelemetry-exporter-otlp.version}</version> </dependency>
|
每个同步请求都会自动使用 traceId 和 spanId 进行装饰。要装饰异步请求,请在配置中添加 spring.reactor.context-propagation=true。
还有一些配置需要为特殊组件添加,例如
- WebClient -> 需要使用 WebClient.Builder 构建客户端。
- RabbitMQ -> 需要使用 factory.setObservationEnable(true) 创建工厂。
- @Async ->(使用虚拟线程时)需要使用 taskExecutor.setTaskDecorator(new ContextPropagatingTaskDecorator() 创建 SimpleAsyncTaskExecutor。
要进行本地测试,可以使用 Otel Desktop Viewer并将应用程序属性更新为
management: tracing: sampling: probability: 1 otlp: tracing: endpoint: http://localhost:4317
|
在生产中,您通常希望将这些跟踪信息发送到分布式后端,如 Jaeger 或 Tempo。
跟踪是调试请求的好帮手。您最终总是要浏览日志系统,通过 traceId 筛选出与问题请求相对应的所有日志。
指标与跟踪 - 示例
您还可以使用exemplars 将指标与跟踪关联起来。您只需设置以下配置:
management: metrics: distribution: percentiles-histogram: http: server: requests: true 它将关联您的指标:
|
# TYPE http_server_requests_seconds histogram # HELP http_server_requests_seconds Duration of HTTP server request handling http_server_requests_seconds_bucket{application="app",exception="None",method="GET",outcome="SUCCESS",status="200",uri="/",le="0.002796201"} 1.0 # {span_id="55255da260e873d9",trace_id="21933703cb442151b1cef583714eb42e"} 0.002745959 1665676383.654
|
使用 Prometheus 时,需要添加 - enable-feature=exemplar-storage 标记。
集成测试
应用程序接口的集成测试对于确保各种组件和服务无缝协作、模拟真实世界场景至关重要。它有助于识别和解决与系统不同部分之间的交互有关的问题,确保应用程序接口在集成环境中按预期运行
有许多不同的策略;有些人喜欢集成测试,有些人则偏爱单元测试或端到端(e2e)测试。我个人认为,单元测试对于实际应用来说是一种轻量级方法。架构良好的集成测试既高效又相对快速,在错误到达生产环境之前就能更有效地发现它们。
Spring Boot 可以轻松共享测试上下文,从而显著提高性能,同时将缺点控制在相对较低的水平。这就是所有集成测试类都扩展 BaseIntegrationTest 的原因:
@ActiveProfiles("test") @AutoConfigureMockMvc @TestInstance(Lifecycle.PER_CLASS) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public abstract class BaseIntegrationTest {
@Container @ServiceConnection public static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15-alpine");
@Container public static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.12.4");
static { setRabbitConfig(rabbit); Startables.deepStart(postgres, rabbit).join(); }
@Autowired public MockMvc mockMvc;
@DynamicPropertySource static void applicationProperties(final DynamicPropertyRegistry registry) { registry.add("rabbitmq.host", rabbit::getHost); registry.add("rabbitmq.port", rabbit::getAmqpPort); registry.add("rabbitmq.username", () -> "user"); registry.add("rabbitmq.password", () -> "password"); }
private static void setRabbitConfig(final RabbitMQContainer rabbit) { rabbit.withCopyFileToContainer( MountableFile.forHostPath(getRabbitDefinition()), "/etc/rabbitmq/definitions.json"); rabbit.withCopyFileToContainer( MountableFile.forHostPath(getRabbitConfig()), "/etc/rabbitmq/rabbitmq.conf"); } }
|
通过使用通用上下文,PostgreSQL 和 RabbitMQ 将在测试执行之间共享。它会自动配置 MockMvc,以便轻松测试 API。通过这种实现方式,添加一个测试的边际成本相对较低,通常在 5 到 25 毫秒之间。
RabbitMQ
RabbitMQ 是一种消息代理软件,可通过异步交换数据促进分布式系统之间的通信。它解决了系统中组件解耦的难题,使它们能够在不直接连接的情况下进行高效通信。RabbitMQ 通过管理一个系统不同部分之间或不同系统之间的消息流,确保无缝通信和协调,从而增强了分布式架构的可扩展性、可靠性和灵活性。
在当今的体系结构中,消息中介几乎是不可避免的。RabbitMQ 和 Kafka 是两种广泛使用的开源技术,后者的管理难度明显更高。
下面代码实现了 1 个发布者和 1 个订阅者的标准配置:
@Slf4j @Configuration public class RabbitConfig {
public static final String RABBIT_ASYNC_EVENT_LISTENER_FACTORY = "AsyncEventListener"; public static final String RABBIT_EVENT_PUBLISHER = "EventPublisher";
@Value("${rabbitmq.host}") private String host;
@Value("${rabbitmq.port}") private int port;
@Value("${rabbitmq.username}") private String username;
@Value("${rabbitmq.password}") private String password;
@Value("${rabbitmq.listeners.event.prefetch-count}") private Integer prefetchCount;
private ConnectionFactory connectionFactory(final String connectionName) { final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setConnectionNameStrategy(conn -> connectionName);
connectionFactory.setHost(this.host); connectionFactory.setPort(this.port); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password);
return connectionFactory; }
@Bean(name = RABBIT_ASYNC_EVENT_LISTENER_FACTORY) public DirectRabbitListenerContainerFactory eventListenerFactory() { final DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); factory.setConnectionFactory(this.connectionFactory("api-event-listener")); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setObservationEnabled(true); factory.setAutoStartup(false); // started at ApplicationReadyEvent
// Needed for listener using Mono<> https://docs.spring.io/spring-amqp/docs/current/reference/html/async-listeners factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setDefaultRequeueRejected(false); factory.setPrefetchCount(this.prefetchCount); return factory; }
@Bean(name = RABBIT_EVENT_PUBLISHER) public RabbitTemplate rabbitTemplate() { final RabbitTemplate factory = new RabbitTemplate(this.connectionFactory("api-event-publisher")); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setObservationEnabled(true); factory.setRetryTemplate(RetryTemplate.defaultInstance());
return factory; } }
|
通常要设置预取次数,以防止应用程序超载。我们还启用了在发布者和接收者之间传播跟踪的观察功能。它将在信息中添加跟踪头。 注意:根据您的 RabbitMQ 配置策略,您可能需要在您的应用程序中显式创建配置。我个人喜欢在 RabbitMQ 中集中管理配置,以防止错误配置或配置漂移。