如何让Java以光的速度跨线程通信?

一个比Disruptor吞吐量等性能指标更好的框架,使用Railway算法,将线程之间的消费发送参考现实生活中火车在站点之间搬运货物。

目标起始于一个简单的想法:创建一个开发人员友好的,简单的,轻量级线程间的通信框架,无需使用任何锁,同步器,信号量,等待,通知以及没有队列,消息,事件或任何其它并发特定的语法或工具。

只是一个Java接口接受到POJO以后在其背后实现这个通信,这个主意很类似Akka的Actors,但是它也许是有点矫枉过正,特别是对于单个多核计算机上线程间的通信优化必须是轻量的。

Akka的伟大之处是跨进程通信,特别是Actor是能够跨越不同JVM节点实现分布式通信。

无论如何,你可能觉得使用Akka在一个小型项目上有些过度,因为你只需要线程之间的通信,但是你还是想使用类似Actor这种做法模式。

该文章作者使用了动态代理 堵塞队列和一个缓存的线程池创建了这个解决方案,如图:

SPSC队列是一个Single Producer/Single Consumer 队列(单生产者/单消费者),而MPSC是一个Multi Producer/Single Consumer队列。

Dispatcher线程从Actor线程接受到消息,然后发送到相应的SPSC中。

Actor线程从接受的消息中使用数据,调用相应的actor类的方法,Actor实例都是发送消息给MPSC队列,然后再从Actor线程那里得到消息。

下面是ping-pong案例:


public interface PlayerA (
void pong(long ball); //send and forget method call
}
public interface PlayerB {
void ping(PlayerA playerA, long ball);
//send and forget method call
}
public class PlayerAImpl implements PlayerA {
@Override
@ublic void pong(long ball) {
}
}
public class PlayerBImpl implements PlayerB {
@Override
public void ping(PlayerA playerA, long ball) {
playerA.pong(ball);
}
}
public class PingPongExample {
public void testPingPong() {
// this manager hides the complexity of inter-thread communications
// and it takes control over actor proxies, actor implementations and threads
ActorManager manager = new ActorManager();
// registers actor implementations inside the manager
manager.registerImpl(PlayerAImpl.class);
manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages
//which would be sent between threads to a specific actor instance.
PlayerA playerA = manager.createActor(PlayerA.class);
PlayerB playerB = manager.createActor(PlayerB.class);
for(int i = 0; i < 1000000; i++) {
playerB.ping(playerA, i);
}
}


这两个play能够每秒打500,000个乒乓。但是如果和单个线程执行速度相比,还是很差的,同样代码在单个线程可以到达每秒两百万个。

作者开始研究缓慢的原因,在一些校验和测试以后,他认为是Actors之间发送消息影响了整体性能:

作者找到一个SPSC单生产者和单消费者的无锁队列,http://www.infoq.com/presentations/Lock-Free-Algorithms

无锁队列提供比锁队列更好的性能。锁队列中在当一个线程获得锁,其他线程将被阻塞,直到该锁被释放的。在无锁算法的情况下,生产者线程可以产生消息,但不阻止其他生产者线程,以及其他消费者,而从队列中读取的消费者不会被阻塞。

这个无锁队列据测试结果是超过每秒100M ops,是JDK的并发队列实现的10倍。

但是作者使用这个无锁队列提到SPSC 以后,并没有产生明显性能提升,他立即意识到这个框架的性能瓶颈不是在SPSC,而是在多个生产者/单个消费者(MPSC)那里。

多个生产者如果使用SPSC会覆盖彼此的值,因为SPSC并没有一个对生产者的控制机制,即使最快的SPSC也不适合。

对于MPSC作者找到了LMAX的disruptor,一个通过Ringbuffer实现的高性能线程间通信库包。

使用Disruptor很容易实现非常低延迟,高吞吐量的线程间消息通信。它还提供了用例对生产者和消费者的不同组合。多个线程可以从环形缓冲区中读取而不会阻塞对方:

多生产者和多消费者:

三个生产者/一个消费者测试结果显示,Disruptor都是两倍于LinkedBlockingQueue 。

但是使用Disruptor后的这个框架性能还是没有达到预期,作者从上下班的地铁中得到灵感,在某个站点同一车厢出来的人是生产者,进去的是消费者。

建立一个Railway类,使用AtomicLong来跟踪地铁在站与站之间的传递,下面是一个single-train railway:


public class RailWay {
private final Train train = new Train();
//站台号码stationNo 跟踪火车,定义哪个站点接受火车
private final AtomicInteger stationIndex = new AtomicInteger();
//多线程访问这个方法,也就是在特定站点等待火车
public Train waitTrainOnStation(final int stationNo) {

while (stationIndex.get() % stationCount != stationNo) {
Thread.yield();
// this is necessary to keep a high throughput of message passing.
//But it eats CPU cycles while waiting for a train
}
// the busy loop returns only when the station number will match
// stationIndex.get() % stationCount condition

return train;
}
//这个方法通过增加火车站台号将火车移到下一个站点。 public void sendTrain() {
stationIndex.getAndIncrement();
}
}

参考Disruptor,创建线程间传递long值:


public class Train {
//
public static int CAPACITY = 2*1024;
private final long[] goodsArray;
// array to transfer freight goods

private int index;

public Train() {
goodsArray = new long[CAPACITY];
}

public int goodsCount() {
// returns the count of goods
return index;
}
public void addGoods(long i) {
// adds item to the train
goodsArray[index++] = i;
}
public long getGoods(int i) {
//removes the item from the train
index--;
return goodsArray[i];
}
}


