从Akka Actor 创建一个RxJava的Observable

13-10-24 banq
Creating an RxJava Observable from an Akka Actor -一文谈了如何结合Akka和Rxjava这两个reactive框架使用,由Actor接受到的消息转变为RxJava的可观察Observable。

代码如下:

import akka.actor._
import rx.lang.scala._

sealed trait Message
case class Hello(text: String) extends Message

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

//测试客户端
object Main extends App {
  val system = ActorSystem("client")
  val receiver = system actorOf (Props[ObservableActor], "rcv")

  val subscription =
    observableFromActor(receiver)
      .take(3)
      .subscribe(msg => println(s"received: $msg"))

  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }
  subscription.unsubscribe
  system.shutdown

  //定义一个来自Actor的可观察者Observable
  def observableFromActor(actor: ActorRef): Observable[Message] =
    Observable { observer =>
      actor ! Subscribe(observer onNext)
      new Subscription {
        override def unsubscribe: Unit = actor ! Unsubscribe
      }
    }
}

//akka的Actor 用来接收消息,这个消息将变成RxJava的Observable
class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }

  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive

    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}
<p class="indent">