《Creating an RxJava Observable from an Akka Actor》一文介绍了如何将 Akka 和 RxJava 这两个 reactive 框架结合使用:把 Actor 接收到的消息转换为 RxJava 的可观察对象(Observable)。
代码如下:
import akka.actor._ import rx.lang.scala._
/** Root of the message protocol; every message handled in this example extends it. */
sealed trait Message

/** Data message carrying a text payload. */
final case class Hello(text: String) extends Message

/** Control messages that manage a subscription; they are also `Message`s. */
sealed trait SubUnsub extends Message

/**
 * Requests a subscription.
 *
 * @param onNext callback to invoke for each forwarded message
 */
final case class Subscribe(onNext: Message => Unit) extends SubUnsub

/** Cancels the current subscription. */
case object Unsubscribe extends SubUnsub
/**
 * Test client.
 *
 * Creates an `ObservableActor`, exposes it as an `Observable[Message]`,
 * prints at most three received messages, sends four `Hello` messages to
 * the actor, then unsubscribes and shuts the actor system down.
 */
object Main extends App {
  val system: ActorSystem = ActorSystem("client")
  val receiver: ActorRef = system.actorOf(Props[ObservableActor], "rcv")

  // `take(3)` limits the printed output to the first three messages.
  val subscription: Subscription = observableFromActor(receiver)
    .take(3)
    .subscribe(msg => println(s"received: $msg"))

  Seq("hello", "world", "again", "not anymore").foreach { msg =>
    receiver ! Hello(msg)
  }

  subscription.unsubscribe()
  // NOTE(review): the sends above are asynchronous, so shutdown may begin
  // before the actor has processed them — confirm whether output can be
  // truncated and, if so, shut down from a completion callback instead.
  system.shutdown()

  /**
   * Defines an `Observable` whose items come from an actor.
   *
   * On subscription the actor is sent a `Subscribe` carrying the observer's
   * `onNext`; the returned `Subscription` sends `Unsubscribe` to the actor
   * when it is cancelled.
   *
   * @param actor the actor that forwards its incoming messages
   * @return an observable of the messages forwarded by `actor`
   */
  def observableFromActor(actor: ActorRef): Observable[Message] = Observable { observer =>
    actor ! Subscribe(observer.onNext(_))
    new Subscription {
      override def unsubscribe(): Unit = actor ! Unsubscribe
    }
  }
}
/**
 * Akka actor that receives messages and hands them to a subscriber callback,
 * which is how they become items of an RxJava `Observable`.
 *
 * The actor has two behaviours: the initial one (`receive`) only accepts
 * `Subscribe`; once subscribed, it forwards messages to the callback until
 * `Unsubscribe` arrives.
 */
class ObservableActor extends Actor with ActorLogging {

  /** Initial behaviour: waits for a `Subscribe` and switches to `subscribed`. */
  def receive: Receive = {
    case Subscribe(onNext) =>
      log.debug("subscribe")
      context.become(subscribed(onNext))
  }

  /**
   * Behaviour while a subscriber is attached.
   *
   * @param onNext callback invoked with every forwarded message
   */
  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log.debug("unsubscribe")
      context.become(receive)

    // `Subscribe` is itself a `Message`; without this case it would fall
    // through to the generic branch below and be delivered to the current
    // observer as data. Treat it as a control message: the latest
    // subscriber replaces the previous one.
    case Subscribe(replacement) =>
      log.debug("subscribe")
      context.become(subscribed(replacement))

    case message: Message =>
      log.debug(s"incoming: $message")
      onNext(message)
  }
}
|