Storm实时处理架构

Storm实时处理架构,第1张

Storm实时处理架构 Storm磊

Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,


目录

Storm磊

1、DAG:有向无循环图2、Storm的特性3、Storm物理架构4、Storm的数据分发策略

简单案例 5、Storm的通信机制

① Worker进程间通信原理② Worker进程内通信原理③ Storm的容错机制④ 任务级容错⑤ 消息容完整性


1、DAG:有向无循环图

它由有限个顶点和有向边组成,每条有向边都从一个顶点指向另一个顶点;从任意一个顶点出发都不能通过这些有向边回到原来的顶点。有向无环图就是从一个图中的任何一点出发,不管走过多少个分叉路口,都没有回到原来这个点的可能性。


2、Storm的特性
1.适用场景广泛:
	storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算)
	对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。
2、可伸缩性高: 
	Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。
	Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。
3、保证无数据丢失:
	实时系统必须保证所有的数据被成功的处理。storm保证每一条消息都会被处理。
4、异常健壮:
	storm集群非常容易管理,轮流重启节点不影响应用。
5、容错性好:
	在消息处理过程中出现异常, storm会进行重试
6、语言无关性:
	Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.

3、Storm物理架构

1、nimbus:
	Storm的Master,负责资源分配和任务调度,一个Storm集群只有一个Nimbus,此节点是一个无状态节点,所有的一切都存储再Zookeeper
		
2、supervisor:
	Storm的Slave,负责接受Nimbus分配的任务管理所有的Worker,一个Supervisor节点包含多个Workers进程,默认4个,一般情况一个topology对应一个worker
3、workers:
	工作进程(Process),每个工作进程中都有多个Task。
4、Task:
	在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。
	worker中每一个spout/bolt的线程称为一个task
	同一个spout/bolt的task可能会共享一个物理线程(Thread),该线程称为executor
5、Storm的并行机制:
	Topology由一个或多个Spout/Bolt组件构成。运行中的Topology由一个或多个Supervisor节点的Worker构成
6、流式计算框架(计算框架)
	客户端将数据发送给MQ(消息队列),然后传递到Storm中进行计算
	最终计算的结果存储到数据库中(Hbase,Mysql)
	客户端不要求服务器返回结果,客户端可以一直向Storm发送数据
	客户端相当于生产者,Storm相当于消费者

4、Storm的数据分发策略
--->
	1、ShuffleGrouping
		随机分组,随机派发stream里面的tuple,保证每个bolttask接收到的tuple数目大致相同。轮询,平均分配
			优点:
				为tuple选择task的代价小;
				bolt的tasks之间的负载比较均衡;
			缺点:
				上下游components之间的逻辑组织关系不明显;
--->
	2、FieldsGrouping
		按字段分组
		比如,按"user-id"这个字段来分组,那么具有同样"user-id"的tuple会被分到相同的Bolt里的一个task,而不同的"user-id"则可能会被分配到不同的task。
			优点:
				上下游components之间的逻辑组织关系显著;
			缺点:
				付出为tuple选择task的代价;
				bolt的tasks之间的负载可能不均衡,根据field字段而定;
--->
	3、AllGrouping
		广播发送,对于每一个tuple,所有的bolts都会收到
			优点:
				上游事件可以通知下游bolt中所有task;
			缺点:
				tuple消息冗余,对性能有损耗,请谨慎使用;
---	>		
	4、GlobalGrouping
		全局分组,把tuple分配给taskid最低的task。
			优点:
				所有上游消息全部汇总,便于合并、统计等;
			缺点:
				bolt的tasks之间的负载可能不均衡,id最小的task负载过重;
---	>		
	5、DirectGrouping
		指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。	
		只有被声明为DirectStream的消息流可以声明这种分组方法。
		而且这种消息tuple必须使用emitDirect方法来发射。
		消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)
			优点:
				Topology的可控性强,且组件的各task的负载可控;
			缺点:
				当实际负载与预估不符时性能削弱;
--->
	6、Localorshufflegrouping
		本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致
			优点:
				相对于ShuffleGrouping,因优先选择同进程task间传输而降低tuple网络传输代价,但因寻找同进程的task而消耗CPU和内存资源,因此应视情况来确定选择ShuffleGrouping或LocalOrShuffleGrouping;
			缺点:
				上下游components之间的逻辑组织关系不明显;
