发帖    主题    评论    推荐    标签    作者    订阅    查搜    注册   登陆   关注
 
面向对象 设计模式 领域驱动设计 企业架构 框架 开发教程 微服务 CQRS 扩展性 并发编程 事件溯源 分布式 SOA

从Akka Actor 创建一个RxJava的Observable

                   
2013-10-24 09:47
赞助商链接

Creating an RxJava Observable from an Akka Actor -一文谈了如何结合Akka和Rxjava这两个reactive框架使用,由Actor接受到的消息转变为RxJava的可观察Observable。

代码如下:


import akka.actor._
import rx.lang.scala._

sealed trait Message
case class Hello(text: String) extends Message

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

//测试客户端
object Main extends App {
val system = ActorSystem(
"client")
val receiver = system actorOf (Props[ObservableActor],
"rcv")

val subscription =
observableFromActor(receiver)
.take(3)
.subscribe(msg => println(s
"received: $msg"))

Seq(
"hello", "world", "again", "not anymore") foreach { msg =>
receiver ! Hello(msg)
}
subscription.unsubscribe
system.shutdown

//定义一个来自Actor的可观察者Observable
def observableFromActor(actor: ActorRef): Observable[Message] =
Observable { observer =>
actor ! Subscribe(observer onNext)
new Subscription {
override def unsubscribe: Unit = actor ! Unsubscribe
}
}
}

//akka的Actor 用来接收消息,这个消息将变成RxJava的Observable
class ObservableActor extends Actor with ActorLogging {
def receive = {
case Subscribe(onNext) =>
log debug
"subscribe"
context become subscribed(onNext)
}

def subscribed(onNext: Message => Unit): Actor.Receive = {
case Unsubscribe =>
log debug
"unsubscribe"
context become receive

case message: Message =>
log debug s
"incoming: $message"
onNext(message)
}
}

Reactive编程      akka      scala      Rxjava     

赞助商链接

赞助商链接

返回顶部

移动版 关于本站 使用帮助 联系反馈 最佳分辨率1366x768
OpenSource JIVEJDON Powered by JdonFramework Code © 2002-20 jdon.com