用Java的Rama构建Mastodon降低100倍成本


Rama 完全使用 Java API 进行编程,与使用 Ruby on Rails 构建的官方 Mastodon 实现相比,我们的实现大大减少了代码。
Rama 负责处理所有数据处理、数据索引和大部分产品逻辑。最重要的是我们使用 Spring/Reactor 实现Mastodon API

Twitter 规模的 这种Mastodon 实现仅用 10k 行代码就具有极强的性能数据,这比 Mastodon 当前的后端实现少了代码,比 Twitter 的可扩展实现少了 100 倍。

Rama 的编程模型有四个主要概念:

  • Depot :它们是分布式的、持久的、可复制的数据日志。所有进入 Rama 的数据都通过追加进入Depot 。Depot 类似于Apache Kafka,只是与 Rama 的其余部分集成。
  • ETL:即提取-转换-加载拓扑。这些处理来自仓库的传入数据,并生成称为“分区状态”的索引存储。Rama 提供两种类型的 ETL:流式处理和微批处理,它们具有不同的性能特征。Rama 编程的大部分时间都花在了 ETL 上。Rama 公开了一个 Java 数据流 API,用于编码极具表现力的拓扑。
  • PStates:接下来是“分区状态”,我们通常称之为“PStates”。PState 是 Rama 中数据索引的方式,就像仓库一样,它们跨多个节点进行分区、持久且可复制。PStates 是 Rama 成为通用系统的关键之一。与现有数据库具有严格的索引模型(例如“键值”、“关系”、“面向列”、“文档”、“图”等)不同,PStates 具有灵活的索引模型。事实上,他们有一个每个程序员都熟悉的索引模型:数据结构。PState 是数据结构的任意组合,您创建的每个 PState 都可以有不同的组合。借助 PStates 的“子索引”功能,嵌套数据结构可以有效地包含数亿个元素。例如,“地图的地图”相当于“文档数据库”,“子索引排序地图的地图”相当于“面向列的数据库”。数据结构和任意数量的嵌套的任何组合都是有效的 - 例如,您可以拥有“子索引集列表的子索引映射列表的映射”。我无法充分强调与索引作为常规数据结构而不是神奇的“数据模型”的交互在多大程度上解放了后端编程。
  • 查询:Rama 中的查询利用 PStates 的数据结构方向和“基于路径”的 API,允许您从单个分区简洁地获取和聚合数据。除此之外,Rama 还有一个称为“查询拓扑”的功能,可以有效地对任意 PState 集合进行实时分布式查询和聚合。这些类似于传统数据库中的“预定义查询”,只不过是通过与 ETL 编程相同的 Java API 进行编程,而且功能更强大。

单独来看,这些概念都不是什么新概念。我相信你以前都见过它们。您可能会认为 Rama 的编程模型只是事件溯源 和物化视图的组合。但 Rama 所做的是集成和概括这些概念,使您可以端到端地构建整个后端,而不会出现任何阻抗不匹配或复杂性,而这些是现有系统的特征和压倒性的。

所有这些概念都是由 Rama 以线性可扩展的方式实现的。因此,如果您的应用程序需要更多资源,您可以通过单击按钮添加它们。Rama 还通过复制所有数据并实现自动故障转移来实现容错。

软件架构
与当今应用程序的典型架构方式相比,您看到的是完全的控制反转:
有一系列规则规定了哪些状态属于哪些关注者 :提升永远不会返回到原始作者,回复只会发送给也关注被回复帐户的关注者,等等。
传统上,这是通过处理存储的“数据库层”和实现产品逻辑的单独的“应用程序层”来完成的。“应用层”对“数据库层”进行读写,这两层是分开部署、扩展和管理的。但对于 Rama,产品逻辑存在于执行索引的系统内部。计算和存储是共置的。Rama 可以完成数据库所做的一切,但它还可以做更多的事情。

