点击标题进入项目。CommandHandler 代码:
/**
 * Event-sourced aggregate modelling one bike that can be registered, rented
 * out and returned.
 *
 * <p>Command handlers validate the current state and publish events through
 * {@code apply}; the only place state is mutated is in the
 * {@code @EventSourcingHandler} methods.
 */
@Profile("command")
@Aggregate
public class Bike {

    /** Identifier of this aggregate; set when a {@link BikeRegisteredEvent} is handled. */
    @AggregateIdentifier
    private String bikeId;

    /** {@code true} after registration or return, {@code false} after the bike is rented. */
    private boolean isAvailable;

    /**
     * No-argument constructor.
     * NOTE(review): presumably needed so the framework can instantiate the
     * aggregate before replaying its events - confirm.
     */
    public Bike() {
    }

    /**
     * Creates a new bike by publishing a {@link BikeRegisteredEvent}.
     *
     * @param command carries the bike id and its initial location
     */
    @CommandHandler
    public Bike(RegisterBikeCommand command) {
        apply(new BikeRegisteredEvent(command.getBikeId(), command.getLocation()));
    }

    /**
     * Rents the bike out to the renter named in the command.
     *
     * @param command carries the bike id and the renter
     * @throws IllegalStateException if the bike is currently not available
     */
    @CommandHandler
    public void handle(RentBikeCommand command) {
        requireState(isAvailable, "Bike is already rented");
        apply(new BikeRentedEvent(command.getBikeId(), command.getRenter()));
    }

    /**
     * Returns a rented bike at the location named in the command.
     *
     * @param command carries the bike id and the return location
     * @throws IllegalStateException if the bike is currently available (i.e. not rented)
     */
    @CommandHandler
    public void handle(ReturnBikeCommand command) {
        requireState(!isAvailable, "Bike was already returned");
        apply(new BikeReturnedEvent(command.getBikeId(), command.getLocation()));
    }

    /** Records the identifier and marks the freshly registered bike as available. */
    @EventSourcingHandler
    protected void handle(BikeRegisteredEvent event) {
        this.bikeId = event.getBikeId();
        this.isAvailable = true;
    }

    /** Marks the bike as rented out. */
    @EventSourcingHandler
    protected void handle(BikeRentedEvent event) {
        this.isAvailable = false;
    }

    /** Marks the bike as available again. */
    @EventSourcingHandler
    protected void handle(BikeReturnedEvent event) {
        this.isAvailable = true;
    }

    /** Throws {@link IllegalStateException} with {@code violation} unless {@code expected} holds. */
    private static void requireState(boolean expected, String violation) {
        if (!expected) {
            throw new IllegalStateException(violation);
        }
    }
}
|
用于EventSourcing的历史实体:
/**
 * JPA entity holding one human-readable line of a bike's history.
 *
 * <p>The timestamp is kept as the {@code toString()} form of the
 * {@link Instant} passed to the constructor.
 */
@Profile("history")
@Entity
public class BikeHistory {

    /** Generated primary key. */
    @Id
    @GeneratedValue
    private Long id;

    /** Identifier of the bike this entry belongs to. */
    private String bikeId;

    /** Free-text description of what happened. */
    private String description;

    /** Textual form of the instant at which the entry was recorded. */
    private String timestamp;

    /**
     * No-argument constructor.
     * NOTE(review): presumably present for the persistence provider - confirm.
     */
    public BikeHistory() {
    }

    /**
     * Creates a history entry.
     *
     * @param bikeId      identifier of the bike
     * @param timestamp   moment of the event; stored via {@code toString()}, must not be {@code null}
     * @param description what happened to the bike
     */
    public BikeHistory(String bikeId, Instant timestamp, String description) {
        this.bikeId = bikeId;
        this.description = description;
        this.timestamp = timestamp.toString();
    }

    /** @return identifier of the bike this entry belongs to */
    public String getBikeId() {
        return this.bikeId;
    }

    /** @return description of the recorded event */
    public String getDescription() {
        return this.description;
    }

