以正好一次(Exactly-Once)方式传递数十亿消息

                   
banq 17-06-30

所有数据流水线的唯一要求是它们不能丢失数据。可以延迟或重新排序,但不能丢失。

为了满足这一要求,大多数分布式系统实现至少保证一次(least-once)传递。实现至少一次传递的技术通常等于:“重试,重试,重试”。直到在收到消费者的确认之前,否则永远不会认为发送消息已经成功。

但作为一个用户,“至少一次传递”并不是真的想要的。我希望消息只被传送一次。而且只有一次(精确一次)。

不幸的是,实现任何接近于一次交付的设计都需要考虑防弹防故障设计。每个故障情况都必须作为架构的一部分认真考虑 - 但是只有在其发生成为事实之后,才能被“锁定”进而才能采取对策,融入现有的实现。即使这样,只有一次传递消息还是不可能的。

在过去三个月中,我们构建了一个全新的重复数据删除系统,以便在面对各种故障模式时尽可能接近交付。

新系统能够跟踪旧系统的消息数量,增加可靠性,只需一小部分成本。就是这样。


问题

Segment系统的大部分内部系统都是通过重试、消息重新发送、锁定和两阶段提交来处理通常故障。但是,有一个特别的例外:将数据直接发送到我们公共API的客户端。

客户端(特别是移动客户端)有频繁的网络问题,可能在发送数据时错过了我们API的响应。

想象一下,您乘坐公共汽车,使用HotelTonight从iPhone预订房间。该应用程序开始将使用数据上传到Segment的服务器,但您突然通过隧道并失去无线信号与连接。您发送的某些事件却是已被处理,但客户端从未收到服务器响应。

在这些情况下,即使服务器在技术上已经收到了这些确切的消息,客户端也会重试并将相同的事件重新发送到Segment系统的API。

从我们的服务器指标来看,每四周大约有0.6%事件是重复的。

这个错误率可能听起来不重要。但是,对于一个产生数十亿美元收入的电子商务应用程序,0.6%的差异可能意味着利润和数百万美元的损失。

删除重复消息
所以我们了解问题的症结 - 我们必须删除发送到API的重复消息。但是会怎么样呢?

通过高级API对任何重复数据的删除都很简单。在Python(也称伪伪代码)中,我们可以将其表示为:


def dedupe(stream):
for message in stream:
if has_seen(message.id):
discard(message)
else:
publish_and_commit(message)


对于我们流中的每个消息,我们首先检查我们是否看到了由其id(我们假设是唯一的)唯一指定的消息。如果是,表示重复了就丢弃它。如果是新的,我们会重新发布消息并以原子方式提交消息。(这样做引入一个新问题,需要存储过去的消息)

为了避免存储所有的消息,我们会将“删除重复数据窗口”定义为key的存储时间,过了这个窗口时间就自动失效这个键值。消息在掉在这个窗口期外面,我们也将它们老化失效。我们要保证在窗口期内只发送一个指定ID的唯一的单个消息。

这种解决问题的方式虽然很容易用语言描述,但实际中有两个方面需要特别注意:读/写性能和正确性。

我们希望我们的系统当面对数十亿次消息时消除够重复事件,并以低延迟和低成本的方式来实现。

更重要的是,我们希望确保我们看到的哪些事件能够被持久写入,以便我们可以从崩溃中恢复,并且也不会在这种恢复输出中产生重复的消息。

架构
为了实现这一点,我们创建了一个“两段two-phase”架构,它读取卡夫卡,并且在四周的窗口时间内将所有重复事件删除。




要了解该架构是如何工作的,我们可以首先看看Kafka流拓扑。所有传入的API调用都将分离为单独的消息,并读取Kafka输入主题。

首先,每个传入的消息都被一个由客户端生成的唯一ID标记。在大多数情况下,这是一个UUIDv4(尽管我们考虑切换到ksuids)。如果客户端不提供messageId,我们将在API层自动分配一个。messageId

我们不使用向量时钟或序列号,因为我们想减少客户端的复杂性。使用UUID可以让任何人轻松地将数据发送到我们的API,因为几乎所有的主要语言都支持它。


{
"messageId": "ajs-65707fcf61352427e8f1666f0e7f6090",
"anonymousId": "e7bd0e18-57e9-4ef4-928a-4ccc0b189d18",
"timestamp": "2017-06-26T14:38:23.264Z",
"type": "page"
}


单个消息被卡夫卡的耐久性和重播能力记录下来。它们由MESSAGEID分隔,这样确保同一messageId总是由相同的消费者进行处理。

这对于我们的数据处理来说是一件很重要的事情。我们可以通过路由到正确的分区来缩小搜索空间的数量级,而不是搜索中央数据库在数百亿条消息中寻找key主键。