如下图两个线程传递long:

使用一列火车实现单个生产者单个消费者:


public void testRailWay() {
final Railway railway = new Railway();
final long n = 20000000000l;
//starting a consumer thread
new Thread() {
long lastValue = 0;
@Override
public void run() {
while (lastValue < n) {
Train train = railway.waitTrainOnStation(1);
//waits for the train at the station #1
int count = train.goodsCount();
for (int i = 0; i < count; i++) {
lastValue = train.getGoods(i);
// unload goods
}
railway.sendTrain();
//sends the current train to the first station.
}
}
}.start();

final long start = System.nanoTime();
long i = 0;
while (i < n) {
Train train = railway.waitTrainOnStation(0);
// waits for the train on the station #0
int capacity = train.getCapacity();
for (int j = 0; j < capacity; j++) {
train.addGoods((int)i++);
// adds goods to the train
}
railway.sendTrain();
if (i % 100000000 == 0) {
//measures the performance per each 100M items
final long duration = System.nanoTime() - start;|
final long ops = (i * 1000L * 1000L * 1000L) / duration;
System.out.format(
"ops/sec = %,d\n", ops);
System.out.format(
"trains/sec = %,d\n", ops / Train.CAPACITY);
System.out.format(
"latency nanos = %.3f%n\n",
duration / (float)(i) * (float) Train.CAPACITY);
}
}
}


通过测试,它达到 767,028,751 ops/sec ,是Nitsan’s blog.(第一个采用)的SPSC队列的几倍。

下面假设如果能有两列火车,每个站点有自己的火车,一个火车在第一个站点加载货物,第二列火车在第二个站点加载货物:

经过测试吞吐量是单列火车的1.4被,延迟从192.6纳秒降低到133.5纳秒。

但是线程间传输消息延迟是因为火车容量2048导致2178.4纳秒,通过增加火车降低这个延迟,如下图:

当在两个线程之间使用32,768 列火车传递一个long值,其延迟降低到13.9纳秒。到此吞吐量和延迟达到了一个平衡。

这只是SPSC的实现,纳秒多个生产者如何实现呢?答案是加入更多站点。

每个线程等待下一列火车,然后加载卸装消息,再把火车发到下一个站,而生产者线程放入消息到火车而消费者是从其中获得,火车总是从一个站到另外一个站循环不断移动。

测试了SPMC单个生产者和多个消费者,使用8个站点,一个属于生产者,剩余7个是消费者。

火车数量是256 火车容量是32时,测试结果是:吞吐量和延迟:
ops/sec = 116,604,397
latency nanos = 274.4
而火车数量是32而火车容量是256时:
ops/sec = 432,055,469
latency nanos = 592.5

后者相对是一个好的结果,延迟虽然提高,但是吞吐量提高的倍数要高得多。


本框架目标是多生产者和单消费者:

作者使用了3个生产者和一个消费者,测试结果:
ops/sec = 162,597,109
trains/sec = 54,199,036
latency ns = 18.5
生产者和消费者之间速度达到160M ops/sec

而disruptor则是:
Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec

使用Disruptor最好的结果是128M ops/sec,相差于作者的Railway框架,当然JDK的LinkedBlockingQueue 最后成绩是4M ops/sec

Railway算法能够显著增加吞吐量,通过调节火车的容量和数量容易在吞吐量和延迟之间取得平衡。

下图是Railway算法在混合生产者消费者中使用:

该项目源码下载github

测试使用源码

在当前多线程编程遭受Node.js等单线程事件驱动的挑战下,多线程如何加强线程之间通信速度,而且要以最小的代码实施,成为最大问题。

我个人观点,该文虽然以火车与火车站来比喻,实际是Disruptor的多个Ringbuffer组合,多列火车实际是多个Ringbuffer。

Disruptor的作者使用Disruptor实现多生产者和消费者案例如下:
点击翻墙看英文

他说:如果你知道你有生产商的数目在初始化时,你可以建立一个结构,显著减少争用。Disruptor中现有MultiProducerSequencer没有这个约束。

相比Disruptor以前使用MultiProducerSequencer为多个生产者只用一个Disruptor,现在可以使用SingleProducerSequencer 为每个生产者创建一个disruptor,下面就是在消费者里面的处理,增加了一个MultiBufferBatchEventProcesor

三个生产者一个消费者的代码:ThreeToThreeSequencedThroughputTes
测试结果:
Run 0, Disruptor=390,738,060 ops/sec
Run 1, Disruptor=387,931,034 ops/sec
Run 2, Disruptor=397,058,823 ops/sec
Run 3, Disruptor=394,160,583 ops/sec
Run 4, Disruptor=396,767,083 ops/sec
Run 5, Disruptor=394,736,842 ops/sec
Run 6, Disruptor=396,767,083 ops/sec

而Railway测试结果:
ops/sec = 243,141,801
ops/sec = 302,695,445
ops/sec = 283,096,862
ops/sec = 273,670,298
ops/sec = 268,340,387
ops/sec = 264,802,500
ops/sec = 262,258,028

disruptor比railway吞吐量提高了。

这个新功能将发布在下一个disruptor版本中。



public long getGoods(int i) { //removes the item from the train
index--;
return goodsArray[i];
}

Train类中的这个移除有点看不懂...这样不是取出i位上的值 但是把最后一位给移走吗?? ...

这个用例还是说明保持 single writer的原则,多个线程只写自己的ringbuffer,改造下消费者线程就能达到disruptor单生产者的性能了。

光速?这个应该还不能称为光速,还能再快点吗?

其实可以的