    /** @return the stored textual timestamp */
    public String getTimestamp() {
        return this.timestamp;
    }
}
|
领域历史事件重播到当前状态:
/**
 * Projection that turns bike events into {@link BikeHistory} rows and serves
 * the {@code "locationHistory"} query.
 *
 * <p>Every handled event is saved through the repository and then emitted via
 * the {@link QueryUpdateEmitter} to subscription queries named
 * {@code "locationHistory"} whose payload equals the bike id of the new entry.
 */
@Profile("history")
@Component
public class BikeHistoryProjection {

    /** Name shared by the query handler and the update filter so the two cannot drift apart. */
    private static final String LOCATION_HISTORY_QUERY = "locationHistory";

    private final BikeHistoryRepository bikeHistoryRepository;
    private final QueryUpdateEmitter updateEmitter;

    /**
     * @param bikeHistoryRepository store for history entries
     * @param updateEmitter         emitter used to push new entries to subscription queries
     */
    public BikeHistoryProjection(BikeHistoryRepository bikeHistoryRepository, QueryUpdateEmitter updateEmitter) {
        this.bikeHistoryRepository = bikeHistoryRepository;
        this.updateEmitter = updateEmitter;
    }

    /**
     * Records the registration of a bike.
     *
     * <p>Unlike before, the new entry is also emitted as a query update, in line
     * with the rented/returned handlers. Otherwise a client that subscribes
     * before this handler has run would never receive the registration entry
     * on its update stream.
     *
     * @param event     the registration event
     * @param timestamp timestamp supplied by the framework for the event message
     */
    @EventHandler
    public void handle(BikeRegisteredEvent event, @Timestamp Instant timestamp) {
        saveAndPublish(new BikeHistory(event.getBikeId(), timestamp, "Bike registered in " + event.getLocation()));
    }

    /**
     * Records that a bike was rented out.
     *
     * @param event     the rental event
     * @param timestamp timestamp supplied by the framework for the event message
     */
    @EventHandler
    public void handle(BikeRentedEvent event, @Timestamp Instant timestamp) {
        saveAndPublish(new BikeHistory(event.getBikeId(), timestamp, "Bike rented out to " + event.getRenter()));
    }

    /**
     * Records that a bike was returned.
     *
     * @param event     the return event
     * @param timestamp timestamp supplied by the framework for the event message
     */
    @EventHandler
    public void handle(BikeReturnedEvent event, @Timestamp Instant timestamp) {
        saveAndPublish(new BikeHistory(event.getBikeId(), timestamp, "Bike returned in " + event.getLocation()));
    }

    /**
     * Answers the {@code "locationHistory"} query.
     *
     * @param bikeId identifier of the bike whose history is requested
     * @return the entries returned by {@code findByBikeIdOrderById} for that bike
     */
    @QueryHandler(queryName = LOCATION_HISTORY_QUERY)
    public List<BikeHistory> findMovements(String bikeId) {
        return bikeHistoryRepository.findByBikeIdOrderById(bikeId);
    }

    /** Saves {@code entry}, then emits it to matching "locationHistory" subscription queries. */
    private void saveAndPublish(BikeHistory entry) {
        bikeHistoryRepository.save(entry);
        updateEmitter.emit(
                query -> LOCATION_HISTORY_QUERY.equals(query.getQueryName())
                        && entry.getBikeId().equals(query.getPayload()),
                entry);
    }
}
|
使用 Spring Data JPA 持久化领域事件历史:
/**
 * Repository for {@link BikeHistory} entities, whose primary key type is {@code Long}.
 * Declared under the {@code "history"} profile.
 */
@Profile("history") public interface BikeHistoryRepository extends JpaRepository<BikeHistory, Long> {
/**
 * Looks up the history entries of one bike.
 * NOTE(review): presumably implemented by Spring Data query derivation from the
 * method name (filter on bikeId, ordered by id) - confirm; renaming the method
 * would then change the query.
 *
 * @param bikeId identifier of the bike
 * @return the matching history entries
 */
List<BikeHistory> findByBikeIdOrderById(String bikeId);
}
|