Akka的产品化应用经验分享

13-11-12 banq
                   

ScalaStorm的作者EvanChan在其Akka in Production: Our Story中分享了自己公司Ooyala 使用Akka的经验:

Ooyala公司提供每月超过1亿起视频和每天2亿分析事件,25%的美国在线观众观看以Ooyala驱动视频。

原来Ooyala是一个以Ruby为主的公司,但经验教训是:

• Lesson - 不要在大数据上使用Ruby

使用了Scala以后,性能得到一两倍的提升,至今使用 Scala, Akka with Storm, Spark, MR,Cassandra, 等所有大数据产品线。

每天2亿个事件,如下图架构:


使用Akka的Actor作为文件操纵者:


他们结合Akka和Storm的应用,在一个Storm的bolt中用两个Actor互相通信,发现Storm太复杂,难以找到问题。


待续

[该贴被banq于2013-11-12 14:09修改过]

                   

1
banq
2013-11-12 16:08

使用基于Akka集群pipeline的教训:

• 比较复杂。

• Akka 集群在2.1还无法用在生产现场(新的2.2.x version稳定了)

• 对于HTTP 请求使用 actors 和 futures 难以维护拓展

• Actors相比Storm,对于大多数程序易于理解。

重构如下:


•Kafka用来分区消息

•单处理过程,很简单

•没有数据分发。

• 线性actor pipeline -易于理解

他们还针对Actor使用了Scala 的traits,类似AspectJ的AOP的拦截器,用来实现监控,记录,度量,跟踪代码面向方面编程要比Spring优雅得多。

比如原来Actor代码如下:

class someActor extends Actor {
    def wrappedReceive: Receive = {
            case x => blah
    }
   def receive = {
            case x =>
                println(“Do something before...”)
                wrappedReceive(x)
                println(“Do something after...”)
        }
}
<p>

这个someActor中有一个wrappedReceive,对接受消息进行处理,不同处理需要不同动作,比如对消息进行记录等,经常需要修改,破坏封装,使用trait则无需这行代码。

stacktrait代码:

trait ActorStack extends Actor {
/** Actor classes should implement this partialFunction for standard
* actor message handling
*/
       def wrappedReceive: Receive

/** Stackable traits should override and call super.receive(x) for
* stacking functionality
*/
       def receive: Receive = {
case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
}
<p>

做了两个实现Instrument1和Instrument2:

trait Instrument1 extends ActorStack {
       override def receive: Receive = {
              case x =>
                     println("Do something before...")
                     super.receive(x)
                     println("Do something after...")
       }
<p>

Instrument2类似Instrument1

Actor和trait混合如下:

class DummyActor extends Actor with Instrument1 with Instrument2 {
      def wrappedReceive = {
            case "something" => println("Got something")
            case x => println("Got something else: " + x)
      }
}
<p>

保证Actor非常干净。

输出结果:

Antes...

Do something before...

Got something

Do something after...

Despues..

[该贴被banq于2013-11-12 16:11修改过]

banq
2013-11-12 16:24

技术stack如下:

• Spray - high performance HTTP

• SLF4J / Logback

• Yammer Metrics

• spray-json

• Akka 2.x

• Scala 2.9 / 2.10

分布式系统最重要的是一切可显,否则出了问题无法定位。

配合Akka使用日志:

易于设置

•加入 Logback jar包

• 在 application.conf配置:

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]

• 使用定制的日志trait, 不用在 Actor实现日志代码

• 将Actor 路径加入你的日志方法:

• org.slf4j.MDC.put(“actorPath”, self.path.toString)

trait Slf4jLogging extends Actor with ActorStack {
    val logger = LoggerFactory.getLogger(getClass)
    private[this] val myPath = self.path.toString

    logger.info("Starting actor " + getClass.getName)

    override def receive: Receive = {
          case x =>
              org.slf4j.MDC.put("akkaSource", myPath)
              super.receive(x)
}
<p>

banq
2013-11-12 16:30

定义一个对每个actor加入两次度量

1.消息的频率处理 (1min, 5min, 15min moving averages)

2.接受模块花费时间

• 所有的度量都通过Spray route /metricz对外暴露

• 后台运行polls /metricz 将发送到度量服务

trait ActorMetrics extends ActorStack {
   // Timer includes a histogram of wrappedReceive() duration as well as moving avg of rate
of invocation
     val metricReceiveTimer = Metrics.newTimer(getClass, "message-handler",TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
     override def receive: Receive = {
            case x =>
              val context = metricReceiveTimer.time()
              try {
                  super.receive(x)
              } finally {
                   context.stop()
       }
}

得到Actor性能处理如下图:

<p>


banq
2013-11-12 16:46

缺省Actor的邮箱大小是没有限制

•但是用一个固定大小的邮箱

• 当邮箱满了,发送消息到DeadLetters

• mailbox-push-timeout-time: 当邮箱满了要等待多长时间?

• 对于分布式Akka系统无用。

真正流程控制:拉 pull, 带确认的推

• 分布式也能行,但是需要付出劳动。

一个流程控制系统能使Actor组件的处理率(每秒处理多少消息)达成一致。

这段流程控制具体如何实施从原文无法详细得到,其控制代码:

trait TrakkarExtractor extends TrakkarBase with ActorStack {
    import TrakkarUtils._
    val messageIdExtractor: MessageIdExtractor = randomExtractor
   
    override def receive: Receive = {
         case x =>
             lastMsgId = (messageIdExtractor orElse randomExtractor)(x)
            Collector.sendEdge(sender, self, lastMsgId, x)
            super.receive(x)
   }
}
<p>

Trait 发送一个Edge(source, dest, messageInfo) 到本地Collector actor,这个聚合的edges是跨节点拓扑的。

最后他总结了akka开发经验:

不要把东西放入 Actor 构造器,如果出现这种情况,注意:

• 缺省的 supervision 策略会停止一个没有初始化的Actor,

• 用一个初始化消息替代在构造器中初始化。

把这个消息和Actor放在一起,一个命名空间最好。

最后实现一个Actor的日志 性能度量 和流程跟踪的代码:

trait InstrumentedActor extends Slf4jLogging with ActorMetrics with TrakkarExtractor

object MyWorkerActor {
            case object Initialize
            case class DoSomeWork(desc: String)
}

class MyWorkerActor extends InstrumentedActor {
    def wrappedReceive = {
            case Initialize =>
            case DoSomeWork(desc) =>
}
<p>

2Go 1 2 下一页