用数据结构解释事件溯源 – {4Comprehension}


在本系列中,我们将通过实现假设数据结构的PoC(基于事件的列表),重新审视事件源的概念,然后在后续文章中通过使其并发且对内存友好的方式进一步改进事件源的概念。

事件溯源
多年来,我们已经习惯了这样一个事实,即大多数业务应用程序将状态保存在某些外部存储中,这在确保可审计性或重建过去的状态时通常会产生额外的工作。但是,如果我们放弃了需要存储状态的假设前提,该怎么办?
事件溯源是一个概念,在该概念中,我们存储状态更改事件而不是状态,并仅在需要时导出实际状态。在可审核性和获取新信息方面,这提供了很多可能性!
例如,想象一下检索用户的帐户余额。使用事件源,您不仅可以访问原始操作数,还可以访问整个操作历史记录。您可以跟踪所有过去的事件,过去的状态,这些事件导致导致表示当前状态的特定值。奇妙!
知道这一点后,让我们看看是否可以将此想法应用到通用列表中。

将事件源应用于列表
让我们开始为实现定义:

public class ESList<T> implements List<T> { ... }

就像上面建立的一样,为了实现事件源数据结构,我们需要存储状态改变事件/操作而不是状态本身,然后针对当场重新创建的状态运行方法。
为了实现这一目标,我们需要:

  • 为我们的事件定义合同
  • 为我们的历史事件定义一个存储容器
  • 实现事件处理逻辑
  • 实现事件重播逻辑

内部事件日志是我们实施的核心。这是应用于数据结构的所有修改的历史,可以表示为简单列表:
private final List<ListOp<T>> opLog = new ArrayList<>();

一个操作实际上只是一个函数,它接受一些列表并返回该操作的结果:

interface ListOp<R> {
    Object apply(List<R> list);
}

因此,代表加法的操作可以实现为:

class AddOp<T> implements ListOp<T> {
    private final T elem;

    AddOp(T elem) {
        this.elem = elem;
    }

    @Override
    public Object apply(List<T> list) {
        return list.add(elem);
    }

    @Override
    public String toString() {
        return String.format("add(element = %s)", elem);
    }
}

现在,无论何时实现任何状态更改方法,我们都只需要创建该操作的表示形式,将其存储并应用于重新创建的状态,以便我们可以返回该操作的结果:

@Override 
public boolean add(T t){ 
    returnboolean)handle(new AddOp <>(t)); 
}

现在,如果我们想在任何时间点重新创建列表的状态,则需要从时间开始重新运行所有操作。
最好返回包装在Optional实例中的结果:

public Optional<List<T>> snapshot(int version) {
    if (version > opLog.size()) {
        return Optional.empty();
    }

    var snapshot = new ArrayList<T>();
    for (int i = 0; i <= version; i++) {
        try {
            opLog.get(i).apply(snapshot);
        } catch (Exception ignored) { }
    }
    return Optional.of(snapshot);
}

public List<T> snapshot() {
    return snapshot(opLog.size())
        .orElseThrow(IllegalStateException::new);
}

空的catch块可能看起来有争议,但是对于跟踪故障至关重要。稍后我们将添加一些日志记录。
现在我们可以完成事件处理逻辑。注意,在将操作添加到日志之前,我们需要首先重新创建当前状态:

private Object handle(ListOp<T> op) {
    List<T> snapshot = snapshot();
    opLog.add(op);
    return op.apply(snapshot);
}

现在,所有List接口的查询方法都需要首先重新创建状态:

@Override
public int indexOf(Object o) {
    return snapshot().indexOf(o);
}

@Override
public int lastIndexOf(Object o) {
    return snapshot().lastIndexOf(o);
}

@Override
public ListIterator<T> listIterator() {
    return snapshot().listIterator();
}

// ...

拥有一种通知我们数据结构存在多少版本的方法也将很方便:

public int version() {
    return opLog.size();
}

另外,为了方便我们观察更改,让我们添加一种额外的方法来显示所有操作的历史记录:

public void displayLog() {
    for (int i = 0; i < opLog.size(); i++) {
        System.out.printf("v%d :: %s%n", i, opLog.get(i).toString());
    }
}

现在,让我们通过执行一些修改并最终清除列表来了解它的作用:

public static void main(String[] args) {
    ESList<Integer> objects = ESList.newInstance();

    objects.add(1);
    objects.add(2);
    objects.add(3);
    objects.addAll(List.of(4, 5));
    objects.remove(Integer.valueOf(1));
    objects.clear();
    objects.displayLog();

    System.out.println();

    for (int i = 0; i < objects.version(); i++) {
        System.out.println("v" + i + " :" + objects.snapshot(i).get());
    }
}

尽管通过清除列表回到了第一位,我们可以看到整个历史记录得以保留,并且我们设法重新创建了数据结构的所有现有版本:

v0 :: init[]
v1 :: add(element = 1)
v2 :: add(element = 2)
v3 :: add(element = 3)
v4 :: addAll([4, 5])
v5 :: remove(1)
v6 :: clear()

v0 :[]
v1 :[1]
v2 :[1, 2]
v3 :[1, 2, 3]
v4 :[1, 2, 3, 4, 5]
v5 :[2, 3, 4, 5]
v6 :[]

我们到了!这是事件源的基本思想。
自然,此实现有多个缺点,而且肯定还没有准备好投入生产。它不仅不是线程安全的,而且内部事件日志是内存泄漏的根源。
甚至没有提到这样一个事实,即在每次操作之前重播事件效率低下,但可以很好地为我们提供教育材料。

可以在GitHub上找到代码段