Event-Sourcing和CQRS实战案例

任何试图实现一个完全符合标准的ACID系统的人都知道:你必须顾及很多方面。需要确保数据库实体在事务ACID前提下可以自由创建、修改和删除,而不会有错误风险,在大多数情况下,这种围绕数据库的CRUD总是会有性能方面的损失。这里介绍另外一个方法,设计的基础是基于一系列事件而不是基于数据库这样的可变状态系统。这种方式通常被称为事件溯源Event Sourcing。

本文介绍使用基于Speedment快速开发一个可扩展的Event-Sourced数据库应用,完整代码见这里

什么是事件溯源?
在一个典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。当状态更改时,应用程序使用更新或删除语句修改该行。这种方法的问题是,它增加了对数据库技术的很多要求,要确保没有记录行被改变到一个非法的状态。

在一个事件来源event-sourced系统中,我们采取了一种不同的方法。不是存储实体状态在数据库中,存储的是一系列改变状态的操作,即事件,事件是状态改变的原因。一个事件一旦被创建是不可变的,这意味着你只需要实现两个操作,CREAT创建和读取READ。如果一个实体被更新或删除,那是创建一个“UPDATE更新”或“REMOVE删除”事件来实现。

一个事件源系统可以很容易地扩展提高性能,因为任何一个节点都可以简单地下载事件日志和重播当前状态。你也得到更好的性能,由于这样的事实,写作和查询是由不同的机器完成,这被称为cqrs(命令查询职责分离)。

案例:预订桑拿房
为了展示建立事件源系统的工作流程,我们将创建一个小应用程序来处理一个共享桑拿房的预订。我们有多个租户感兴趣预订桑拿浴室,但我们需要保证,害羞的住户从来不会意外地预订两次;我们也希望在同一系统中支持多个桑拿。

用数据库来简化通信,我们将使用speedment工具包。speedment是java的工具,使我们能够从数据库中生成一个完整的领域模型,并可以很容易地使用java 8流实现数据库查询优化。

第一步:定义数据库Schema

第一步是要确定我们的数据库(MySQL)。我们只有一个表称为“booking”,存储预订桑拿相关的事件。请注意,预订是一个事件,而不是一个实体。如果我们想取消预订或更改它,我们将不得不发布新的事件作为新的记录插入,而不是修改现有记录行。我们不允许修改或删除已插入的记录行。


CREATE DATABASE `sauna`;

