2021SC@SDUSC
The mk-scheduler function is defined as follows:
(defn mk-scheduler [conf inimbus]
  ;; In the current version, getForcedScheduler returns nil
  (let [forced-scheduler (.getForcedScheduler inimbus)
        ;; scheduler is bound to an implementation of the IScheduler interface.
        ;; cond tests its conditions in order, like an if / else-if chain in Java:
        ;; forced-scheduler is checked first; if it is nil, a user-defined scheduler
        ;; configured under STORM-SCHEDULER is used; if there is none, the default
        ;; DefaultScheduler is used
        scheduler (cond
                    forced-scheduler
                    (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                        forced-scheduler)
                    (conf STORM-SCHEDULER)
                    (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                        (-> (conf STORM-SCHEDULER) new-instance))
                    :else
                    (do (log-message "Using default scheduler")
                        (DefaultScheduler.)))]
    ;; call prepare first
    (.prepare scheduler conf)
    ;; then return the scheduler
    scheduler))
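The following self-contained Clojure sketch illustrates the selection order: forced scheduler first, then the configured custom scheduler, then the default. It does not use Storm. The Scheduler protocol, the NamedScheduler record and the scheduler-registry map, which replaces the reflective new-instance call, are hypothetical stand-ins. The config key "storm.scheduler" is assumed here to be the string behind STORM-SCHEDULER.

```clojure
;; Hypothetical stand-in for Storm's IScheduler interface.
(defprotocol Scheduler
  (prepare [this conf])
  (scheduler-name [this]))

;; A scheduler that remembers its label and the conf passed to prepare.
(defrecord NamedScheduler [label prepared-conf]
  Scheduler
  (prepare [_ conf] (reset! prepared-conf conf))
  (scheduler-name [_] label))

(defn new-scheduler [label] (->NamedScheduler label (atom nil)))

;; Hypothetical registry standing in for instantiating the class named in the
;; config via reflection. Unknown class names are not handled in this sketch.
(def scheduler-registry {"my.CustomScheduler" #(new-scheduler "custom")})

(defn pick-scheduler
  "Mirrors the cond in mk-scheduler: forced > custom (from conf) > default,
   then calls prepare before returning the scheduler."
  [conf forced-scheduler]
  (let [custom-class (get conf "storm.scheduler")   ; assumed config key
        scheduler (cond
                    forced-scheduler forced-scheduler
                    custom-class ((get scheduler-registry custom-class))
                    :else (new-scheduler "default"))]
    (prepare scheduler conf)
    scheduler))
```

Because cond stops at the first truthy test, a forced scheduler wins even when a custom scheduler is also configured.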
The cleanup-corrupt-topologies! function is defined as follows:
(defn cleanup-corrupt-topologies! [nimbus]
  ;; get the StormClusterState instance stored in the nimbus map
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        ;; code-ids: the names of all subdirectories under {storm.local.dir}/nimbus/stormdist/
        ;; on the nimbus server, i.e. the ids of all topologies submitted to nimbus
        code-ids (set (code-ids (:conf nimbus)))
        ;; active-topologies: the names of all entries under /storms/ in ZooKeeper,
        ;; i.e. the ids of the topologies currently running on the Storm cluster
        active-topologies (set (.active-storms storm-cluster-state))
        ;; corrupt-topologies: active-topologies minus code-ids, i.e. the ids of topologies
        ;; that are running but whose jar, topology and configuration files are missing
        corrupt-topologies (set/difference active-topologies code-ids)]
    ;; for every id in corrupt-topologies, delete its assignment from /assignments in
    ;; ZooKeeper and its StormBase from /storms
    (doseq [corrupt corrupt-topologies]
      (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
      (.remove-storm! storm-cluster-state corrupt))))
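The core of cleanup-corrupt-topologies! is a single set difference. The sketch below uses hypothetical in-memory stand-ins. ZooKeeper is modelled as an atom holding :storms and :assignments sets in place of StormClusterState. The local stormdist directories are modelled as a plain set of ids.

```clojure
(require '[clojure.set :as set])

(defn corrupt-topology-ids
  "Ids that are active in ZooKeeper (/storms) but have no local code dir on nimbus."
  [active-ids local-code-ids]
  (set/difference (set active-ids) (set local-code-ids)))

(defn cleanup-corrupt!
  "Hypothetical in-memory version: zk-state is an atom of the form
   {:storms #{...} :assignments #{...}}. Returns the ids that were removed."
  [zk-state local-code-ids]
  (let [corrupt (corrupt-topology-ids (:storms @zk-state) local-code-ids)]
    (doseq [id corrupt]
      (println "Corrupt topology" id "has state on zookeeper but no local dir. Cleaning up...")
      ;; stands in for remove-storm!: drop the StormBase and the assignment
      (swap! zk-state #(-> %
                           (update :storms disj id)
                           (update :assignments disj id))))
    corrupt))

;; Example: with active ids #{"t1" "t2" "t3"} and local code #{"t1" "t3" "t4"},
;; only "t2" is corrupt. "t4" has code but is not active, so it is not touched here.
```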
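transition! plays a central role: it drives topology status transitions. In the nimbus startup scenario, event is the keyword :startup and error-on-no-transition? is false. transition! has two arities. The three-argument version simply calls the four-argument version with error-on-no-transition? set to false. It is defined as follows: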
(defn transition!
  ([nimbus storm-id event]
   (transition! nimbus storm-id event false))
  ([nimbus storm-id event error-on-no-transition?]
   ;; acquire the submit lock
   (locking (:submit-lock nimbus)
     ;; system-events is bound to the set #{:startup}
     (let [system-events #{:startup}
           ;; in the nimbus startup scenario, event is bound to :startup and event-args to nil
           [event & event-args] (if (keyword? event) [event] event)
           ;; read the topology's status (a map) from ZooKeeper and bind it to status
           status (topology-status nimbus storm-id)]
       ;; handles the case where event was scheduled but topology has been removed
       (if-not status
         ;; if status is nil, log a message and return
         (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
         ;; otherwise bind get-event to a lookup function: it returns the handler for e in m;
         ;; if there is none, it throws when error-on-no-transition? is true, and otherwise
         ;; returns nil (logging a message unless the event is a system event such as :startup)
         (let [get-event (fn [m e]
                           (if (contains? m e)
                             (m e)
                             (let [msg (str "No transition for event: " event
                                            ", status: " status,
                                            " storm-id: " storm-id)]
                               (if error-on-no-transition?
                                 (throw-runtime msg)
                                 (do (when-not (contains? system-events event)
                                       (log-message msg))
                                     nil)))))
               ;; state-transitions returns a map that defines, for each status, which events
               ;; it accepts and what happens after the transition (i.e. which function is
               ;; called); see its definition.
               ;; From that map, :startup is accepted only when the topology's current status
               ;; is :killed or :rebalancing; for any other status, transition is nil.
               ;; In that case the if below wraps it as (fn [] nil), so new-status is nil:
               ;; when nimbus starts up, applying :startup to a topology in any other status
               ;; does nothing
               transition (-> (state-transitions nimbus storm-id status)
                              (get (:type status))
                              (get-event event))
               transition (if (or (nil? transition) (keyword? transition))
                            (fn [] transition)
                            transition)
               new-status (apply transition event-args)
               new-status (if (keyword? new-status)
                            {:type new-status}
                            new-status)]
           (when new-status
             (set-topology-status! nimbus storm-id new-status))))))))
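The lookup-and-normalize core of transition! can be isolated as a pure function. The sketch below simplifies the original in three ways. There is no lock and no ZooKeeper access. Event arguments are dropped. Logging is omitted, but the error-on-no-transition? behaviour is kept. toy-transitions is a hypothetical, reduced version of state-transitions.

```clojure
(defn toy-transitions
  "Hypothetical, reduced transition table in the shape of state-transitions."
  [status]
  {:active      {:inactivate :inactive, :activate nil}
   :inactive    {:activate :active, :inactivate nil}
   :rebalancing {:do-rebalance (fn [] (:old-status status))}})

(defn next-status
  "Finds the handler for event in the row for (:type status), treats nil or
   keyword handlers as constant zero-arg fns, calls it, and wraps a keyword
   result as {:type kw}. Returns nil when the status should not change."
  [transitions-fn status event error-on-no-transition?]
  (let [row (get (transitions-fn status) (:type status))
        transition (if (contains? row event)
                     (get row event)
                     (when error-on-no-transition?
                       (throw (RuntimeException.
                               (str "No transition for event: " event
                                    ", status: " status)))))
        transition (if (or (nil? transition) (keyword? transition))
                     (fn [] transition)
                     transition)
        new-status (transition)]
    (if (keyword? new-status) {:type new-status} new-status)))
```

For example, (next-status toy-transitions {:type :active} :startup false) returns nil because :active has no :startup entry. The same call with true throws instead.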
1. If the topology's status is :killed when the :startup event is applied (for example, nimbus went down while the topology was being killed and was then restarted), transition is bound to:
(fn []
  (delay-event nimbus storm-id (:kill-time-secs status) :remove)
  nil)
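new-status is the return value of that function, which is nil, so the stored status is not changed at this point. The function calls delay-event, which adds #(transition! nimbus storm-id :remove false) to the Storm timer. When the timer fires (after :kill-time-secs seconds), that closure calls transition! again. This time it applies the :remove event to the :killed topology, which invokes: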
(fn []
  (log-message "Killing topology: " storm-id)
  ;; delete the topology's StormBase and assignment from ZooKeeper
  (.remove-storm! (:storm-cluster-state nimbus) storm-id)
  nil)
2. If the topology's status is :rebalancing when the :startup event is applied (for example, nimbus went down during a rebalance and was then restarted), transition is bound to:
(fn []
  (delay-event nimbus storm-id (:delay-secs status) :do-rebalance)
  nil)
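new-status is the return value of that function, which is nil. The function calls delay-event, which adds #(transition! nimbus storm-id :do-rebalance false) to the Storm timer. When the timer fires (after :delay-secs seconds), that closure calls transition! again. This time it applies the :do-rebalance event to the :rebalancing topology, which invokes: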
(fn []
  (do-rebalance nimbus storm-id status)
  (:old-status status))
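This function returns (:old-status status), the status the topology had before the rebalance started. When the timer task runs, transition! therefore changes the topology's status from :rebalancing back to that previous status. These are the status transitions a topology can go through when nimbus starts up.
The delay-event function is defined as follows. Its main job is to add #(transition! nimbus storm-id event false) to the Storm timer as a scheduled task: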
(defn delay-event [nimbus storm-id delay-secs event]
  (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
  (schedule (:timer nimbus)
            delay-secs
            #(transition! nimbus storm-id event false)))
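Both startup paths share the same two-step pattern. :startup only schedules a delayed event, and the later event performs the real work. The sketch below replays both paths deterministically. It relies on hypothetical stand-ins. A fake timer (a vector of [delay-secs thunk] pairs fired explicitly) replaces Storm's timer. An atom mapping storm-id to status replaces ZooKeeper. The transition table keeps only the :killed and :rebalancing rows, and the do-rebalance call itself is omitted.

```clojure
;; Hypothetical fake timer standing in for (schedule (:timer nimbus) ...).
(defn make-timer [] (atom []))

(defn schedule! [timer delay-secs f]
  (swap! timer conj [delay-secs f]))

(defn fire-all!
  "Runs every pending task in delay order."
  [timer]
  (let [tasks (sort-by first @timer)]
    (reset! timer [])                   ; tasks may schedule new ones while running
    (doseq [[_ f] tasks] (f))))

(declare startup-transition!)

(defn delay-event!
  "Same role as delay-event: queue a later transition for storm-id."
  [zk timer storm-id delay-secs event]
  (println "Delaying event" event "for" delay-secs "secs for" storm-id)
  (schedule! timer delay-secs #(startup-transition! zk timer storm-id event)))

(defn startup-transition!
  "Applies event to storm-id using only the :killed and :rebalancing rows of
   state-transitions. zk is an atom {storm-id status-map}."
  [zk timer storm-id event]
  (when-let [status (get @zk storm-id)]   ; topology already removed: no-op
    (let [table {:killed      {:startup (fn []
                                          (delay-event! zk timer storm-id (:kill-time-secs status) :remove)
                                          nil)
                               :remove  (fn []
                                          (swap! zk dissoc storm-id) ; stands in for remove-storm!
                                          nil)}
                 :rebalancing {:startup      (fn []
                                               (delay-event! zk timer storm-id (:delay-secs status) :do-rebalance)
                                               nil)
                               ;; the real handler calls do-rebalance before returning
                               :do-rebalance (fn [] (:old-status status))}}
          transition (get-in table [(:type status) event] (fn [] nil))
          new-status (transition)
          new-status (if (keyword? new-status) {:type new-status} new-status)]
      (when new-status
        (swap! zk assoc storm-id new-status)))))
```

After :startup, a :killed topology is still present and a :rebalancing one is still :rebalancing; only timer tasks have been queued. Firing the timer then removes the killed topology and restores the rebalanced one to its :old-status.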
The state-transitions function is defined as follows:
(defn state-transitions [nimbus storm-id status]
  {:active {:inactivate :inactive
            :activate nil
            :rebalance (rebalance-transition nimbus storm-id status)
            :kill (kill-transition nimbus storm-id)}
   :inactive {:activate :active
              :inactivate nil
              :rebalance (rebalance-transition nimbus storm-id status)
              :kill (kill-transition nimbus storm-id)}
   :killed {:startup (fn []
                       (delay-event nimbus storm-id (:kill-time-secs status) :remove)
                       nil)
            :kill (kill-transition nimbus storm-id)
            :remove (fn []
                      (log-message "Killing topology: " storm-id)
                      (.remove-storm! (:storm-cluster-state nimbus) storm-id)
                      nil)}
   :rebalancing {:startup (fn []
                            (delay-event nimbus storm-id (:delay-secs status) :do-rebalance)
                            nil)
                 :kill (kill-transition nimbus storm-id)
                 :do-rebalance (fn []
                                 (do-rebalance nimbus storm-id status)
                                 (:old-status status))}})
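get-event in transition! checks keys with contains?. As a result, an event mapped to nil, such as :activate on an :active topology, counts as an allowed no-op, while a missing key counts as "no transition". The sketch below applies the same check to the shape of the state-transitions map to list which statuses accept a given event. Handler functions are replaced by the placeholder keyword :handler because only the keys matter here.

```clojure
(defn states-accepting
  "Returns the set of statuses whose row in the transition table has an entry
   for event (the same contains? check that get-event performs)."
  [transition-table event]
  (set (for [[state row] transition-table
             :when (contains? row event)]
         state)))

;; Shape of the map returned by state-transitions. Handler functions are
;; replaced by the placeholder :handler; keyword and nil values are kept.
(def transition-table-shape
  {:active      {:inactivate :inactive, :activate nil, :rebalance :handler, :kill :handler}
   :inactive    {:activate :active, :inactivate nil, :rebalance :handler, :kill :handler}
   :killed      {:startup :handler, :kill :handler, :remove :handler}
   :rebalancing {:startup :handler, :kill :handler, :do-rebalance :handler}})
```

Querying :startup yields #{:killed :rebalancing}, which matches the analysis of the startup scenario.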
The do-cleanup function is defined as follows:
(defn do-cleanup [nimbus]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        submit-lock (:submit-lock nimbus)]
    ;; to-cleanup-ids: the ids of topologies that need cleaning up, i.e. topologies that
    ;; are no longer active; see the definition of cleanup-storm-ids
    (let [to-cleanup-ids (locking submit-lock
                           (cleanup-storm-ids conf storm-cluster-state))]
      (when-not (empty? to-cleanup-ids)
        (doseq [id to-cleanup-ids]
          (log-message "Cleaning up " id)
          ;; delete the /workerbeats/{id} node from ZooKeeper (its heartbeats)
          (.teardown-heartbeats! storm-cluster-state id)
          ;; delete the /errors/{id} node from ZooKeeper (its error information)
          (.teardown-topology-errors! storm-cluster-state id)
          ;; delete {storm.local.dir}/nimbus/stormdist/{id} on the nimbus server
          ;; (its jar, topology and configuration files)
          (rmr (master-stormdist-root conf id))
          ;; remove the topology's heartbeats from nimbus's heartbeat cache
          (swap! (:heartbeats-cache nimbus) dissoc id))))))
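The teardown loop can be modelled with a single atom standing in for ZooKeeper, the local stormdist directory and the heartbeat cache. This is a hypothetical in-memory sketch. The state layout and the to-cleanup-fn parameter, which plays the role of cleanup-storm-ids, are illustrative. Each comment names the real call that the update replaces. As in the original, only the computation of the ids runs under the lock.

```clojure
(defn do-cleanup!
  "In-memory sketch of do-cleanup. state is an atom of the form
   {:zk {:workerbeats #{..} :errors #{..} ...} :local-dirs #{..} :heartbeats-cache {..}}.
   Returns the ids that were cleaned up."
  [state submit-lock to-cleanup-fn]
  (let [to-cleanup-ids (locking submit-lock (to-cleanup-fn @state))]
    ;; doseq over an empty set does nothing, so no separate empty? check is needed
    (doseq [id to-cleanup-ids]
      (println "Cleaning up" id)
      (swap! state
             (fn [s]
               (-> s
                   (update-in [:zk :workerbeats] disj id) ; teardown-heartbeats!
                   (update-in [:zk :errors] disj id)      ; teardown-topology-errors!
                   (update :local-dirs disj id)           ; rmr of stormdist/{id}
                   (update :heartbeats-cache dissoc id))))) ; heartbeat cache entry
    to-cleanup-ids))
```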
The cleanup-storm-ids function is defined as follows:
(defn cleanup-storm-ids [conf storm-cluster-state]
  ;; heartbeat-ids: the set of ids of topologies that have heartbeats
  (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
        ;; error-ids: the set of ids of topologies that have error information
        error-ids (set (.error-topologies storm-cluster-state))
        ;; code-ids: the set of ids of topologies whose jar exists on the nimbus server
        code-ids (code-ids conf)
        ;; assigned-ids: the set of ids of currently active topologies
        assigned-ids (set (.active-storms storm-cluster-state))]
    ;; the union of heartbeat-ids, error-ids and code-ids, minus assigned-ids,
    ;; gives the ids of the inactive topologies
    (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)))
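The same computation works as a pure function over plain collections. This sketch takes the four id collections directly instead of reading them from StormClusterState and the local directory. Each input is wrapped in set because the clojure.set functions are meant to be called with sets.

```clojure
(require '[clojure.set :as set])

(defn inactive-topology-ids
  "Ids that still leave traces (heartbeats, errors or local code) but are no
   longer active in /storms."
  [heartbeat-ids error-ids code-ids active-ids]
  (set/difference (set/union (set heartbeat-ids) (set error-ids) (set code-ids))
                  (set active-ids)))

;; Example: heartbeats ["t1" "t2"], errors ["t3"], code ["t1" "t4"] and active ["t1"]
;; give #{"t2" "t3" "t4"}: everything with leftover state except the active "t1".
```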
Feel free to share; when reposting, please credit the source: 内存溢出