使用 Rama 构建后端时,您需要从需要支持的所有用例开始。例如:获取用户的关注者数量、获取时间线的一页、获取十个关注建议等。然后,您确定需要哪些 PState 布局(数据结构)来支持这些查询。一个 PState 可以支持 10 个查询,而另一个 PState 可能只支持一个查询。

接下来,您确定源数据是什么,然后创建仓库来接收该数据。源数据通常对应于应用程序中发生的事件,例如“Alice 关注 Bob”、“James 发布状态“Hello world””或“Bob 取消关注 Charlie”。您可以按照自己想要的方式表示数据,无论是 Java 对象、ThriftProtocol Buffers等框架,还是 JSON 等非结构化格式(但是,我们建议尽可能使用结构化格式)。

最后一步是编写 ETL 拓扑,将源数据从仓库转换为 PState。部署后,ETL 会持续运行,使您的 PState 保持最新状态。Rama 的 ETL API 虽然只是 Java,但就像一种“分布式编程语言”,具有任何图灵完备语言的计算能力以及轻松控制在任何给定点发生哪个分区计算的设施。

集群和模块
Rama 部署到节点集群上。有一个称为“Conductor”的中央守护进程,用于协调部署、更新和扩展操作。每个节点都有一个“Supervisor”守护进程,用于管理用户代码的启动/拆卸。

应用程序作为“模块”部署到 Rama 集群上。“模块”包含仓库、ETL、PState 和查询拓扑的任意组合。与传统架构不同,传统架构中相应的类似物存在于单独的进程中并且通常位于单独的节点上,而在 Rama 中,这些类似物位于同一组进程中。这种托管实现了前所未有的惊人效率。模块还可以像使用自己的模块一样轻松地使用来自其他模块中的仓库和 PState 的数据。模块永远运行,不断处理来自仓库的新数据,除非您选择销毁它。

通过为 Conductor 提供一个包含用户代码的 .jar 文件来部署模块。此外,还提供了分配给模块的节点数量、复制参数以及任何其他调整参数的配置。模块的更新方式类似:随新代码一起提供新的 .jar,Conductor 编排更新序列,启动新进程并将仓库和 PState 传输到新模块版本。

为了测试和开发,Rama 提供了一个类,用于 InProcessCluster 在单个进程中模拟 Rama 集群。

Rama 之上实现 Mastodon 
让我们从实现的一个非常简单的部分开始,跟踪主题标签的关注者。该实现总共有 11 行代码,并支持以下查询:

  • 用户 A 关注标签 H 吗?
  • 谁关注主题标签 H(已分页)?
  • 主题标签 H 有多少关注者?

为此只需要一个 PState,称为 $$hashtagToFollowers (Rama 中的 PState 名称始终以 开头 $$ )。它是从主题标签到一组帐户 ID 的映射。
有两个事件会更改此 PState:关注主题标签和取消关注主题标签。在我们的实现中,这些由类型 FollowHashtag 和 RemoveFollowHashtag 实现。

可视化如何处理数据以生成此 PState 的一种好方法是通过数据流图,这里的逻辑很简单,这就是为什么实现只有 11 行代码。您无需担心诸如设置数据库、建立数据库连接、处理每个数据库读/写的序列化/反序列化、编写部署只是为了处理这一任务或构建时堆积的任何其他任务之类的事情后端系统。

您可能想知道为什么关注和取消关注事件会进入同一个Depot 而不是单独的Depot ?
数据按照接收顺序在软件仓库Depot 分区中进行处理,但不同软件仓库Depot 之间没有顺序保证。
因此,如果用户向 UI 上的“关注”和“取消关注”按钮发送垃圾邮件,并且这些事件被附加到不同的Depot ,则可能会在之前的关注之前处理稍后的取消关注。
这将导致 PState 根据用户执行操作的顺序处于不正确的状态。通过将两个事件放在同一个仓库中,数据将按照创建数据的顺序进行处理。

作为一般规则,拉玛保证本地顺序。两点之间发送的数据按照发送顺序进行处理。这对于处理库外的数据是正确的,对于内部 ETL 处理也是如此,当您的处理作为计算的一部分跳转到不同的分区时。

