从Akka Actor 创建一个RxJava的Observable

《Creating an RxJava Observable from an Akka Actor》一文介绍了如何结合使用 Akka 和 RxJava 这两个响应式(reactive)框架:把 Actor 接收到的消息转换为 RxJava 的可观察对象(Observable)。

代码如下:


import akka.actor._
import rx.lang.scala._

/** Root of the message protocol exchanged with `ObservableActor`. */
sealed trait Message

/** A data message carrying a piece of text.
  *
  * @param text the text payload
  */
final case class Hello(text: String) extends Message

/** Control messages that start or stop forwarding of incoming messages. */
sealed trait SubUnsub extends Message

/** Asks the receiving actor to start passing incoming messages to a callback.
  *
  * @param onNext callback invoked with each message forwarded by the actor
  */
final case class Subscribe(onNext: Message => Unit) extends SubUnsub

/** Asks the receiving actor to stop forwarding messages. */
case object Unsubscribe extends SubUnsub

/** Test client: wires an `ObservableActor` to an `Observable`, prints the first
  * three messages it emits, then unsubscribes and shuts the actor system down.
  */
object Main extends App {
  val system = ActorSystem("client")
  val receiver = system.actorOf(Props[ObservableActor], "rcv")

  // Only the first three messages reach the println callback; `take(3)` drops the rest.
  val subscription =
    observableFromActor(receiver)
      .take(3)
      .subscribe(msg => println(s"received: $msg"))

  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }

  // NOTE(review): `!` presumably enqueues asynchronously, so unsubscribing and
  // shutting down right away may race with delivery of the messages sent above.
  // Confirm whether the demo should wait for completion before shutting down.
  subscription.unsubscribe()
  system.shutdown()

  /** Builds an `Observable` whose elements are the messages received by `actor`.
    *
    * On subscription the observer's `onNext` is handed to the actor inside a
    * [[Subscribe]] message; unsubscribing sends [[Unsubscribe]] to the same actor.
    *
    * @param actor the actor that forwards incoming messages to the observer
    * @return an observable of the messages forwarded by `actor`
    */
  def observableFromActor(actor: ActorRef): Observable[Message] =
    Observable { observer =>
      actor ! Subscribe(msg => observer.onNext(msg))
      new Subscription {
        override def unsubscribe(): Unit = actor ! Unsubscribe
      }
    }
}

/** Akka actor that receives messages and forwards them to a subscriber callback,
  * which is how they become the elements of an `Observable`.
  *
  * The actor starts out unsubscribed and only reacts to [[Subscribe]]. Once
  * subscribed, every [[Message]] other than [[Unsubscribe]] is passed to the
  * callback, and [[Unsubscribe]] returns the actor to its initial behavior.
  */
class ObservableActor extends Actor with ActorLogging {

  /** Initial behavior: wait for a subscriber; other messages are not handled here. */
  def receive: Receive = {
    case Subscribe(onNext) =>
      log.debug("subscribe")
      context.become(subscribed(onNext))
  }

  /** Behavior while a subscriber is registered.
    *
    * @param onNext callback invoked with each forwarded message
    * @return the receive function used until [[Unsubscribe]] arrives
    */
  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log.debug("unsubscribe")
      context.become(receive)

    // NOTE(review): Subscribe is itself a Message, so a second Subscribe received
    // in this state is forwarded to the callback as data rather than replacing the
    // subscriber. Confirm whether that is intended.
    case message: Message =>
      log.debug(s"incoming: $message")
      onNext(message)
  }
}