重复数据删除“工作者”是一个Go程序,它读取Kafka输入分区。它负责阅读消息,检查它们是否是重复的,如果它们是新的,将它们发送到Kafka输出主题。

根据我们的经验,工作者程序和Kafka拓扑结构非常容易管理。我们不再需要一组有发生故障时进行副本切换的大型Memcached服务器系统了。相反,我们只使用零协调需要的嵌入式RocksDB数据库,并以非常低的成本获得持久存储。

RocksDB工作者

每个工作者Job在本地的EBS硬盘上存储一个本地的RocksDB数据库。RocksDB是由Facebook开发的嵌入式键值数据存储,经过优化,性能非常高。

每当输入主题的事件进来时,消费者查询RocksDB来确定是否看到过该事件messageId。

如果RocksDB中不存在该消息,我们将其添加到RocksDB中,然后将消息发布到Kafka输出主题。

如果消息已存在于RocksDB中,则工作者Job根本就不会将其发布到输出主题,并更新输入分区的偏移,并确认已处理该消息。

性能
为了从数据库获得高性能,对于每个事件我们必须满足的三个查询模式:

1.检测随机进来的Key是否存在,但可能不会只在我们的数据库中,可以在我们的Key空间的任何地方找到。

2.以高写入吞吐量编写新key

3.失效旧key,这是指对于那些超过“删除重复数据窗口”时间的消息。

实际上,我们必须不断地扫描整个数据库,追加新的key,并失效老化旧key。理想情况下,这些数据库存储key方式只会以同一数据模型存在。




一般来说,性能的大部分取决于数据库性能,所以值得了解RocksDB的内部性能。

RocksDB是一个日志结构合并树(LSM)数据库,这意味着它会不断地将新的key附加到磁盘上的预写日志write-ahead-log中,以及将排序的多个key作为memtable的一部分进行存储内存中。

写入key是非常快速的过程。新数据以追加append方式(为了直接持久性和故障恢复)直接记录到磁盘,并且数据条目在内存中排序,以提供快速搜索和批量写入的组合。

每当向memtable写入足够多条目时,它将作为SSTable(排序的字符串表)持续存储到磁盘。由于字符串已经在内存中排序,所以可以将它们直接flush到磁盘。

以下是我们的生产日志中flush的示例:


[JOB 40] Syncing log #655020
[default] [JOB 40] Flushing memtable with next log file: 655022
[default] [JOB 40] Level-0 flush table #655023: started
[default] [JOB 40] Level-0 flush table #655023: 15153564 bytes OK
[JOB 40] Try to delete WAL files size 12238598, prev total WAL file size 24346413, number of live WAL files 3.


每个SSTable是不可变的 - 一旦创建,它永远不会改变 - 这就是为什么写入新的键key会这么快原因。没有文件需要更新,没有写入扩展。相反,在out-of-band压缩阶段,同一级别的多个SST可以合并成一个新的文件。

当在同一级别的单个SSTables被压缩时,它们的key被合并在一起,然后将新的文件升级到下一个更高的级别。

查看我们的生产日志,我们可以看到这些压缩作业的示例。在这种情况下,作业41正在压缩4级0文件,并将它们合并为单个较大的1级文件。


/data/dedupe.db$ head -1000 LOG | grep "JOB 41"
[JOB 41] Compacting 4@0 + 4@1 files to L1, score 1.00
[default] [JOB 41] Generated table #655024: 1550991 keys, 69310820 bytes
[default] [JOB 41] Generated table #655025: 1556181 keys, 69315779 bytes
[default] [JOB 41] Generated table #655026: 797409 keys, 35651472 bytes
[default] [JOB 41] Generated table #655027: 1612608 keys, 69391908 bytes
[default] [JOB 41] Generated table #655028: 462217 keys, 19957191 bytes
[default] [JOB 41] Compacted 4@0 + 4@1 files to L1 => 263627170 bytes


压缩完成后,新合并的SSTables将成为数据库记录的确定记录集,旧的SSTables将被取消链接。

如果我们登录到生产实例,我们可以看到正在更新的预写日志以及正在写入,读取和合并的单个SSTable。

日志和最新的SSTable主导了I / O
如果我们来看看生产中的SSTable统计数据,我们可以看到我们共有四个文件级别,每个级别都有更大和更大的文件。

RocksDB保留SSTables本身存储的特定的索引和布隆bloom过滤器,并将它们加载到内存中。然后查询这些bloom过滤器和索引以便确定能查找到指定的key。然后将完整的SSTable作为LRU基础的一部分加载到内存中。

