Twitter基于时间流的聚合设计

谷歌在其google I/O的大会上发布了使用Pipeline统一了大数据批处理和流处理,Twitter在其博客也发布了类似平台:基于时间流的聚合设计

Twitter是一个实时处理数十亿事件的交流平台, 实时聚合这些事件是一个非常大的挑战,经典的时间流(time-series)应用包括站点流量 服务健康性 用户活动监控。Twitter的经验显示很难建立一个真正健壮的聚合服务,扩展和维护演进它们更加难,这些时间流的应用大部分架构相同,只是在数据模型上有轻微不同。

Twitter引入了Summingbird库,这是一种适合通用分布式计算的高级别抽象库包,能提供为解决复杂聚合问题提供灵活的实现。Twitter然后基于Summingbird建立了一种灵活 可重用/端到端架构,称为TSAR (时间序列/时间流聚合: TimeSeries AggregatoR).

下面以twitter的帖子Tweet浏览量计数为案例,说明如何通过Summingbird将大数据批处理和实时处理无缝结合在一起,实现PipleLine流式处理的方式。

帖子浏览计数没有多大问题,问题是,如果提供帖子计数的服务中断了,如何恢复失去的数据呢?因为帖子浏览计数对于Twiiter分析用户行为有着重要影响,可以认为是他们的关键事务性数据,如同金钱数据是银行系统关键数据一样,这里其实提出了数据可靠性和失败恢复的问题。

关键问题是如何协调数据Schema (数据结构,数据表结构)能让数据在各种计数场景下表现为一致呢?各种计数保存媒介有:浏览数作为日志数据保存(将被下游分析管道);保存在一个key/value的NOSQL存储库中(用于低延迟 高可用的数据); 保存在缓存中 (用于快速访问); 当然,也有保存在关系数据库中 (内部研究和数据质量监控).

如何设计出一个数据结构能够灵活应付上面各种场景应用保存介质?比如产品团队也许要为付钱的广告计数分离他们关心的主贴浏览数和转发的浏览数。也许还要根据国家 用户地区对浏览数进行分类,这些需求都是在第一次部署上线后不可预测的(关系数据库数据表结构是一开始就需要设计好,运行中无法随机改变结构的),还有如果数据结构变化了,如何五痛苦地更新旧的数据呢?(回填历史数据)。

TSAR就是定位解决这些问题,遵循如下一些设计原则:

1.混合计算Hybrid computation. 处理每个事件两次,一个是实时,还有一个是稍后时间在批处理任务中再处理一次。双处理过程由Summingbird协调指挥. 混合模型结合了批处理和实时流处理的优点。批处理优点是稳定且可重复,而流式处理优点是实时最新。

2.从事件聚合时分离事件的发射production. 第一个处理阶段是从源数据中释放获得事件,在帖子浏览数这个案例中, TSAR 从Web和移动客户端两个终端访问日志中获得帖子浏览事件。第二个阶段是事件发射阶段,TSAR能够实现聚合和预处理这些事件。

3.统一数据结构. 为TSAR服务的数据结构是以一种独立于数据存储的方式设计的,与存储介质无关, TSAR会映射数据结构到相应的存储库,当数据结构变化时进行数据转换。

4.集成服务工具. TSAR集成其他必要服务提供数据处理 数据仓库和数据查询能力。可观察可报警 自动配置,协调各个组件。

下面以代码为案例:

这里是一个TSAR服务对帖子进行计数,然后将已经计算后的聚合保存到存储库Manhattan中,这是一个Twitter 自己的key-value NoSQL数据库。


aggregate {
onKeys(
(TweetId)
) produce (
Count
) sinkTo (Manhattan)
} fromProducer {
ClientEventSource(“client_events”)
.filter { event => isImpressionEvent(event) }
.map { event =>
val impr = ImpressionAttributes(event.tweetId)
(event.timestamp, impr)
}
}

The TSAR job is broken into several code sections:

onKeys 段是定义一个或多个聚合模板— 也就是我们依据什么进行聚合,这是帖子ID(Tweet ID )
produce事件发射部分是告诉TSAR应该依据什么计算,这里我们需要产生每个帖子浏览总数的计数计算。
sinkTo(Manhattan) 是告诉TSAR将数据发送给Manhattan键值存储库存储。
fromProducer 是指定一些在聚合事件时预处理逻辑, TSAR 会执行这些代码对数据进行处理。

下面看看这种数据结构能否无缝应对需求变化,现在如果希望将根据客户端应用分离浏览数统计 (e.g., Twitter for iPhone, Android, etc.) ,这就要求我们演进这个工作逻辑适应新的需求, TSAR简化了这种演进:


aggregate {
onKeys(
(TweetId)
(TweetId, ClientApplicationId) // 增加一个客户应用ID,新的聚合尺度
) produce (
Count
) sinkTo (Manhattan)
}

既然我们的数据结构已经改变,那么历史数据是只有一个TweetId,如何变成现在的两个ID呢,使用回填方式:
$ tsar backfill --start=<start> --end=<end>
这是一种并行运行,能够将历史数据更新到新的数据结构下的数据。

此外,还可以进行其他各个方面的定制改动,比如记录数据的细粒度:
put(sink = Sink.Manhattan, width = 1 * Day)
Output(sink = Sink.Manhattan, width = Alltime) // new aggregation granularity
上面从一天记录一次改变到时刻记录,保存到数据库的频率增加了。也可以保存到不同数据库:
Output(sink = Sink.MySQL, width = Alltime) // 新的mySQL数据库

上面案例是对帖子总浏览数进行计数,我们另外可以计算出每个帖子不同的用户浏览数:


aggregate {
onKeys(
(TweetId),
(TweetId, ClientApplicationId)
) produce (
Count,
Unique(UserId) // new metric 新的计算方式
) sinkTo (Manhattan)
}

以Twitter经验来看,建设管理大数据流 data flow 需要建立整个分析管道pipeline,底层部分类似一个管道接另外一个管道,整个管道线布局如下图所示:



[该贴被banq于2014-06-30 10:55修改过]


LinkedIn对Google和Twitter这种混合实时流处理和批处理的Lambda架构提出了质疑:
质疑Lambda架构

Twitter这种函数化的聚合设计,而不是针对数据结构的聚合设计其实很有普遍性,可广泛应用于查询方面的设计,见:
http://www.jdon.com/46516