CREATE TABLE `sauna`.`booking` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`booking_id` BIGINT NOT NULL,
`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,
`tenant` INT NULL,
`sauna` INT NULL,
`booked_from` DATE NULL,
`booked_to` DATE NULL,
PRIMARY KEY (`id`)
);

“id”列是一个不断增加的整数,每次都会自动分配一个新的事件被发布到日志中。“booking_id”指的是预订。如果两个事件共享相同的预订标识,他们将指向同一个实体。我们也有一个枚举类型称为“event_type”描述我们想做的哪种操作。之后列是有关预订的信息。如果一列是null,我们会认为是与以前的值相比没有修改过。

第二步:使用Speedment产生代码
下一步是使用speedment项目生成代码。简单的创建一个新的Maven项目,将下面的代码添加到pom.xml-file。


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<speedment.version>3.0.0-EA2</speedment.version>
<mysql.version>5.1.39</mysql.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>com.speedment</groupId>
<artifactId>speedment-maven-plugin</artifactId>
<version>${speedment.version}</version>

<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>

<dependency>
<groupId>com.speedment</groupId>
<artifactId>runtime</artifactId>
<version>${speedment.version}</version>
<type>pom</type>
</dependency>
</dependencies>

如果你构建build这个项目,一个新的Maven目标称为speedment:tool应该出现在IDE中。运行它,加载speedment推出的用户界面。在那里,连接到Sauna数据库,并使用默认设置生成代码。该项目现在应该存在了自动生成的源文件。

第三步:创建物化视图Materialized View
物化视图是定期轮询数据库的一个组件,以查看是否已添加了新的记录行,如果有的话,下载并将它们以正确的顺序合并到视图中。由于轮询有时需要花很多时间,我们希望这个进程在一个单独的线程中运行。我们可以用一个java Timer和TimerTask实现。

轮询数据库?真的?嗯,要考虑的一个重要的事情是,它只是轮询数据库的服务器,而不是客户端。这给我们提供了很好的可扩展性,因为我们可以有几个服务器轮询数据库,反过来又为成千上万的租户提供服务。

见下:


BookingView.java


public final class BookingView {

...

public static BookingView create(BookingManager mgr) {
final AtomicBoolean working = new AtomicBoolean(false);
final AtomicLong last = new AtomicLong();
final AtomicLong total = new AtomicLong();

final String table = mgr.getTableIdentifier().getTableName();
final String field = Booking.ID.identifier().getColumnName();

final Timer timer = new Timer();
final BookingView view = new BookingView(timer);
final TimerTask task = ...;

timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);
return view;
}
}

定时器任务被匿名定义,轮询逻辑驻留其中:

final TimerTask task = new TimerTask() {
@Override
public void run() {
boolean first = true;

// Make sure no previous task is already inside this block.
if (working.compareAndSet(false, true)) {
try {

// Loop until no events was merged
// (the database is up to date).
while (true) {

// Get a list of up to 25 events that has not yet
// been merged into the materialized object view.
final List added = unmodifiableList(
mgr.stream()
.filter(Booking.ID.greaterThan(last.get()))
.sorted(Booking.ID.comparator())
.limit(MAX_BATCH_SIZE)
.collect(toList())
);

if (added.isEmpty()) {
if (!first) {
System.out.format(
"%s: View is up to date. A total of " +
"%d rows have been loaded.%n",
System.identityHashCode(last),
total.get()
);
}

break;
} else {
final Booking lastEntity =
added.get(added.size() - 1);

last.set(lastEntity.getId());
added.forEach(view::accept);
total.addAndGet(added.size());

System.out.format(
"%s: Downloaded %d row(s) from %s. " +
"Latest %s: %d.%n",
System.identityHashCode(last),
added.size(),
table,
field,
Long.parseLong(
"" + last.get())
);
}

first = false;
}

// Release this resource once we exit this block.
} finally {
working.set(false);
}
}
}
};

有时合并任务可以比比定时器的间隔执行会花更多的时间。为了避免这一问题,我们用一个atomicboolean检查并确保只有一个任务可以同时执行。这类似Semaphore。

构造函数和基本成员方法是相当容易实现的。我们将计时器传递给类作为构造函数中的一个参数,以便我们可以取消那个计时器,如果我们需要停止。我们还存储了一个map,保存的是内存中所有预订的当前视图。


private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY = 1_000; // Milliseconds

private final Timer timer;
private final Map<Long, Booking> bookings;

private BookingView(Timer timer) {
this.timer = requireNonNull(timer);
this.bookings = new ConcurrentHashMap<>();
}

public Stream<Booking> stream() {
return bookings.values().stream();
}

public void stop() {
timer.cancel();
}

最后一个的bookingview类方法accept()。这是用于新的事件被合并到视图中。


private boolean accept(Booking ev) {
final String type = ev.getEventType();

// If this was a creation event
switch (type) {
case
"CREATE" :
// Creation events must contain all information.
if (!ev.getSauna().isPresent()
|| !ev.getTenant().isPresent()
|| !ev.getBookedFrom().isPresent()
|| !ev.getBookedTo().isPresent()
|| !checkIfAllowed(ev)) {
return false;
}

// If something is already mapped to that key, refuse the
// event.
return bookings.putIfAbsent(ev.getBookingId(), ev) == null;

case
"UPDATE" :
// Create a copy of the current state
final Booking existing = bookings.get(ev.getBookingId());

// If the specified key did not exist, refuse the event.
if (existing != null) {
final Booking proposed = new BookingImpl();
proposed.setId(existing.getId());

// Update non-null values
proposed.setSauna(ev.getSauna().orElse(
unwrap(existing.getSauna())
));
proposed.setTenant(ev.getTenant().orElse(
unwrap(existing.getTenant())
));
proposed.setBookedFrom(ev.getBookedFrom().orElse(
unwrap(existing.getBookedFrom())
));
proposed.setBookedTo(ev.getBookedTo().orElse(
unwrap(existing.getBookedTo())
));

// Make sure these changes are allowed.
if (checkIfAllowed(proposed)) {
bookings.put(ev.getBookingId(), proposed);
return true;
}
}

return false;


case
"DELETE" :
// Remove the event if it exists, else refuse the event.
return bookings.remove(ev.getBookingId()) != null;

default :
System.out.format(
"Unexpected type '%s' was refused.%n", type);
return false;
}
}

在一个事件源系统中,当事件被接收到,但是没有被物化时,规则就没有被执行。因此,任何人都可以插入新的事件到系统中,只要他们插入表尾部。

第四步:使用方式
在这个例子中,我们将使用标准的speedment API插入三个新订单到数据库。然后,我们将等待查看更新和打印每一个预订。


public static void main(String... params) {
final SaunaApplication app = new SaunaApplicationBuilder()
.withPassword("password")
.build();

final BookingManager bookings =
app.getOrThrow(BookingManager.class);

final SecureRandom rand = new SecureRandom();
rand.setSeed(System.currentTimeMillis());

// Insert three new bookings into the system.
bookings.persist(
new BookingImpl()
.setBookingId(rand.nextLong())
.setEventType(
"CREATE")
.setSauna(1)
.setTenant(1)
.setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS)))
.setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS)))
);

bookings.persist(
new BookingImpl()
.setBookingId(rand.nextLong())
.setEventType(
"CREATE")
.setSauna(1)
.setTenant(2)
.setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS)))
.setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS)))
);

bookings.persist(
new BookingImpl()
.setBookingId(rand.nextLong())
.setEventType(
"CREATE")
.setSauna(1)
.setTenant(3)
.setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS)))
.setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS)))
);

final BookingView view = BookingView.create(bookings);

// Wait until the view is up-to-date.
try { Thread.sleep(5_000); }
catch (final InterruptedException ex) {
throw new RuntimeException(ex);
}

System.out.println(
"Current Bookings for Sauna 1:");
final SimpleDateFormat dt = new SimpleDateFormat(
"yyyy-MM-dd");
final Date now = Date.valueOf(LocalDate.now());
view.stream()
.filter(Booking.SAUNA.equal(1))
.filter(Booking.BOOKED_TO.greaterOrEqual(now))
.sorted(Booking.BOOKED_FROM.comparator())
.map(b -> String.format(
"Booked from %s to %s by Tenant %d.",
dt.format(b.getBookedFrom().get()),
dt.format(b.getBookedTo().get()),
b.getTenant().getAsInt()
))
.forEachOrdered(System.out::println);

System.out.println(
"No more bookings!");
view.stop();
}

运行得到如下结果:
677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

完整代码见:Github

在这篇文章中,我们已经开发了一个物化视图系统而不是普通数据库表系统,物化视图会计算事件,但是不是在事件插入时进行计算,这使得它可以扩展到应用程序的多个实例,而不必担心它们之间同步,因为它们最终会是一致的。然后展示了使用speedment API如何通过物化视图完成查询,生成一个当前预订查询列表。

Event-Sourcing and CQRS in Practise | Java Code Ge