Event-Sourcing和CQRS实战案例
本文介绍使用基于Speedment快速开发一个可扩展的Event-Sourced数据库应用,完整代码见这里
什么是事件溯源?
在一个典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。当状态更改时,应用程序使用更新或删除语句修改该行。这种方法的问题是,它增加了对数据库技术的很多要求,要确保没有记录行被改变到一个非法的状态。
在一个事件来源event-sourced系统中,我们采取了一种不同的方法。不是存储实体状态在数据库中,存储的是一系列改变状态的操作,即事件,事件是状态改变的原因。一个事件一旦被创建是不可变的,这意味着你只需要实现两个操作,CREAT创建和读取READ。如果一个实体被更新或删除,那是创建一个“UPDATE更新”或“REMOVE删除”事件来实现。
一个事件源系统可以很容易地扩展提高性能,因为任何一个节点都可以简单地下载事件日志和重播当前状态。你也得到更好的性能,由于这样的事实,写作和查询是由不同的机器完成,这被称为CQRS(命令查询职责分离)。
案例:预订桑拿房
为了展示建立事件源系统的工作流程,我们将创建一个小应用程序来处理一个共享桑拿房的预订。我们有多个租户感兴趣预订桑拿浴室,但我们需要保证,害羞的住户从来不会意外地预订两次;我们也希望在同一系统中支持多个桑拿。
用数据库来简化通信,我们将使用speedment工具包。speedment是java的工具,使我们能够从数据库中生成一个完整的领域模型,并可以很容易地使用java 8流实现数据库查询优化。
第一步:定义数据库Schema
第一步是要确定我们的数据库(MySQL)。我们只有一个表称为“booking”,存储预订桑拿相关的事件。请注意,预订是一个事件,而不是一个实体。如果我们想取消预订或更改它,我们将不得不发布新的事件作为新的记录插入,而不是修改现有记录行。我们不允许修改或删除已插入的记录行。
CREATE DATABASE `sauna`; CREATE TABLE `sauna`.`booking` ( `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, `booking_id` BIGINT NOT NULL, `event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL, `tenant` INT NULL, `sauna` INT NULL, `booked_from` DATE NULL, `booked_to` DATE NULL, PRIMARY KEY (`id`) ); <p class="indent"> |
“id”列是一个不断增加的整数,每次都会自动分配一个新的事件被发布到日志中。“booking_id”指的是预订。如果两个事件共享相同的预订标识,他们将指向同一个实体。我们也有一个枚举类型称为“event_type”描述我们想做的哪种操作。之后列是有关预订的信息。如果一列是null,我们会认为是与以前的值相比没有修改过。
第二步:使用Speedment产生代码
下一步是使用speedment项目生成代码。简单的创建一个新的Maven项目,将下面的代码添加到pom.xml-file。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <speedment.version>3.0.0-EA2</speedment.version> <mysql.version>5.1.39</mysql.version> </properties> <build> <plugins> <plugin> <groupId>com.speedment</groupId> <artifactId>speedment-maven-plugin</artifactId> <version>${speedment.version}</version> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> </dependencies> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.speedment</groupId> <artifactId>runtime</artifactId> <version>${speedment.version}</version> <type>pom</type> </dependency> </dependencies> <p class="indent"> |
如果你构建build这个项目,一个新的Maven目标称为speedment:tool应该出现在IDE中。运行它,加载speedment推出的用户界面。在那里,连接到Sauna数据库,并使用默认设置生成代码。该项目现在应该存在了自动生成的源文件。
第三步:创建物化视图Materialized View
物化视图是定期轮询数据库的一个组件,以查看是否已添加了新的记录行,如果有的话,下载并将它们以正确的顺序合并到视图中。由于轮询有时需要花很多时间,我们希望这个进程在一个单独的线程中运行。我们可以用一个java Timer和TimerTask实现。
轮询数据库?真的?嗯,要考虑的一个重要的事情是,它只是轮询数据库的服务器,而不是客户端。这给我们提供了很好的可扩展性,因为我们可以有几个服务器轮询数据库,反过来又为成千上万的租户提供服务。
见下:
BookingView.java
public final class BookingView { ... public static BookingView create(BookingManager mgr) { final AtomicBoolean working = new AtomicBoolean(false); final AtomicLong last = new AtomicLong(); final AtomicLong total = new AtomicLong(); final String table = mgr.getTableIdentifier().getTableName(); final String field = Booking.ID.identifier().getColumnName(); final Timer timer = new Timer(); final BookingView view = new BookingView(timer); final TimerTask task = ...; timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY); return view; } } <p class="indent"> |
定时器任务被匿名定义,轮询逻辑驻留其中:
final TimerTask task = new TimerTask() { @Override public void run() { boolean first = true; // Make sure no previous task is already inside this block. if (working.compareAndSet(false, true)) { try { // Loop until no events was merged // (the database is up to date). while (true) { // Get a list of up to 25 events that has not yet // been merged into the materialized object view. final List added = unmodifiableList( mgr.stream() .filter(Booking.ID.greaterThan(last.get())) .sorted(Booking.ID.comparator()) .limit(MAX_BATCH_SIZE) .collect(toList()) ); if (added.isEmpty()) { if (!first) { System.out.format( "%s: View is up to date. A total of " + "%d rows have been loaded.%n", System.identityHashCode(last), total.get() ); } break; } else { final Booking lastEntity = added.get(added.size() - 1); last.set(lastEntity.getId()); added.forEach(view::accept); total.addAndGet(added.size()); System.out.format( "%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last), added.size(), table, field, Long.parseLong("" + last.get()) ); } first = false; } // Release this resource once we exit this block. } finally { working.set(false); } } } }; <p class="indent"> |
有时合并任务可以比比定时器的间隔执行会花更多的时间。为了避免这一问题,我们用一个atomicboolean检查并确保只有一个任务可以同时执行。这类似Semaphore。
构造函数和基本成员方法是相当容易实现的。我们将计时器传递给类作为构造函数中的一个参数,以便我们可以取消那个计时器,如果我们需要停止。我们还存储了一个map,保存的是内存中所有预订的当前视图。
private final static int MAX_BATCH_SIZE = 25; private final static int UPDATE_EVERY = 1_000; // Milliseconds private final Timer timer; private final Map<Long, Booking> bookings; private BookingView(Timer timer) { this.timer = requireNonNull(timer); this.bookings = new ConcurrentHashMap<>(); } public Stream<Booking> stream() { return bookings.values().stream(); } public void stop() { timer.cancel(); } <p class="indent"> |
最后一个的bookingview类方法accept()。这是用于新的事件被合并到视图中。
private boolean accept(Booking ev) { final String type = ev.getEventType(); // If this was a creation event switch (type) { case "CREATE" : // Creation events must contain all information. if (!ev.getSauna().isPresent() || !ev.getTenant().isPresent() || !ev.getBookedFrom().isPresent() || !ev.getBookedTo().isPresent() || !checkIfAllowed(ev)) { return false; } // If something is already mapped to that key, refuse the // event. return bookings.putIfAbsent(ev.getBookingId(), ev) == null; case "UPDATE" : // Create a copy of the current state final Booking existing = bookings.get(ev.getBookingId()); // If the specified key did not exist, refuse the event. if (existing != null) { final Booking proposed = new BookingImpl(); proposed.setId(existing.getId()); // Update non-null values proposed.setSauna(ev.getSauna().orElse( unwrap(existing.getSauna()) )); proposed.setTenant(ev.getTenant().orElse( unwrap(existing.getTenant()) )); proposed.setBookedFrom(ev.getBookedFrom().orElse( unwrap(existing.getBookedFrom()) )); proposed.setBookedTo(ev.getBookedTo().orElse( unwrap(existing.getBookedTo()) )); // Make sure these changes are allowed. if (checkIfAllowed(proposed)) { bookings.put(ev.getBookingId(), proposed); return true; } } return false; case "DELETE" : // Remove the event if it exists, else refuse the event. return bookings.remove(ev.getBookingId()) != null; default : System.out.format( "Unexpected type '%s' was refused.%n", type); return false; } } <p class="indent"> |
在一个事件源系统中,当事件被接收到,但是没有被物化时,规则就没有被执行。因此,任何人都可以插入新的事件到系统中,只要他们插入表尾部。
在这个例子中,我们将使用标准的speedment API插入三个新订单到数据库。然后,我们将等待查看更新和打印每一个预订。
public static void main(String... params) { final SaunaApplication app = new SaunaApplicationBuilder() .withPassword("password") .build(); final BookingManager bookings = app.getOrThrow(BookingManager.class); final SecureRandom rand = new SecureRandom(); rand.setSeed(System.currentTimeMillis()); // Insert three new bookings into the system. bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(1) .setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))) ); bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(2) .setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))) ); bookings.persist( new BookingImpl() .setBookingId(rand.nextLong()) .setEventType("CREATE") .setSauna(1) .setTenant(3) .setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))) .setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))) ); final BookingView view = BookingView.create(bookings); // Wait until the view is up-to-date. try { Thread.sleep(5_000); } catch (final InterruptedException ex) { throw new RuntimeException(ex); } System.out.println("Current Bookings for Sauna 1:"); final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd"); final Date now = Date.valueOf(LocalDate.now()); view.stream() .filter(Booking.SAUNA.equal(1)) .filter(Booking.BOOKED_TO.greaterOrEqual(now)) .sorted(Booking.BOOKED_FROM.comparator()) .map(b -> String.format( "Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()), dt.format(b.getBookedTo().get()), b.getTenant().getAsInt() )) .forEachOrdered(System.out::println); System.out.println("No more bookings!"); view.stop(); } <p class="indent"> |
运行得到如下结果:
677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!
完整代码见:Github
在这篇文章中,我们已经开发了一个物化视图系统而不是普通数据库表系统,物化视图会计算事件,但是不是在事件插入时进行计算,这使得它可以扩展到应用程序的多个实例,而不必担心它们之间同步,因为它们最终会是一致的。然后展示了使用speedment API如何通过物化视图完成查询,生成一个当前预订查询列表。
Event-Sourcing and CQRS in Practise | Java Code Ge