Akka的产品化应用经验分享

banq 13-11-12
                   

ScalaStorm的作者EvanChan在其Akka in Production: Our Story中分享了自己公司Ooyala 使用Akka的经验:

Ooyala公司提供每月超过1亿起视频和每天2亿分析事件,25%的美国在线观众观看以Ooyala驱动视频。

原来Ooyala是一个以Ruby为主的公司,但经验教训是:
• Lesson - 不要在大数据上使用Ruby

使用了Scala以后,性能得到一两倍的提升,至今使用 Scala, Akka with Storm, Spark, MR,Cassandra, 等所有大数据产品线。

每天2亿个事件,如下图架构:




使用Akka的Actor作为文件操纵者:




他们结合Akka和Storm的应用,在一个Storm的bolt中用两个Actor互相通信,发现Storm太复杂,难以找到问题。




待续


[该贴被banq于2013-11-12 14:09修改过]

                   

1
banq
2013-11-12 16:08

使用基于Akka集群pipeline的教训:

• 比较复杂。
• Akka 集群在2.1还无法用在生产现场(新的2.2.x version稳定了)
• 对于HTTP 请求使用 actors 和 futures 难以维护拓展
• Actors相比Storm,对于大多数程序易于理解。

重构如下:




•Kafka用来分区消息
•单处理过程,很简单
•没有数据分发。
• 线性actor pipeline -易于理解

他们还针对Actor使用了Scala 的traits,类似AspectJ的AOP的拦截器,用来实现监控,记录,度量,跟踪代码面向方面编程要比Spring优雅得多。

比如原来Actor代码如下:

class someActor extends Actor {
def wrappedReceive: Receive = {
case x => blah
}
def receive = {
case x =>
println(“Do something before...”)
wrappedReceive(x)
println(“Do something after...”)
}
}

这个someActor中有一个wrappedReceive,对接受消息进行处理,不同处理需要不同动作,比如对消息进行记录等,经常需要修改,破坏封装,使用trait则无需这行代码。

stacktrait代码:

trait ActorStack extends Actor {
/** Actor classes should implement this partialFunction for standard
* actor message handling
*/

def wrappedReceive: Receive

/** Stackable traits should override and call super.receive(x) for
* stacking functionality
*/

def receive: Receive = {
case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
}


做了两个实现Instrument1和Instrument2:

trait Instrument1 extends ActorStack {
override def receive: Receive = {
case x =>
println("Do something before...")
super.receive(x)
println(
"Do something after...")
}

Instrument2类似Instrument1

Actor和trait混合如下:

class DummyActor extends Actor with Instrument1 with Instrument2 {
def wrappedReceive = {
case "something" => println("Got something")
case x => println(
"Got something else: " + x)
}
}


保证Actor非常干净。
输出结果:
Antes...
Do something before...
Got something
Do something after...
Despues..



[该贴被banq于2013-11-12 16:11修改过]

banq
2013-11-12 16:24

技术stack如下:
• Spray - high performance HTTP
• SLF4J / Logback
• Yammer Metrics
• spray-json
• Akka 2.x
• Scala 2.9 / 2.10

分布式系统最重要的是一切可显,否则出了问题无法定位。

配合Akka使用日志:
易于设置
•加入 Logback jar包
• 在 application.conf配置:
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]

• 使用定制的日志trait, 不用在 Actor实现日志代码
• 将Actor 路径加入你的日志方法:
• org.slf4j.MDC.put(“actorPath”, self.path.toString)


trait Slf4jLogging extends Actor with ActorStack {
val logger = LoggerFactory.getLogger(getClass)
private[this] val myPath = self.path.toString

logger.info("Starting actor " + getClass.getName)

override def receive: Receive = {
case x =>
org.slf4j.MDC.put(
"akkaSource", myPath)
super.receive(x)
}

banq
2013-11-12 16:30

定义一个对每个actor加入两次度量
1.消息的频率处理 (1min, 5min, 15min moving averages)
2.接受模块花费时间

• 所有的度量都通过Spray route /metricz对外暴露
• 后台运行polls /metricz 将发送到度量服务


trait ActorMetrics extends ActorStack {
// Timer includes a histogram of wrappedReceive() duration as well as moving avg of rate
of invocation
val metricReceiveTimer = Metrics.newTimer(getClass,
"message-handler",TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
override def receive: Receive = {
case x =>
val context = metricReceiveTimer.time()
try {
super.receive(x)
} finally {
context.stop()
}
}

得到Actor性能处理如下图:



banq
2013-11-12 16:46

缺省Actor的邮箱大小是没有限制
•但是用一个固定大小的邮箱
• 当邮箱满了,发送消息到DeadLetters
• mailbox-push-timeout-time: 当邮箱满了要等待多长时间?
• 对于分布式Akka系统无用。

真正流程控制:拉 pull, 带确认的推
• 分布式也能行,但是需要付出劳动。

一个流程控制系统能使Actor组件的处理率(每秒处理多少消息)达成一致。

这段流程控制具体如何实施从原文无法详细得到,其控制代码:

trait TrakkarExtractor extends TrakkarBase with ActorStack {
import TrakkarUtils._
val messageIdExtractor: MessageIdExtractor = randomExtractor

override def receive: Receive = {
case x =>
lastMsgId = (messageIdExtractor orElse randomExtractor)(x)
Collector.sendEdge(sender, self, lastMsgId, x)
super.receive(x)
}
}


Trait 发送一个Edge(source, dest, messageInfo) 到本地Collector actor,这个聚合的edges是跨节点拓扑的。

最后他总结了akka开发经验:
不要把东西放入 Actor 构造器,如果出现这种情况,注意:
• 缺省的 supervision 策略会停止一个没有初始化的Actor,
• 用一个初始化消息替代在构造器中初始化。
把这个消息和Actor放在一起,一个命名空间最好。

最后实现一个Actor的日志 性能度量 和流程跟踪的代码:

trait InstrumentedActor extends Slf4jLogging with ActorMetrics with TrakkarExtractor

object MyWorkerActor {
case object Initialize
case class DoSomeWork(desc: String)
}

class MyWorkerActor extends InstrumentedActor {
def wrappedReceive = {
case Initialize =>
case DoSomeWork(desc) =>
}

2Go 1 2 下一页