Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,
目录
Storm磊
1、DAG:有向无循环图2、Storm的特性3、Storm物理架构4、Storm的数据分发策略
简单案例 5、Storm的通信机制
① Worker进程间通信原理② Worker进程内通信原理③ Storm的容错机制④ 任务级容错⑤ 消息容完整性
1、DAG:有向无循环图
它由有限个顶点和有向边组成,每条有向边都从一个顶点指向另一个顶点;从任意一个顶点出发都不能通过这些有向边回到原来的顶点。有向无环图就是从一个图中的任何一点出发,不管走过多少个分叉路口,都没有回到原来这个点的可能性。
2、Storm的特性
1.适用场景广泛: storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算) 对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。 2、可伸缩性高: Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。 Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。 3、保证无数据丢失: 实时系统必须保证所有的数据被成功的处理。storm保证每一条消息都会被处理。 4、异常健壮: storm集群非常容易管理,轮流重启节点不影响应用。 5、容错性好: 在消息处理过程中出现异常, storm会进行重试 6、语言无关性: Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.
3、Storm物理架构
1、nimbus: Storm的Master,负责资源分配和任务调度,一个Storm集群只有一个Nimbus,此节点是一个无状态节点,所有的一切都存储再Zookeeper 2、supervisor: Storm的Slave,负责接受Nimbus分配的任务管理所有的Worker,一个Supervisor节点包含多个Workers进程,默认4个,一般情况一个topology对应一个worker 3、workers: 工作进程(Process),每个工作进程中都有多个Task。 4、Task: 在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。 worker中每一个spout/bolt的线程称为一个task 同一个spout/bolt的task可能会共享一个物理线程(Thread),该线程称为executor 5、Storm的并行机制: Topology由一个或多个Spout/Bolt组件构成。运行中的Topology由一个或多个Supervisor节点的Worker构成 6、流式计算框架(计算框架) 客户端将数据发送给MQ(消息队列),然后传递到Storm中进行计算 最终计算的结果存储到数据库中(Hbase,Mysql) 客户端不要求服务器返回结果,客户端可以一直向Storm发送数据 客户端相当于生产者,Storm相当于消费者
4、Storm的数据分发策略
---> 1、ShuffleGrouping 随机分组,随机派发stream里面的tuple,保证每个bolttask接收到的tuple数目大致相同。轮询,平均分配 优点: 为tuple选择task的代价小; bolt的tasks之间的负载比较均衡; 缺点: 上下游components之间的逻辑组织关系不明显; ---> 2、FieldsGrouping 按字段分组 比如,按"user-id"这个字段来分组,那么具有同样"user-id"的tuple会被分到相同的Bolt里的一个task,而不同的"user-id"则可能会被分配到不同的task。 优点: 上下游components之间的逻辑组织关系显著; 缺点: 付出为tuple选择task的代价; bolt的tasks之间的负载可能不均衡,根据field字段而定; ---> 3、AllGrouping 广播发送,对于每一个tuple,所有的bolts都会收到 优点: 上游事件可以通知下游bolt中所有task; 缺点: tuple消息冗余,对性能有损耗,请谨慎使用; --- > 4、GlobalGrouping 全局分组,把tuple分配给taskid最低的task。 优点: 所有上游消息全部汇总,便于合并、统计等; 缺点: bolt的tasks之间的负载可能不均衡,id最小的task负载过重; --- > 5、DirectGrouping 指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为DirectStream的消息流可以声明这种分组方法。 而且这种消息tuple必须使用emitDirect方法来发射。 消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id) 优点: Topology的可控性强,且组件的各task的负载可控; 缺点: 当实际负载与预估不符时性能削弱; ---> 6、Localorshufflegrouping 本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致 优点: 相对于ShuffleGrouping,因优先选择同进程task间传输而降低tuple网络传输代价,但因寻找同进程的task而消耗CPU和内存资源,因此应视情况来确定选择ShuffleGrouping或LocalOrShuffleGrouping; 缺点: 上下游components之间的逻辑组织关系不明显; ---> 7、NoneGrouping 不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shufflegrouping是一样的效果。有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。 8、customGrouping 自定义,相当于mapreduce那里自己去实现一个partition一样。
简单案例
测试小案例:
-- NumberTopology -- public class NumberTopology { public static void main(String[] args) { //创建Topology的构建器 TopologyBuilder topologyBuilder = new TopologyBuilder(); //开始构建整个流程(Spout) topologyBuilder.setSpout("numberspout", new NumberSpout()); //开始构建整个流程(Bolt) topologyBuilder.setBolt("numberBolt", new NumberBolt()).shuffleGrouping("numberspout"); //Bolt //topologyBuilder.setBolt("ownBolt",new OwnBolt()).shuffleGrouping("numberspout"); //启动Topology Config conf = new Config(); //创建一个topology StormTopology topology = topologyBuilder.createTopology(); //本地模式启动集群 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("numberTopology", conf, topology); } } -- NumberBolt -- public class NumberBolt extends baseBasicBolt { //声明一个统计器 private static int sum; @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println("从上游获取的数据:" + input); System.out.println("从上游获取的数据为:" + input.getInteger(0) + "--" + input.getIntegerByField("num")); //开始统计 sum += input.getInteger(0); System.out.println("截止到本次,共获取数据和为:" + sum); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("num")); } } -- NumberSpout -- public class NumberSpout extends baseRichSpout { //声明一个SpoutOutputCollector对象,用于发送数据 private SpoutOutputCollector collector; //声明一个计数器 private int number; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //获取初始化时的发送器 this.collector = collector; } @Override public void nextTuple() { //将数据发送下一个Bolt this.collector.emit(new Values(number++)); try { //限制传输速度 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("num")); } }
5、Storm的通信机制 ① Worker进程间通信原理
宏观 ① 每个worker进程都一个独立的接受线程和发送线程 ② 接收线程接受外部发送的消息到执行器(executor)的收集队列(incoming-queue)中 ③ 发送线程从worker的执行器发送队列(transfer-queue)中读取消息,通过网络发给其他worker
内部微观 ① worker中有多个执行器executor,每个执行器都有对应的接受进程和发送线程, ② worker接受线程将消息分配给指定taskid的executor的接受器中 ③ executor会有段单独的线程进行处理,最后结果outgoing-queue,达到阈值进行批量发送tuple(元组)
② Worker进程内通信原理
Disruptor是一个Queue队列。 Disruptor一种线程之间信息无锁的交换方式。 Disruptor主要特点: 1、 没有竞争=没有锁=非常快。 2、 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结 Disruptor 核心技术点: Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另外一边消费者并行取出处理. 底层是单个数据结构:一个ring buffer(环形数据缓冲区)
③ Storm的容错机制
节点故障 Nimbus宕机 单点故障,1.0后便是高可用的了 非Nimbus宕机 故障时,所有任务超时,Nimbus将会将任务重新分配到别的服务器上 进程故障 1、Worker worker由Supervisor负责监控,一旦worker故障,会尝试重启,如果还是失败并且失去心跳机制,那么Nimbus将会把worker的任务分配给其他服务器 2、Supervisor 一旦遇到异常情况,直接自动毁灭,无状态(信息存放在zookeeper中) ① 快速失败:在遍历过程中进行增删改,抛出异常 多进程下不能发送修改 ② 安全失败:遍历的数据先复制一份,对复制的进行遍历并发修改,来保证安全
④ 任务级容错
1、Bolt 任务冲突crash引起消息未被应答 执行超时,会调用fail方法 2、acker任务失败 首先持有的消息超时失效,Spout的fail方法执行 3、Spout任务失败 此时消息的完整性交给外部设备比如MQ
⑤ 消息容完整性
每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成千上万个新的Tuple 形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被完全处理了。 ACKER: acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径, 如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了 相反则会告知spout该消息处理成功了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)