使用Chronicle Queue创建低延迟的TB级别的队列 - DZone

22-01-17 banq

本文介绍如何使用开源 Chronicle Queue创建巨大的持久队列,同时保持可预测和一致的低延迟。
在本文中,目标是维护来自市场数据馈送的对象队列(例如,在交易所交易的证券的最新价格)。也可以选择其他业务领域,例如来自物联网设备的感官输入或读取汽车行业的碰撞记录信息。原理是一样的。
首先,定义一个持有市场数据的类:

public class MarketData extends SelfDescribingMarshallable {

    int securityId;
    long time;
    float last;
    float high;
    float low;

    // Getters and setters not shown for brevity
}



注意:在实际场景中,使用float和double保存货币值时必须非常小心,否则可能会导致舍入问题 [Bloch18, Item 60]。但是,在这篇介绍性文章中,我想保持简单。
还有一个小的实用函数MarketDataUtil::create将在调用时创建并返回一个新的随机MarketData对象:


static MarketData create() {

    MarketData marketData = new MarketData();
    int id = ThreadLocalRandom.current().nextInt(1000);
    marketData.setSecurityId(id);
    float nextFloat = ThreadLocalRandom.current().nextFloat();
    float last = 20 + 100 * nextFloat;

    marketData.setLast(last);
    marketData.setHigh(last * 1.1f);
    marketData.setLow(last * 0.9f);
    marketData.setTime(System.currentTimeMillis());

    return marketData;
}

现在,目标是创建一个持久、并发、低延迟、可从多个进程访问并且可以容纳数十亿个对象的队列。
 

天真的方法
有了这些类,就可以探索使用ConcurrentLinkedQueue的简单方法:

public static void main(String[] args) {

    final Queue<MarketData> queue = new ConcurrentLinkedQueue<>();
    for (long i = 0; i < 1e9; i++) {
        queue.add(MarketDataUtil.create());
    }
  
}

导致失败的有几个原因:
  1. ConcurrentLinkedQueue将为添加到队列中的每个元素创建一个包装节点。这将有效地使创建的对象数量增加一倍。
  2. 对象放置在 Java 堆上,导致堆内存压力和垃圾收集问题。在我的机器上,这导致我的整个 JVM 变得无响应,唯一的办法是使用“kill -9”强行杀死它。
  3. 无法从其他进程(即其他 JVM)读取队列。
  4. 一旦 JVM 终止,队列的内容就会丢失。因此,队列不是持久的。

查看其他各种标准 Java 类,可以得出结论,不支持大型持久队列。
 

Chronicle Queue
Chronicle Queue 是一个开源库,旨在满足上述要求。这是设置和使用它的一种方法:

public static void main(String[] args) {

    final MarketData marketData = new MarketData();
    final ChronicleQueue q = ChronicleQueue
            .single("market-data");

    final ExcerptAppender appender = q.acquireAppender();

    for (long i = 0; i < 1e9; i++) {
        try (final DocumentContext document =
                     appender.acquireWritingDocument(false)) {

             document
                    .wire()
                    .bytes()
                    .writeObject(MarketData.class, 
                            MarketDataUtil.recycle(marketData));
        }
    }
}


使用配备 2.3 GHz 8 核英特尔酷睿 i9 的 MacBook Pro 2019 时,仅使用一个线程即可插入每秒超过 3,000,000 条消息。队列通过给定目录“market-data”中的内存映射文件持久化。人们会期望MarketData对象至少占用 4 (int securityId) + 8 (long time) + 4*3 (float last, high 和 low) = 24 字节。
在上面的示例中,附加了 10 亿个对象,导致映射文件占用 30,148,657,152 个字节,这意味着每条消息大约 30 个字节。在我看来,这确实非常有效。
 
从编年史队列中读取很简单。继续上面的示例,下面显示了如何从队列中读取前两个MarketData对象:

public static void main(String[] args) {

    final ChronicleQueue q = ChronicleQueue
            .single("market-data");
  
    final ExcerptTailer tailer = q.createTailer();

    for (long i = 0; i < 2; i++) {

        try (final DocumentContext document =
                     tailer.readingDocument()) {

            MarketData marketData = document
                    .wire()
                    .bytes()
                    .readObject(MarketData.class);

            System.out.println(marketData);
        }
    }
}


还有许多其他功能超出了本文的范围。例如,可以将队列文件设置为以特定间隔(例如每天、每小时或每分钟)滚动,从而有效地创建信息分解,以便随着时间的推移清理旧数据。还有一些规定能够隔离 CPU 并将 Java 线程锁定到这些隔离的 CPU,从而显着减少应用程序抖动。
 

1
猜你喜欢