使用GPars实现JVM并发和Actors模型

banq 11-04-27
                   

开源项目Gpars是一个在JVM上可以方便且安全并行计算开源框架,GPars提供Actor模型,它比Java传统基于内存多线程共享式并发要高效得多,Actors封装一个个活动对象,他们之间通讯是通过异步的不可变消息进行的,不管actor内部拥有什么状态,它不能从actor外部访问,除非发一个消息给这个actor,然后通过这个消息获得其内部状态。

因为actors处理调用消息是异步的,他们自己需要激活活动,一种方式是就分配系统线程给一个活动的对象,这样随着系统扩展,你需要频繁地分配所有可用的线程,所以,一般采取在actors之间共享线程,如果一个actor不工作,那么就回收系统线程,不再占用系统线程(banq:类似线程池threadPool?)

Actors模型确保处理actor之间处理消息时不能超过1个线程,actor内部状态就可以被安全地修改(单线程),这样actor是明显的线程安全型。

GPars 提供了actors的Java实现,并且提供适合Groovy和Java方便调用的API。Actors实现三个标准操作:发生消息,接受消息,创建新Actor。

下面是GPars代码:

final class MyCounterActor extends DynamicDispatchActor
{
private int counter = 0;

void onMessage(String message) {
log("Received a string");
counter += message.length();
}

void onMessage(Integer message) {
log(
"Received an integer");
counter += message;
}

void onMessage(Boolean message) {
log(
"Received a boolean");
reply(counter);
}
}

public class DecryptorTest
{
public static void main(String[] args) throws InterruptedException {
//创建Actor
Actor counter = new MyCounterActor().start();
//发送消息
counter.send(
"Hello");
System.out.println(
"Current value is: " + 
 counter.sendAndWait(true));
counter.stop();
counter.join();
}
}


发送消息也有一种方法sendAndWait(),这将会堵塞调用者不做任何事,一直等待消息回复(类似同步系统)。

Actors分为无状态Actors和有状态Actors(banq:一般偏重行为动作的 可用作服务的类都有无状态和有状态之分)

无态Actors: DynamicDispatchActor 重复扫描接受的消息,然后分发给actor中定义的onMessage()方法。
ReactiveActor 是允许更觉类似事件驱动EDA风格,如groovy如下,actor就可以直接将消息作为参数运行:

final def doubler = reactor {
2 * it
}

println 'Double of 10 = ' + doubler.sendAndWait(10)


Java代码有些琐碎:

Closure handler = new ReactorMessagingRunnable<Integer, Integer>() {
@[author]Override[/author]
protected Integer doRun(final Integer value) {
return value * 2;
}
};
final Actor doubler = reactor(handler);


有态Actors就类似Scala中的actors:

def actor = actor {
loop {
log 'Waiting for a gift'
react {gift ->
if (myWife.likes gift) reply 'Thank you!'
else {
reply 'Try again, please'
react {anotherGift ->
if (myChildren.like gift) reply 'Thank you!'
}
}
}
}
}


Continuations延续:
状态跨越多个Actor,需要在他们之间延续事务一致性,这称为Continuations模型,因为JVM不直接支持Continuations,必须在Actor框架中进行模拟。

react 方法是一个线程环节中的最后执行方法,一旦到期执行完毕,actor将会把线程归还给系统,同时,react方法是一种下一个消息到达需要执行的代码(闭包),那么在react将线程归还给系统之间,通过这个闭包将状态传递给下一个消息,实现类似延续风格的设计。

为了让actor归还线程,在react方法中写代码:

def myActor = Actors.actor {
loop {
react {msg1 ->
...
react {msg2 ->
...
}
// Never reached
}
// Never reached
}
// Never reached
}

loop循环把状态都遍历出来用来传递给下一个消息。


总之,GPars是遵循消息范式(message-passing paradigms)中的Communicating Sequential Processes (CSP)和 dataflow,能提供Actors如下功能:
Agents代理
Dataflow concurrency数据流并发
Communicating Sequential Processes CSP
Parallel collections并行收集
Fork/Join capabilitiesFork/Join能力
Composable asynchronous functions组合式异步


JVM Concurrency and Actors with GPars
[该贴被banq于2011-04-27 15:56修改过]
[该贴被banq于2011-04-27 15:56修改过]

                   

1
px96004
2014-01-02 13:41

求BANQ大帮忙解决个问题。
我想用GPars actor来进行广播怎么办?
或者说我想遍历一下所有actor怎么办好。
我又不想每次new actor时放进一个集合里面存起来。

banq
2014-01-02 19:50

2014-01-02 13:41 "@px96004"的内容
GPars actor来进行广播怎么办 ...


中间引入一个RabbitMQ或ZeroMQ 或Vert.x等消息事件总线就能实现1:N的广播。