数据流图实际上就是通过在纯 Java API 中指定数据流图来使用 Rama 进行编程的方式。正如您将在下面看到的,指定此类计算的细节涉及变量、函数、过滤器、循环、分支和合并。它还包括对在任何给定点执行哪些分区计算的细粒度控制。

社交图谱
现在让我们看一下实现中稍微复杂的部分,即社交图谱。社交图共有 105 行代码,并支持以下查询,这些查询为 Mastodon 的各个方面提供支持:

  • 用户A关注用户B吗?
  • 用户 A 有多少关注者?
  • 用户 A 关注了多少个帐户?
  • 按关注顺序(分页),用户 A 的关注者是谁?
  • 用户 A 按照关注顺序关注了谁(分页)?
  • 当前谁请求按照用户 A 请求的顺序(分页)关注他们?
  • 用户A是否屏蔽了用户B?
  • 用户A是否将用户B静音?
  • 用户 A 是否希望看到用户 B 的提升?

尽管 Rama 的实现非常简单,但值得注意的是 Twitter 必须从头开始编写自定义数据库来构建可扩展的社交图谱。
我们的社交图实现产生了四个主要的 PState,名为 $$followerToFollowees 、 $$followeeToFollowers 、 $$accountIdToFollowRequests 和 $$accountIdToSuppressions 。

详细点击标题

简单的 Rama 代码示例
以下是“字数统计模块”的完整定义,它接受句子作为输入并生成一个包含这些句子中所有单词计数的 PState:

public class WordCountModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*sentenceDepot", Depot.random());

        StreamTopology wordCount = topologies.stream(
"wordCount");
        wordCount.pstate(
"$$wordCounts", PState.mapSchema(String.class, Long.class));

        wordCount.source(
"*sentenceDepot").out("*sentence")
                 .each((String sentence, OutputCollector collector) -> {
                     for(String word: sentence.split(
" ")) {
                       collector.emit(word);
                     }
                 },
"*sentence").out("*word")
                 .hashPartition(
"*word")
                 .compoundAgg(
"$$wordCounts", CompoundAgg.map("*word", Agg.count()));
    }
}

该模块有一个名为 wordCount 的 ETL、一个名为 *sentenceDepot 的仓库和一个名为 $$wordCounts 的 PState。ETL 从句子库接收新句子,将这些句子标记为单词,然后更新 PState 中这些单词的计数。将句子添加到仓库后,PState 分区会在几毫秒内更新。

模块实现了 RamaModule 接口,该接口只定义了一个方法。setup 用于声明仓库以及与其他模块中的仓库或 PState 的任何依赖关系,topologies 用于声明所有 ETL 和查询拓扑。

define 的第一行声明仓库。仓库名称总是以 * 开头。以 * 开头的字符串在 Rama 代码中被解释为变量,它们可以像任何编程语言中的变量一样被传递和使用。第二个参数 Depot.random() 指定了仓库的分区方案。在这种情况下,分区方案会使添加的句子进入仓库的随机分区。当本地排序很重要时,比如对于关注和取消关注事件,分区方案将被适当设置,以便同一实体的事件进入同一分区。

下一行将 ETL wordCount 声明为流式拓扑。

之后是 PState $$wordCounts 的声明。声明 PState 时会使用一个模式,指定其存储的内容和存储方式。在本例中,它只是一个简单的映射,但您可以在此指定任何您想要的结构(例如,子索引集列表的子索引映射)。

最后是 ETL 的定义。wordCount.source("*sentenceDepot").out("*sentence")一行将 ETL 订阅到 *sentenceDepot,并将收到的任何新句子绑定到变量 *sentence 上。

下一行将每个句子标记为单词。插入的 Java 代码使用 lambda 在空白处分割每个句子,并将每个单词单独作为变量 *word 发送。像这样在拓扑结构中插入任意 Java 代码是非常常见的。

下一行 .hashPartition("*word") 将数据流重新定位到存储该单词计数的模块分区。该行之前和之后的代码可以在不同的机器上执行,Rama 会处理移动计算所涉及的所有序列化和网络传输工作。

