使用GPars实现JVM并发和Actors模型

11-04-27 banq

开源项目Gpars是一个在JVM上可以方便且安全并行计算开源框架,GPars提供Actor模型,它比Java传统基于内存多线程共享式并发要高效得多,Actors封装一个个活动对象,他们之间通讯是通过异步的不可变消息进行的,不管actor内部拥有什么状态,它不能从actor外部访问,除非发一个消息给这个actor,然后通过这个消息获得其内部状态。

因为actors处理调用消息是异步的,他们自己需要激活活动,一种方式是就分配系统线程给一个活动的对象,这样随着系统扩展,你需要频繁地分配所有可用的线程,所以,一般采取在actors之间共享线程,如果一个actor不工作,那么就回收系统线程,不再占用系统线程(banq:类似线程池threadPool?)

Actors模型确保处理actor之间处理消息时不能超过1个线程,actor内部状态就可以被安全地修改(单线程),这样actor是明显的线程安全型。

GPars 提供了actors的Java实现,并且提供适合Groovy和Java方便调用的API。Actors实现三个标准操作:发生消息,接受消息,创建新Actor。

下面是GPars代码:

 final class MyCounterActor extends DynamicDispatchActor 
    {
        private int counter = 0;

        void onMessage(String message) {
            log("Received a string");
            counter += message.length();
        }

        void onMessage(Integer message) {
            log("Received an integer");
            counter += message;
        }

        void onMessage(Boolean message) {
            log("Received a boolean");
            reply(counter);
        }
    }

public class DecryptorTest 
{
    public static void main(String[] args) throws InterruptedException {
        //创建Actor
       	Actor counter = new MyCounterActor().start();
        //发送消息
       	counter.send("Hello");
     	System.out.println("Current value is: " + 
		counter.sendAndWait(true));
      	counter.stop();
       	counter.join();
    }
}
<p>

发送消息也有一种方法sendAndWait(),这将会堵塞调用者不做任何事,一直等待消息回复(类似同步系统)。

Actors分为无状态Actors和有状态Actors(banq:一般偏重行为动作的 可用作服务的类都有无状态和有状态之分)

无态Actors: DynamicDispatchActor 重复扫描接受的消息,然后分发给actor中定义的onMessage()方法。

ReactiveActor 是允许更觉类似事件驱动EDA风格,如groovy如下,actor就可以直接将消息作为参数运行:

final def doubler = reactor {
    2 * it
}

println 'Double of 10 = ' + doubler.sendAndWait(10)
<p>

Java代码有些琐碎:

Closure handler = new ReactorMessagingRunnable<Integer, Integer>() {
    @[author]Override[/author]
    protected Integer doRun(final Integer value) {
        return value * 2;
    }
};
final Actor doubler = reactor(handler);
<p>

有态Actors就类似Scala中的actors:

def actor = actor {
    loop {
        log 'Waiting for a gift'
        react {gift ->
            if (myWife.likes gift) reply 'Thank you!'
            else {
                reply 'Try again, please'
                react {anotherGift ->
                    if (myChildren.like gift) reply 'Thank you!'
                }
            }
        }
    }
}
<p>

Continuations延续:

状态跨越多个Actor,需要在他们之间延续事务一致性,这称为Continuations模型,因为JVM不直接支持Continuations,必须在Actor框架中进行模拟。

react 方法是一个线程环节中的最后执行方法,一旦到期执行完毕,actor将会把线程归还给系统,同时,react方法是一种下一个消息到达需要执行的代码(闭包),那么在react将线程归还给系统之间,通过这个闭包将状态传递给下一个消息,实现类似延续风格的设计。

为了让actor归还线程,在react方法中写代码:

def myActor = Actors.actor {
    loop {
        react {msg1 ->
            ...
            react {msg2 ->
                ...
            }
            // Never reached
        }
        // Never reached
    }
    // Never reached
}
<p>

loop循环把状态都遍历出来用来传递给下一个消息。

总之,GPars是遵循消息范式(message-passing paradigms)中的Communicating Sequential Processes (CSP)和 dataflow,能提供Actors如下功能:

Agents代理

Dataflow concurrency数据流并发

Communicating Sequential Processes CSP

Parallel collections并行收集

Fork/Join capabilitiesFork/Join能力

Composable asynchronous functions组合式异步

JVM Concurrency and Actors with GPars

[该贴被banq于2011-04-27 15:56修改过]

[该贴被banq于2011-04-27 15:56修改过]

                   

1
px96004
2014-01-02 13:41

求BANQ大帮忙解决个问题。

我想用GPars actor来进行广播怎么办?

或者说我想遍历一下所有actor怎么办好。

我又不想每次new actor时放进一个集合里面存起来。

banq
2014-01-02 19:50

2014-01-02 13:41 "@px96004"的内容
GPars actor来进行广播怎么办 ...

中间引入一个RabbitMQ或ZeroMQ 或Vert.x等消息事件总线就能实现1:N的广播。