在绝大多数情况下,我们看到新的消息 - 这使得我们的删除重复数据系统成为了bloom过滤器的教科书经典用例了。

bloom过滤器会告诉我们一个键key是否“可能在集合中”,或者“绝对不在集合中”。为了做到这一点,bloom过滤器保留了已经看到的任何元素的各种哈希函数设置位。如果一个哈希函数的所有位都设置了,则过滤器将返回“该元素可能在集合中”。

如果响应“可能在集合中”,则RocksDB可以从SSTables查询原始数据,以确定该项目真的是否在该集合中实际存在。但在大多数情况下,我们可以避免查询任何SSTables,因为过滤器将返回“绝对不在集合”响应。

当我们查询RocksDB时,我们会发布一个MultiGet查询我们想要查询messageIds的所有相关内容。能够提升性能,同时避免许多并发锁定操作。它还允许我们批查询来自卡夫卡的数据,并且通常顺序操作的方式可以避免随机写入。

接下页

2
banq
2017-06-30 17:47


删除:大小限制,没有时间限制
删除重复数据过程,重要的事情是必须决定是否将我们的系统大小限制在严格的“删除重复数据时间窗口期”内,或者是通过磁盘上的总数据库大小来限制。

为了避免系统突然失败或所有客户都进行删除重复的操作进而导致系统崩溃,我们决定按照大小而不是限制到设定的时间窗口。这要我们为每个RocksDB实例设置最大大小,并处理突发尖峰或负载增加。副作用是可以将删除重复数据窗口期降低到24小时以下,到那个时候会拨打我们工程师通话。

我们会定期从RocksDB中淘汰旧的key,使其不会增长到无限大小。为此,我们根据序列号保留Key的二次索引,以便我们可以先删除最早接收到的key。

我们并没有使用RocksDB TTL,这虽然能让我们在打开数据库时保持固定的TTL - 我们而是使用每个插入的键key的序列号来删除对象。

因为序列号被存储为辅助索引,所以我们可以快速查询它,并将其标记为被删除。这里是通过序列号的删除功能:


func (d *DB) delete(n int) error {
// open a connection to RocksDB
ro := rocksdb.NewDefaultReadOptions()
defer ro.Destroy()

// find our offset to seek through for writing deletes
hint, err := d.GetBytes(ro, []byte(
"seek_hint"))
if err != nil {
return err
}

it := d.NewIteratorCF(ro, d.seq)
defer it.Close()

// seek to the first key, this is a small
// optimization to ensure we don't use `.SeekToFirst()`
// since it has to skip through a lot of tombstones.
if len(hint) > 0 {
it.Seek(hint)
} else {
it.SeekToFirst()
}

seqs := make([][]byte, 0, n)
keys := make([][]byte, 0, n)

// look through our sequence numbers, counting up
// append any data keys that we find to our set to be
// deleted
for it.Valid() && len(seqs) < n {
k, v := it.Key(), it.Value()
key := make([]byte, len(k.Data()))
val := make([]byte, len(v.Data()))

copy(key, k.Data())
copy(val, v.Data())
seqs = append(seqs, key)
keys = append(keys, val)

it.Next()
k.Free()
v.Free()
}

wb := rocksdb.NewWriteBatch()
wo := rocksdb.NewDefaultWriteOptions()
defer wb.Destroy()
defer wo.Destroy()

// preserve next sequence to be deleted.
// this is an optimization so we can use `.Seek()`
// instead of letting `.SeekToFirst()` skip through lots of tombstones.
if len(seqs) > 0 {
hint, err := strconv.ParseUint(string(seqs[len(seqs)-1]), 10, 64)
if err != nil {
return err
}

buf := []byte(strconv.FormatUint(hint+1, 10))
wb.Put([]byte(
"seek_hint"), buf)
}

// we not only purge the keys, but the sequence numbers as well
for i := range seqs {
wb.DeleteCF(d.seq, seqs[i])
wb.Delete(keys[i])
}

// finally, we persist the deletions to our database
err = d.Write(wo, wb)
if err != nil {
return err
}

return it.Err()
}


为了继续确保写入速度,RocksDB不会立即返回并删除一个键(记住这些SSTable是不可变的!)。相反,RocksDB将附加一个“tombstone墓碑”,后来会在压缩过程中被删除。因此,我们可以通过顺序写入快速老化失效,并且通过删除旧项来避免破坏我们的内存。

确保正确性
我们前面讨论了如何确保数十亿条消息的速度,扩展和低成本搜索。最后一个部分是我们如何确保各种故障模式下数据的正确性。

