纽约证券交易所的实时Map/Reduce大数据分析

Streaming Map/Reduce on Wall Street | Concurrent M

我们在纽约证券交易所NYSE建立的事件处理系统到现在已经超过20年了。 在过去的十年中,我们侧重于复杂事件处理(CEP),在2005年首次建立了第一个CEP引擎,并随后与许多顶级供应商及产品一起工作。

当我开始使用Darkstar时,大部分这些早于2009年的产品的最大问题是:CEP引擎无法伸缩扩展。 我想突破这种限制 - 我想可以通过整个互联网实时过筛。 我的案例是为证券进行综合审计跟踪,这是实时处理,整天都需要。

我想建立一个系统可以每秒处理数百万个事件/消息,扫描那些数据流模式,将数据保存,并立即将数据可用于后续的查询。 那个时候在市场上并没有这样的事物。 大多数人说,这不可能做到。 我需要做的是愿意相信我和我的团队以及业务伙伴。

纽交所技术想提供交易后的监控系统,但不想为每个客户端组建独立的机器/系统。 他们想节约支出,提高收入增长 - 似乎是建立一个集群解决方案,然后只需要增加硬件规模的绝佳机会。 其他许多系统的问题是,为了扩大可扩展伸缩,他们都要重新设计架构。 这是我想避免的。

答案就在混合CEP和Hadoop。 我们需要一种方法来分发请求到群集,然后重新组装结果 - 似乎是一个完美的map / reduce用例。

该系统从2010年开始运行,Darkstar暗星在Mahwah部署到生产环境...

应该指出在下面图中的一个变化 - 我们不再使用MySQL作为存储库。 我们把数据变成一个分布式列族的Cassandra。

..

概述
Darkstar是一种通过云事件处理设计的多用途的高度分布式处理平台,体系结构是基于云的原则,这意味着它被设计成在一个高度分布式环境中执行,以在该系统中可以包括的节点的数量没有实际限制。 Darkstar™不是只是应用程序,它是一个平台,它提供:开发应用程序无需了解底层架构的分布式特性。 Darkstar暗星™提供了一个潜在的事件处理引擎,动态地接受由Darkstar暗星™应用程序注入的查询和报表,并结合基于进入系统的事件和查询注入到系统中。

Darkstar暗星™是专为在多个物理和/或虚拟节点上运行设计,使用的消息传递协议进行通信。 这种方法允许Darkstar暗星™灵活地对进入系统的事件流实现动态即时反应。 事件处理引擎的工作量会在集群中的节点之间分配,让暗星™有效无限的可扩展性。

暗星™使用RabbitMQ作为协议对输入查询动态注入到下层CEP引擎,用消息发送回应用程序查询的结果集。

DataStar使用下面产品:
Cassandra是,其目的是在集群环境中运行一个分布式数据库。 由暗星™集群收到的消息被写入Cassandra并可能在以后的时间观看。

RabbitMQ的 -是一个开源消息代理软件,该软件使用AMQP标准。 该RabbitMQ服务器是用Erlang编写的,是建立在开放电信平台架构,集群和故障转移。 暗星™使用RabbitMQ消息代理软件实现应用程序之间的通信。

MySQL的 -暗星™使用MySQL来存储接收到消息的元数据。 暗星™使用此信息来创建可通过注射查询到暗星™支持的各种信息流。 由这个元数据定义消息流是允许暗星™灵活简单地支持许多不同的消息类型或新的消息类型定义的存储。

如下图:

[该贴被banq于2013-11-25 07:02修改过]
[该贴被admin于2013-11-25 09:12修改过]


暗星™客户端API允许客户端应用程序将集群作为一个整体传入的消息。

假如是一个6节点集群,传入消息将按字母顺序分配,节点1可能处理传入消息的A - D,节点2可能处理它传入的消息E - H,节点3可能处理收到的消息I - M,节点4可能处理传入的消息N - Q,节点5可能处理传入的消息R - U,节点6处理传入的消息V - Z。

所有节点都互相不断的沟通,每个节点都知道其它节点处理的范围。 正因为如此,如果任何一个节点出现故障,集群就会意识到了这个故障,分区会自动在集群中的其余节点之间重新平衡。


为了让暗星™来处理大量的数据需要采集数据的方式。 数据进入集群中的两种方式之一:

1.暗星™应用程序
2.暗星™OnRamps OnRamps是获得来自外部源的数据的方式

所有OnRamps与它们注入数据到集群的方式类似,当OnRamps接收到消息,遍历该消息所有字段,将字段名为Key,字段值作为Value,把每个字段转为Map。

一旦所有字段转为Map以后,这个Map就转变为JSON对象,并通过暗星™客户端API,使用基于RPC的协议发送到暗星™集群。一旦
被暗星™集群收到,被添加到事件流中,可以被客户端应用程序查询。

当暗星™启动时,基于配置文件中指定的IP地址和端口启动一个RPC连接。 此连接是为客户端(例如OnRamps)向暗星™API请求服务所用。

前面说过,Map转为Json对象,被集群接受后,开始解析这些字段,根据字段中指定的值进行分区,基于此字段中的值其相应的消息被发送到相应的节点。

一旦事件消息被正确的节点接受,该消息被注入到消息中指定的事件流。 然后由指定事件处理引擎来处理。

[该贴被admin于2013-11-25 09:19修改过]


事件处理引擎用来它接受外部事件并执行其交易。 当暗星™启动时,它连接到一个MySQL数据库, 从该数据库中它加载各个事件流的定义到系统中,然后动态地创建这些事件流。

当新的事件类型被添加到系统中,只需要将这些事件类型的定义添加到MySql数据库中,暗星™会动态将这些事件类型自动添加到系统中。

一个事件类型加入到系统,一个新的事件流就被加入到事件流引擎中。

客户端的查询是使用事件处理语言(EPL)编写的,会被注入到事件处理引擎执行。这种EPL无需编写复杂的代码就能实现过滤和提纯等特定模式的数据。允许用户指定哪些事件流或他们感兴趣的流查询,过滤掉任何不必要的噪音,允许客户端应用程序聚集在他们感兴趣的活动,如事件比较执行操作。查询被RabbitMQ的传输协议广播到集群中的所有节点。

还支持“命名查询”,某个时间内建立的查询可以被其他查询中引用,从而允许对复杂数据挖掘查询的累积。


使用Cassandra持久保存事件,启动时暗星™试图连接配置文件中指定的Cassandra数据库。如果它不能连接的时候,维护一个后台线程将继续尝试连接到Cassandra。 一旦事件进入则群集暗星™该事件写入到Cassandra中相应的事件表。

..
DTO的默认行为是将传入的事件消息持久到Cassandra数据库。
..

总结
暗星™采用了创新的基于云的方法来处理大型数据集分析。 这种架构允许几乎无限的扩展和可扩展性,提供暗星™受雇为解决几乎任何问题集的灵活性,使它成为有关处理和分析大数据的一个可行的解决方案。