本文介绍如何使用开源 Chronicle Queue创建巨大的持久队列,同时保持可预测和一致的低延迟。
在本文中,目标是维护来自市场数据馈送的对象队列(例如,在交易所交易的证券的最新价格)。也可以选择其他业务领域,例如来自物联网设备的感官输入或读取汽车行业的碰撞记录信息。原理是一样的。
首先,定义一个持有市场数据的类:
public class MarketData extends SelfDescribingMarshallable { |
注意:在实际场景中,使用float和double保存货币值时必须非常小心,否则可能会导致舍入问题 [Bloch18, Item 60]。但是,在这篇介绍性文章中,我想保持简单。
还有一个小的实用函数MarketDataUtil::create将在调用时创建并返回一个新的随机MarketData对象:
static MarketData create() { |
现在,目标是创建一个持久、并发、低延迟、可从多个进程访问并且可以容纳数十亿个对象的队列。
天真的方法
有了这些类,就可以探索使用ConcurrentLinkedQueue的简单方法:
public static void main(String[] args) { |
导致失败的有几个原因:
- ConcurrentLinkedQueue将为添加到队列中的每个元素创建一个包装节点。这将有效地使创建的对象数量增加一倍。
- 对象放置在 Java 堆上,导致堆内存压力和垃圾收集问题。在我的机器上,这导致我的整个 JVM 变得无响应,唯一的办法是使用“kill -9”强行杀死它。
- 无法从其他进程(即其他 JVM)读取队列。
- 一旦 JVM 终止,队列的内容就会丢失。因此,队列不是持久的。
查看其他各种标准 Java 类,可以得出结论,不支持大型持久队列。
Chronicle Queue
Chronicle Queue 是一个开源库,旨在满足上述要求。这是设置和使用它的一种方法:
public static void main(String[] args) { |
使用配备 2.3 GHz 8 核英特尔酷睿 i9 的 MacBook Pro 2019 时,仅使用一个线程即可插入每秒超过 3,000,000 条消息。队列通过给定目录“market-data”中的内存映射文件持久化。人们会期望MarketData对象至少占用 4 (int securityId) + 8 (long time) + 4*3 (float last, high 和 low) = 24 字节。
在上面的示例中,附加了 10 亿个对象,导致映射文件占用 30,148,657,152 个字节,这意味着每条消息大约 30 个字节。在我看来,这确实非常有效。
从编年史队列中读取很简单。继续上面的示例,下面显示了如何从队列中读取前两个MarketData对象:
public static void main(String[] args) { |
还有许多其他功能超出了本文的范围。例如,可以将队列文件设置为以特定间隔(例如每天、每小时或每分钟)滚动,从而有效地创建信息分解,以便随着时间的推移清理旧数据。还有一些规定能够隔离 CPU 并将 Java 线程锁定到这些隔离的 CPU,从而显着减少应用程序抖动。