在本文中,我们将介绍Aeron,这是一个由 Adaptive Financial Consulting 维护的多语言库,旨在实现应用程序之间的高效 UDP 消息传递。它专为提高性能而设计,旨在实现高吞吐量、低延迟和容错。
在使用 Aeron 之前,我们需要在我们的构建中包含最新版本,在撰写本文时版本为1.44.1 。
如果我们使用 Maven,我们可以在pom.xml中包含它的依赖项:
<dependency> <groupId>io.aeron</groupId> <artifactId>aeron-all</artifactId> <version>1.44.1</version> </dependency> |
implementation("io.aeron:aeron-all:1.44.1") 此时,我们已准备好开始在我们的应用程序中使用它。
请注意,目前 Aeron 的某些部分无法与 Java 16 或更新版本兼容。这是由于JPMS 阻止了某些交互。
媒体驱动程序 Aeron 在应用程序和传输之间采用间接方式工作。这被称为媒体驱动程序,因为它是我们的应用程序和传输媒体之间的交互。
每个 Aeron 进程都与一个媒体驱动程序交互,并通过该驱动程序与其他进程交互 - 无论是在同一台机器上还是远程。它通过文件系统执行此交互。我们需要将媒体驱动程序和所有应用程序指向磁盘上的同一目录,该目录存储了各个方面。请注意,我们只能同时为任何给定目录运行一个媒体驱动程序。尝试运行多个媒体驱动程序将失败。
当我们想要简单的时候,我们可以运行应用程序中嵌入的媒体驱动程序:
MediaDriver mediaDriver = MediaDriver.launch(); |
这将启动具有所有默认设置的媒体驱动程序。具体来说,这将使用默认媒体驱动程序目录运行。
我们还有另一种专为嵌入式使用而设计的启动方法。它的作用与以前完全相同,只是它会生成一个随机目录,以确保同一台机器上的多个实例不会发生冲突:
MediaDriver mediaDriver = MediaDriver.launchEmbedded(); |
在这两种情况下,我们还可以提供MediaDriver.Context对象来进一步配置媒体驱动程序:
MediaDriver.Context context = new MediaDriver.Context(); context.threadingMode(ThreadingMode.SHARED); MediaDriver mediaDriver = MediaDriver.launch(context); |
或者,我们可以将媒体驱动程序作为外部应用程序运行。我们可以使用作为依赖项包含的aeron-all.jar JAR 文件来执行此操作:
$ java -cp aeron-all-1.44.1.jar io.aeron.driver.MediaDriver |
其功能与上面的MediaDriver.launch()完全相同。
Aeron API 客户端 我们通过Aeron类使用 Aeron 执行所有 API 交互。我们需要创建一个新的实例并将其指向我们的媒体驱动程序。只需创建一个新实例即可指向默认位置的媒体驱动程序 - 就像我们使用MediaDriver.launch()启动它一样:
Aeron aeron = Aeron.connect(); |
Aeron.Context ctx = new Aeron.Context(); ctx.aeronDirectoryName(mediaDriver.aeronDirectoryName()); Aeron aeron = Aeron.connect(ctx); |
我们可以将任意数量的 Aeron 客户端连接到同一个媒体驱动程序。通常,这些客户端来自不同的应用程序,但如果需要,它们也可以来自同一个应用程序。但是,如果我们这样做,那么我们还需要使用Aeron.Context的新实例:
Aeron.Context ctx1 = new Aeron.Context(); ctx1.aeronDirectoryName(mediaDriver.aeronDirectoryName()); aeron1 = Aeron.connect(ctx1); System.out.println("Aeron 1 connected: " + aeron1); Aeron.Context ctx2 = new Aeron.Context(); ctx2.aeronDirectoryName(mediaDriver.aeronDirectoryName()); aeron2 = Aeron.connect(ctx2); System.out.println("Aeron 2 connected: " + aeron2); |
发送和接收消息 现在我们已经有了 Aeron API 客户端,我们可以使用它来发送和接收消息。
缓冲区 Aeron 将所有消息(包括发送和接收)表示为DirectBuffer实例。归根结底,这些不过是一组字节,但它们为我们提供了一组方法来处理一组标准类型。
当我们发送消息时,我们需要根据自己的数据自行构建缓冲区。为此,我们最好使用UnsafeBuffer实例 - 之所以这样命名是因为它使用sun.misc.Unsafe来读取和写入底层缓冲区的值。创建它需要一个字节数组或ByteBuffer实例,然后我们可以使用BufferUtil.allocateDirectAligned()来帮助最有效地完成此操作:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64)); |
// Put a string into the buffer starting at index 0. int length = buffer.putStringWithoutLengthUtf8(0, message); // Read a string of the given length from the buffer starting from the given offset. String message = buffer.getStringWithoutLengthUtf8(offset, length); |
频道和流 使用通过特定通道传输的已识别流,可以使用 Aeron 发送和接收数据。
我们将通道指定为特定格式的 URI,告诉 Aeron 如何传输消息。然后,我们的媒体驱动程序使用它与传输媒体进行交互,确保它正确发送和接收消息。流仅以数字标识。唯一的要求是同一通信的两端使用相同的流 ID。
最简单的此类通道是aeron:ipc,它使用媒体驱动程序中的共享内存进行传输和接收。请注意,这仅在双方使用相同的媒体驱动程序且不允许联网时才有效。
更有用的是,我们可以使用aeron:udp使用UDP发送和接收。这使我们能够与我们可以连接的任何地方的任何其他应用程序进行通信。特别是,我们的应用程序将与媒体驱动程序通信,然后媒体驱动程序将相互通信。
指定 UDP 通道时,我们至少需要包含主机和端口。在接收端,我们将在此监听;在发送端,我们将在此发送消息。例如,aeron:udp?endpoint=localhost:20121将通过localhost:20121上的 UDP 发送和接收消息。
订阅 一旦设置好媒体驱动程序和 Aeron 客户端,我们就可以接收消息了。我们通过创建对特定频道上特定流的订阅,然后轮询该流以获取消息来实现这一点。
添加订阅足以让媒体驱动程序设置一切以便能够接收我们的消息。我们使用Aeron实例上的addSubscription()方法执行此操作:
Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:20121", 1001); |
与之前一样,当不再使用它时,我们需要关闭它,以便媒体驱动程序知道停止监听消息。与往常一样,它是AutoCloseable,因此我们可以使用 try-with-resources 来管理它。
当我们订阅时,我们需要接收消息。Aeron 使用轮询机制执行此操作,让我们完全控制它何时处理消息。要轮询消息,我们需要提供一个FragmentHandler来处理收到的消息。如果我们想将所有代码内联,我们可以使用 lambda 来实现它;如果我们想重用它,我们可以使用单独的类来实现接口:
FragmentHandler fragmentHandler = (buffer, offset, length, header) -> { String data = buffer.getStringWithoutLengthUtf8(offset, length); System.out.printf("Message from session %d (%d@%d) <<%s>>%n", header.sessionId(), length, offset, data); }; |
当我们准备轮询新消息时,我们使用 Subscription.poll ()方法:
int fragmentsRead = subscription.poll(fragmentHandler, 10); |
Publication 消息传递的另一面是发送消息。我们使用Publication来实现这一点,它可以将消息发送到特定通道上的特定流。
我们可以使用Aeron.addPublication()方法添加新发布。然后我们需要等待它连接,这要求接收端有一个订阅准备好接收消息:
ConcurrentPublication publication = aeron.addPublication("aeron:udp?endpoint=localhost:20121", 1001); while (!publication.isConnected()) { TimeUnit.MILLISECONDS.sleep(100); } |
与之前一样,当不再使用它时,我们需要将其关闭,以便媒体驱动程序可以释放任何分配的资源。与往常一样,这是AutoCloseable,因此我们可以使用 try-with-resources 来管理它。
一旦我们有了连接的发布,我们就可以向其提供消息。这些消息始终以填充缓冲区的形式提供,然后发送给连接的订阅者:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64)); buffer.putStringWithoutLengthUtf8(0, message); long result = publication.offer(buffer, 0, message.length()); |
- Publication.NOT_CONNECTED – 发布未连接到订阅者。
- Publication.BACK_PRESSURED – 来自订阅者的背压意味着我们现在无法再发送任何消息。
- Publication.ADMIN_ACTION – 某些管理操作(例如日志轮换)导致发送失败。在这种情况下,立即重试通常是安全的。
- Publication.CLOSED – Publication实例已关闭。
- Publication.MAX_POSITION_EXCEEDED – 媒体驱动程序中的缓冲区已满。通常,我们可以通过关闭出版物并创建新出版物来解决这个问题。