EBS快照和附件
为了确保我们的RocksDB实例不会被错误的代码或基础EBS损坏导致停机下线,我们会定期对每个硬盘驱动器进行快照。虽然EBS已经在引擎盖下复制了,但是这个步骤可以防止数据库被从一些基础机制中破坏。

如果我们需要更换使用实例,则可以暂停使用者,并将相关联的EBS驱动器分离,然后重新附加到新的实例。只要我们保持分区ID相同,重新分配磁盘是一个相当无痛的过程,仍然保证正确性。

在工作者Job崩溃的情况下,我们依靠RocksDB的内置预写日志write-ahead-log来确保我们不会丢失消息。来自输入主题的消息不会被确认,除非我们确保RocksDB在日志中保留了消息。

读取输出主题out topic
您可能会注意到,阅读到直到这里,还没有“原子”步骤使我们能够确保我们已经传递了一次消息。我们的工作者Job有可能在任何时候崩溃:写入RocksDB时,发布到输出主题时,或确认输入消息时。

我们需要一个“确认commit”点,它是原子的,并确保它覆盖所有这些单独系统的事务。我们的数据需要一些“真相的来源”。

这就是从输出主题中读取的重点。

如果删除重复数据工作者Job由于任何原因而死亡或遇到Kafka的错误,则在重新启动时,将首先查阅事件发布的“真相来源”:输出主题。

如果在输出主题中找到消息,而不是 RocksDB(反之亦然),则删除重复数据工作者Job将进行必要的修复,以保持数据库和RocksDB同步。实质上,我们使用输出主题作为我们的前瞻性日志和我们的真相的最终来源,和RocksDB检查点合并进行验证。

在生产中

我们现在已经在生产中运行了我们的删除重复数据系统3个月,对结果感到非常满意。我们有如下数字,:

1.RocksDB中磁盘上存储的1.5 TB的key
2.一个4周的删除重复数据窗口,过期失效这些老化键old key
3.我们的RocksDB实例中存储大约60B个key
4.200B消息通过删除重复数据系统

该系统通常是快速,高效和容错的,也是非常容易理解的。

特别是,我们的v2系统比我们的旧系统有很多优点。

以前我们将所有的key存储在Memcached中,并使用Memcached的原子CAS(check-and-set)操作符来键key不存在的情况下设置它。Memcached是作为发布键key“原子性”提交点。

虽然这个功能很好,但它需要大量的内存来存储我们所有的key。此外,我们必须决定接受偶尔的Memcached故障,或者将我们的支出加倍于高内存故障转移副本。

Kafka / RocksDB方法使我们可以获得旧系统的几乎所有优点,并提高可靠性。

总结最大的胜利:

存储在磁盘上的数据:保持一整套key或完整索引在内存中非常昂贵。通过将更多的数据移动存储到磁盘,并利用各种级别的文件和索引,我们能够大幅削减我们的成本。我们能够将处理故障失败切换到依赖非常酷的存储介质(EBS),再也不用运行其他热故障转移实例。

分区:当然,为了缩小我们的搜索空间,并避免在内存中加载太多的索引,我们需要保证某些消息被路由到正确的工作者。在Kafka的上游分区可以让我们一直路由这些消息,这样我们可以更有效地缓存和查询。

显式老化失效:使用Memcached,我们将在每个键上设置一个TTL来使其老化失效,然后依靠Memcached进程来处理删除驱逐出缓存。这使我们在面对大量数据时耗尽了我们的内存,并且面对大量的删除驱逐操作,飙升了Memcached CPU。通过客户端处理键key删除,我们可以通过缩短我们的删除重复数据的窗口体面的面对失败。

卡夫卡作为真相来源:要真正避免面对多重删除重复数据的提交点,我们必须使用真相来源,本来是用作Topic主题下游消费者使用的。使用卡夫卡作为“真相来源”工作非常好。在大多数失败的情况下(除了卡夫卡失败之外),消息将被写入卡夫卡,否则就不会。并且使用Kafka可以确保发布的消息按顺序交付,并在多台计算机上进行磁盘复制,而不需要在内存中保留大量数据。

批处理阅读和写作:通过对Kafka和RocksDB进行批量I / O调用,我们可以通过利用顺序读写来获得更好的性能。而不是我们之前使用Memcached的随机访问,我们可以通过依赖我们的磁盘高性能,并且只保留内存中的索引来实现更好的吞吐量。

总的来说,我们对于这个构建的删除重复数据系统提供的保证感到非常满意。使用Kafka和RocksDB作为流媒体应用的原语开始变得越来越普遍。我们很高兴在这些基元上方来继续构建新的分布式应用程序。


Delivering Billions of Messages Exactly Once · Seg

相关:为什么正好一次(Exactly-Once)传递是不可能的?