--->
	7、NoneGrouping
		不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shufflegrouping是一样的效果。有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
	8、customGrouping
		自定义,相当于mapreduce那里自己去实现一个partition一样。

简单案例

测试小案例:

-- NumberTopology --

public class NumberTopology {
    public static void main(String[] args) {
        //创建Topology的构建器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //开始构建整个流程(Spout)
        topologyBuilder.setSpout("numberspout", new NumberSpout());
        //开始构建整个流程(Bolt)
        topologyBuilder.setBolt("numberBolt", new NumberBolt()).shuffleGrouping("numberspout");
        //Bolt
        //topologyBuilder.setBolt("ownBolt",new OwnBolt()).shuffleGrouping("numberspout");

        //启动Topology
        Config conf = new Config();
        //创建一个topology
        StormTopology topology = topologyBuilder.createTopology();
        //本地模式启动集群
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("numberTopology", conf, topology);
    }
}

-- NumberBolt --
public class NumberBolt extends baseBasicBolt {
    //声明一个统计器
    private static int sum;

    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("从上游获取的数据:" + input);
        System.out.println("从上游获取的数据为:" + input.getInteger(0) + "--" + input.getIntegerByField("num"));
        //开始统计
        sum += input.getInteger(0);
        System.out.println("截止到本次,共获取数据和为:" + sum);
    }

    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("num"));
    }
}

-- NumberSpout --
public class NumberSpout extends baseRichSpout {

    //声明一个SpoutOutputCollector对象,用于发送数据
    private SpoutOutputCollector collector;
    //声明一个计数器
    private int number;

    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //获取初始化时的发送器
        this.collector = collector;
    }

    
    @Override
    public void nextTuple() {
        //将数据发送下一个Bolt
        this.collector.emit(new Values(number++));
        try {
            //限制传输速度
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("num"));
    }
}



5、Storm的通信机制 ① Worker进程间通信原理
宏观
	① 每个worker进程都一个独立的接受线程和发送线程
		
	② 接收线程接受外部发送的消息到执行器(executor)的收集队列(incoming-queue)中
	③ 发送线程从worker的执行器发送队列(transfer-queue)中读取消息,通过网络发给其他worker

内部微观
	① worker中有多个执行器executor,每个执行器都有对应的接受进程和发送线程,
		
	② worker接受线程将消息分配给指定taskid的executor的接受器中
	③ executor会有段单独的线程进行处理,最后结果outgoing-queue,达到阈值进行批量发送tuple(元组)


② Worker进程内通信原理
Disruptor是一个Queue队列。
Disruptor一种线程之间信息无锁的交换方式。
Disruptor主要特点:
	1、 没有竞争=没有锁=非常快。
	2、 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结
Disruptor 核心技术点:
	Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另外一边消费者并行取出处理.
	底层是单个数据结构:一个ring buffer(环形数据缓冲区)

③ Storm的容错机制
节点故障
	Nimbus宕机
		单点故障,1.0后便是高可用的了
	非Nimbus宕机
		故障时,所有任务超时,Nimbus将会将任务重新分配到别的服务器上
进程故障
	1、Worker
		worker由Supervisor负责监控,一旦worker故障,会尝试重启,如果还是失败并且失去心跳机制,那么Nimbus将会把worker的任务分配给其他服务器
	2、Supervisor
		一旦遇到异常情况,直接自动毁灭,无状态(信息存放在zookeeper中)
		① 快速失败:在遍历过程中进行增删改,抛出异常 多进程下不能发送修改
		② 安全失败:遍历的数据先复制一份,对复制的进行遍历并发修改,来保证安全

④ 任务级容错
1、Bolt 任务冲突crash引起消息未被应答
	执行超时,会调用fail方法
2、acker任务失败
	首先持有的消息超时失效,Spout的fail方法执行
3、Spout任务失败
	此时消息的完整性交给外部设备比如MQ

⑤ 消息容完整性
每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成千上万个新的Tuple
形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被完全处理了。
ACKER:
	acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,
	如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了	
	相反则会告知spout该消息处理成功了。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5704832.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存