大数据主题

Storm - 大数据Big Data实时处理框架

上页

  现在开始分析Storm内部架构,首先看看Work之间的消息传递,如下图,

Work之间的通讯是通过ZeroMQ,但是Yahoo后来发现使用异步的Netty能够提升Storm一倍性能,数据使用Kryo进行序列化,本地通讯使用Lmax的Disruptor ,内部无需序列化。

   messages

 

容错性

如下图,executor发送心跳到Zookeeper,Supervisor从本地文件读取所在服务器的worker心跳状态,然后同步分配发送到zooKeeper。Nimbus监控集群状态。这样能确保worker一直活着。

rc

如果某个节点也就是服务器没有心跳,Nimbus将重新分配新的服务器上线工作。

如果某个节点服务器中work没有心跳,那么Supervisor将负责重启线程。

如果某个Supervisor完蛋,整个处理正常,但是分配的同步工作就无法进行了。

如果Nimbus崩溃,整个系统可以运行,但是拓扑分配工作无法进行了。

 

可靠性:确保消息被处理

public class RandomSentenceSpout extends BaseRichSpout {
  public void nextTuple() {
     UUID msgId = getMsgId();//用消息ID发送消息
    collector.emit(new Values(tweet), msgId);
  }
  public void ack(Object msgId) {
    // Do something with acked message id. 确认消息ID
  }
  public void fail(Object msgId) {
     // Do something with failed message id. 消息ID失败了
  }
}


public class SplitSentenceBolt extends BaseRichBolt {
  public void execute(Tuple input) {
    for (String s : input.getString(0).split("\\s")) {
      collector.emit(input, new Values(s));
    }
    //当整个词语都被切分后,确认输入的事件已经被接受处理。
    collector.ack(input);
  }
}

下面是一个Ack确认流程,注意到Acker Implicit bolt。

对于一个树形结构Tuple流,也就是Tuple里面嵌套Tuple。

如果事件被下一个节点成功接受和处理,这个节点将更新相应初始事件的签名,通过异或操作,将输入事件的ID和所有基于该输入事件产生的所有事件的ID进行异或操作,如下图,事件 01111 产生子事件 01100, 10010, 和 00010, 这样事件 01111的签名是11100 (= 01111 (initial value) xor 01111 xor 01100 xor 10010 xor 00010).

当Ack值变成0,Acker implicit bolt就知道tuple树形数据集合全部被处理完成,一个事务确保可靠结束。例如语句分成一个个单词全部完成。

更详细见:In-Stream大数据处理模式

Storm的集群设置

设置ZooKeeper cluster
(1)安装Storm依赖的库包到服务器上:
- ZeroMQ 2.1.7 and JZMQ
- Java 6 and Python 2.6.6
- unzip
(2)下载解压Storm。
填写强制性配置到storm.yaml
用storm脚本启动守护流程的监督

通过Web界面能够观察管理拓扑网络情况和组件情况。

 

在Windows上安装Storm

Hadoop打数据批处理架构

Storm专题

大数据专题