Spring IO 2019大会上Axon+Spring的事件驱动微服务和CQRS源码项目


点击标题进入项目,CommandHandler代码


@Profile("command")
@Aggregate
public class Bike {

    @AggregateIdentifier
    private String bikeId;

    private boolean isAvailable;

    public Bike() {
    }

    @CommandHandler
    public Bike(RegisterBikeCommand command) {
        apply(new BikeRegisteredEvent(command.getBikeId(), command.getLocation()));
    }

    @CommandHandler
    public void handle(RentBikeCommand command) {
        if (!this.isAvailable) {
            throw new IllegalStateException(
"Bike is already rented");
        }
        apply(new BikeRentedEvent(command.getBikeId(), command.getRenter()));
    }

    @CommandHandler
    public void handle(ReturnBikeCommand command) {
        if (this.isAvailable) {
            throw new IllegalStateException(
"Bike was already returned");
        }
        apply(new BikeReturnedEvent(command.getBikeId(), command.getLocation()));
    }

    @EventSourcingHandler
    protected void handle(BikeRegisteredEvent event) {
        this.bikeId = event.getBikeId();
        this.isAvailable = true;
    }

    @EventSourcingHandler
    protected void handle(BikeReturnedEvent event) {
        this.isAvailable = true;
    }

    @EventSourcingHandler
    protected void handle(BikeRentedEvent event) {
        this.isAvailable = false;
    }
}

用于EventSourcing的历史实体:


@Profile("history")
@Entity
public class BikeHistory {

    @Id
    @GeneratedValue
    private Long id;

    private String bikeId;
    private String description;
    private String timestamp;

    public BikeHistory() {
    }

    public BikeHistory(String bikeId, Instant timestamp, String description) {
        this.bikeId = bikeId;
        this.timestamp = timestamp.toString();
        this.description = description;
    }

    public String getBikeId() {
        return bikeId;
    }

    public String getDescription() {
        return description;
    }

    public String getTimestamp() {
        return timestamp;
    }
}

领域历史事件重播到当前状态:


@Profile("history")
@Component
public class BikeHistoryProjection {

    private final BikeHistoryRepository bikeHistoryRepository;
    private final QueryUpdateEmitter updateEmitter;

    public BikeHistoryProjection(BikeHistoryRepository bikeHistoryRepository, QueryUpdateEmitter updateEmitter) {
        this.bikeHistoryRepository = bikeHistoryRepository;
        this.updateEmitter = updateEmitter;
    }

    @EventHandler
    public void handle(BikeRegisteredEvent event, @Timestamp Instant timestamp) {
        bikeHistoryRepository.save(new BikeHistory(event.getBikeId(), timestamp,
"Bike registered in " + event.getLocation()));
    }

    @EventHandler
    public void handle(BikeRentedEvent event, @Timestamp Instant timestamp) {
        BikeHistory newEntry = new BikeHistory(event.getBikeId(), timestamp,
"Bike rented out to " + event.getRenter());
        bikeHistoryRepository.save(newEntry);

        updateEmitter.emit(m ->
"locationHistory".equals(m.getQueryName())
                                   && newEntry.getBikeId().equals(m.getPayload()),
                           newEntry);
    }

    @EventHandler
    public void handle(BikeReturnedEvent event, @Timestamp Instant timestamp) {
        BikeHistory newEntry = new BikeHistory(event.getBikeId(), timestamp,
"Bike returned in " + event.getLocation());
        bikeHistoryRepository.save(newEntry);

        updateEmitter.emit(m ->
"locationHistory".equals(m.getQueryName())
                                   && newEntry.getBikeId().equals(m.getPayload()),
                           newEntry);
    }

    @QueryHandler(queryName =
"locationHistory")
    public List<BikeHistory> findMovements(String bikeId) {
        return bikeHistoryRepository.findByBikeIdOrderById(bikeId);
    }

}

领域事件历史使用Spring JPA持久:


@Profile("history")
public interface BikeHistoryRepository extends JpaRepository<BikeHistory, Long> {

    List<BikeHistory> findByBikeIdOrderById(String bikeId);

}