import akka.actor._
import rx.lang.scala._
sealed trait Message
case class Hello(text: String) extends Message
sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub
//测试客户端
object Main extends App {
val system = ActorSystem("client")
val receiver = system actorOf (Props[ObservableActor], "rcv")
val subscription =
observableFromActor(receiver)
.take(3)
.subscribe(msg => println(s"received: $msg"))
Seq("hello", "world", "again", "not anymore") foreach { msg =>
receiver ! Hello(msg)
}
subscription.unsubscribe
system.shutdown
//定义一个来自Actor的可观察者Observable
def observableFromActor(actor: ActorRef): Observable[Message] =
Observable { observer =>
actor ! Subscribe(observer onNext)
new Subscription {
override def unsubscribe: Unit = actor ! Unsubscribe
}
}
}
//akka的Actor 用来接收消息,这个消息将变成RxJava的Observable
class ObservableActor extends Actor with ActorLogging {
def receive = {
case Subscribe(onNext) =>
log debug "subscribe"
context become subscribed(onNext)
}
def subscribed(onNext: Message => Unit): Actor.Receive = {
case Unsubscribe =>
log debug "unsubscribe"
context become receive
case message: Message =>
log debug s"incoming: $message"
onNext(message)
}
}
<p class="indent">
|