最后,既然计算是在正确的分区上进行的,最后一行就会更新 PState 中字的计数。这种 PState 更新是以聚合模板的形式指定的--在本例中,它表示正在聚合一个映射,其中键是单词,值是该单词的所有事件计数。

这是一个非常基本的示例,无法真正体现 Rama 的表现力。不过,它确实展示了声明模块、仓库、PStates 和拓扑的一般工作流程。此处未显示的部分功能包括:从其他模块消耗资源库/PStates、查询拓扑、微批处理、分支/合并、连接、循环、变量阴影、条件以及使用宏分解代码。

现在让我们看一下作为集群外的客户端与 Rama 模块进行交互,类似于使用数据库客户端与数据库交互的方式。下面的代码连接到远程集群,创建模块的库和 PState 的句柄,附加一些句子,然后执行一些 PState 查询:

Map config = new HashMap();
config.put("conductor.host", "1.2.3.4");
RamaClusterManager manager = RamaClusterManager.open(config);
Depot depot = manager.clusterDepot(
"rama.examples.wordcount.WordCountModule", "*sentenceDepot");
depot.append(
"hello world");
depot.append(
"hello world again");
depot.append(
"say hello to the planet");
depot.append(
"red planet labs");

PState wc = manager.clusterPState(
"rama.examples.wordcount.WordCountModule", "$$wordCounts");
System.out.println(
"'hello' count: " + wc.selectOne(Path.key("hello")));
System.out.println(
"'world' count: " + wc.selectOne(Path.key("world")));
System.out.println(
"'planet' count: " + wc.selectOne(Path.key("planet")));
System.out.println(
"'red' count: " + wc.selectOne(Path.key("red")));

RamaClusterManager 用于连接到集群并检索仓库和 PStates 的句柄。仓库和 PStates 由其模块名称(模块定义的类名)和模块内的名称标识。默认情况下,仓库追加会阻塞,直到所有同地的流拓扑都处理完追加的数据。这就是为什么 PState 查询可以在仓库追加后立即执行,而无需进一步协调的原因。

这里的 PState 查询会获取指定键的值。PStates 查询使用的是 Rama 的 "路径 "应用程序接口(Path API)。无论 PState 的结构如何,路径都能让你轻松进入 PState,并准确检索到所需内容--无论是一个值、多个值还是值的集合。它们还可用于更新拓扑结构中的 PState。掌握路径是掌握 Rama 开发的关键之一。

现在让我们看看如何在单元测试环境中运行 WordCountModule:

public void wordCountTest() throws Exception {
    try (InProcessCluster cluster = InProcessCluster.create()) {
        cluster.launchModule(new WordCountModule(), new LaunchConfig(4, 2));
        String moduleName = WordCountModule.class.getName();
        Depot depot = cluster.clusterDepot(moduleName, "*sentenceDepot");
        depot.append(
"hello world");
        depot.append(
"hello world again");
        depot.append(
"say hello to the planet");
        depot.append(
"red planet labs");

        PState wc = cluster.clusterPState(moduleName,
"$$wordCounts");
        System.out.println(
"'hello' count: " + wc.selectOne(Path.key("hello")));
        System.out.println(
"'world' count: " + wc.selectOne(Path.key("world")));
        System.out.println(
"'planet' count: " + wc.selectOne(Path.key("planet")));
        System.out.println(
"'red' count: " + wc.selectOne(Path.key("red")));
    }
}
输出:

'hello' count: 3
'world' count: 2
'planet' count: 2
'red' count: 1


InProcessCluster 可完全在进程中模拟 Rama 集群,是单元测试模块的理想选择。在这里,你可以看到 InProcessCluster 是如何用来启动模块,然后像使用 RamaClusterManager 一样获取仓库/PStates 的。InProcessCluster 与真正的集群在功能上没有区别,下周我们发布 Rama 的非生产版本时,你就可以试用 InProcessCluster 了。

详细点击标题