Nimbus(补充二)

Nimbus(补充二),第1张

Nimbus(补充二)

2021SC@SDUSC

mk-scheduler函数定义如下:

mk-scheduler函数

( defn  mk-scheduler  [ conf  inimbus ]
  ;; 当前版本getForcedScheduler函数返回nil
 ( let  [ forced-scheduler ( .getForcedScheduler  inimbus)
        ;; scheduler绑定IScheduler接口的实现
    ;; cond等价于java中的switch,我们可以发现首先检查forced-scheduler,如果forced-scheduler为nil,则检查是否有用户自定义的scheduler,如果没有则
    ;; 使用默认的DefaultScheduler
        scheduler ( cond
                    forced-scheduler
                   ( do ( log-message  "Using forced scheduler from INimbus " ( class  forced-scheduler))
                        forced-scheduler)
   
                   ( conf  STORM-SCHEDULER)
                   ( do ( log-message  "Using custom scheduler: " ( conf  STORM-SCHEDULER))
                       ( -> ( conf  STORM-SCHEDULER)  new-instance))
   
                    :else
                   ( do ( log-message  "Using default scheduler")
                       ( DefaultScheduler. )))]
    ;; 先调用prepare函数
   ( .prepare  scheduler  conf)
    ;; 然后返回scheduler
    scheduler
   ))

cleanup-corrupt-topologies!函数定义如下:

cleanup-corrupt-topologies!函数

( defn  cleanup-corrupt-topologies!  [ nimbus ]
  ;; 获取nimbus这个map中保存的StormCluterState实例
 ( let  [ storm-cluster-state ( :storm-cluster-state  nimbus)
        ;; code-ids绑定了nimbus服务器上{storm.local.dir}/nimbus/stormdist/目录下所有子目录的名称,即提交给nimbus的所有topology的id
        code-ids ( set ( code-ids ( :conf  nimbus)))
    ;; active-topologies绑定zookeeper上/storms/目录中所有文件名称,即当前storm集群上正在运行的topology的id
        active-topologies ( set ( .active-storms  storm-cluster-state))
    ;; corrupt-topologies绑定active-topologies和code-ids的差集,即当前正在运行的,但丢失jar包、topology信息和配置信息的topology的id
        corrupt-topologies ( set/difference  active-topologies  code-ids )]
    ;; 将id包含在corrupt-topologies集合的topology的分配信息从zookeeper的/assignments目录删除,同时将Stormbase信息从zookeeper的/storms目录删除
   ( doseq  [ corrupt  corrupt-topologies ]
     ( log-message  "Corrupt topology "  corrupt  " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
     ( .remove-storm!  storm-cluster-state  corrupt)
     )))

transition!函数定义如下:

transition!函数的作用十分重要,负责topology状态转换,在启动nimbus场景下,event的值为":startup"关键字,error-on-no-transition?的值为false。transition!函数有两个重载版本。

transition!函数

( defn  transition!
  ([ nimbus  storm-id  event ]
    ( transition!  nimbus  storm-id  event  false))
  ([ nimbus  storm-id  event  error-on-no-transition? ]
     ;; 加锁
    ( locking ( :submit-lock  nimbus)
       ;; system-events绑定一个集合#{:startup}
      ( let  [ system-events  # { :startup }
             ;; 在启动nimbus场景下,event绑定[:startup],event-args为nil
             [ event  &  event-args ] ( if ( keyword?  event)  [ event ]  event)
             ;; 从zookeeper上获取topology的状态,一个map对象,绑定到status上
             status ( topology-status  nimbus  storm-id )]
         ;; handles the case where event was scheduled but topology has been removed
        ( if-not  status
           ;; 如果status为nil则记录日志,transition!函数执行结束
          ( log-message  "Cannot apply event "  event  " to "  storm-id  " because topology no longer exists")
           ;; 如果status不为nil,get-event绑定一个函数
          ( let  [ get-event ( fn  [ m  e ]
                            ( if ( contains?  m  e)
                              ( m  e)
                              ( let  [ msg ( str  "No transition for event: "  event
                                              ", status: "  status,
                                              " storm-id: "  storm-id )]
                                ( if  error-on-no-transition?
                                  ( throw-runtime  msg)
                                  ( do ( when-not ( contains?  system-events  event)
                                        ( log-message  msg))
                                       nil))
                                )))
                 ;; state-transitions函数返回一个状态转换映射map,这个map中规定了由一种状态可以转换到哪些状态,并且在状态转换后执行哪些处理(即调用哪个函数),参见其定义部分
                 ;; 通过分析state-transitions函数,我们可以发现只有当topology的当前状态为":killed"和":rebalancing"时,才允许转换到":startup"状态,如果当前状态是其他状态,transition将为nil
                 ;; 我们先讨论其他状态,这时transition为nil,接着transition通过if判断将绑定一个(fn [] nil)函数,这样new-status将为nil。所以在启动nimbus场景下,topology由其他状态转换到":startup"状态时,transition!函数什么都没做
                 transition ( -> ( state-transitions  nimbus  storm-id  status)
                               ( get ( :type  status))
                               ( get-event  event))
                 transition ( if ( or ( nil?  transition)
                                   ( keyword?  transition))
                             ( fn  []  transition)
                              transition)
                 new-status ( apply  transition  event-args)
                 new-status ( if ( keyword?  new-status)
                              { :type  new-status }
                              new-status )]
            ( when  new-status
              ( set-topology-status!  nimbus  storm-id  new-status)))))
      )))

1、如果topology由":killed"转换到":startup"(kill topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

( fn  [] ( delay-event  nimbus
                    storm-id
                   ( :kill-time-secs  status)
                    :remove)
    nil)


 new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :remove false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":killed"转换到":remove", 调用函数
 

( fn  []
   ( log-message  "Killing topology: "  storm-id)
    ;; 删除zookeeper上该topology的Stormbase信息和分配信息
   ( .remove-storm! ( :storm-cluster-state  nimbus)
                    storm-id)
    nil)

2、如果topology由":rebalancing"转换到":startup"(rebalance topology的过程中,nimbus挂掉了,当重启nimbus时就有可能出现这种状态转换)时,transition将绑定

( fn  [] ( delay-event  nimbus
                storm-id
               ( :delay-secs  status)
                :do-rebalance)
    nil)

new-status值为transition绑定的函数的返回值nil。transition绑定的函数通过调用delay-event函数将#(transition! nimbus storm-id :do-rebalance false)函数添加到storm定时器中,然后由storm定时器执行该函数,该函数再次调用了transition!函数,不过这次是由":rebalancing"转换到":do-rebalance",调用函数
 

( fn  []
  ( do-rebalance  nimbus  storm-id  status)
  ( :old-status  status))

由于这个函数返回:rebalancing状态的前一个状态,所以storm定时器所执行的定时任务会将topology的状态由:rebalancing修改成前一个状态。以上就是启动nimbus场景下,topology可能的状态转换处理过程。 delay-event函数定义如下:主要功能就是将#(transition! nimbus storm-id event false)函数作为"定时任务"添加到storm定时器中。
 

( defn  delay-event  [ nimbus  storm-id  delay-secs  event ]
 ( log-message  "Delaying event "  event  " for "  delay-secs  " secs for "  storm-id)
 ( schedule ( :timer  nimbus)
            delay-secs
            #( transition!  nimbus  storm-id  event  false)
           ))

state-transitions函数定义如下:

state-transitions函数

( defn  state-transitions  [ nimbus  storm-id  status ]
  { :active  { :inactivate  :inactive            
            :activate  nil
            :rebalance ( rebalance-transition  nimbus  storm-id  status)
            :kill ( kill-transition  nimbus  storm-id)
            }
   :inactive  { :activate  :active
              :inactivate  nil
              :rebalance ( rebalance-transition  nimbus  storm-id  status)
              :kill ( kill-transition  nimbus  storm-id)
              }
   :killed  { :startup ( fn  [] ( delay-event  nimbus
                                         storm-id
                                        ( :kill-time-secs  status)
                                         :remove)
                             nil)
            :kill ( kill-transition  nimbus  storm-id)
            :remove ( fn  []
                     ( log-message  "Killing topology: "  storm-id)
                     ( .remove-storm! ( :storm-cluster-state  nimbus)
                                      storm-id)
                      nil)
            }
   :rebalancing  { :startup ( fn  [] ( delay-event  nimbus
                                              storm-id
                                             ( :delay-secs  status)
                                              :do-rebalance)
                                 nil)
                 :kill ( kill-transition  nimbus  storm-id)
                 :do-rebalance ( fn  []
                                ( do-rebalance  nimbus  storm-id  status)
                                ( :old-status  status))
                 }})

do-cleanup函数定义如下:

do-cleanup函数

( defn  do-cleanup  [ nimbus ]
 ( let  [ storm-cluster-state ( :storm-cluster-state  nimbus)
        conf ( :conf  nimbus)
        submit-lock ( :submit-lock  nimbus )]
    ;; to-cleanup-ids绑定需要清理的topology的id,即不再活跃的topology的id,cleanup-storm-ids函数参见其定义部分
   ( let  [ to-cleanup-ids ( locking  submit-lock
                          ( cleanup-storm-ids  conf  storm-cluster-state ))]
     ( when-not ( empty?  to-cleanup-ids)
       ( doseq  [ id  to-cleanup-ids ]
         ( log-message  "Cleaning up "  id)
          ;; 从zookeeper上删除/workerbeats/{id}节点(清理其心跳信息)
         ( .teardown-heartbeats!  storm-cluster-state  id)
          ;; 从zookeeper上删除/errors/{id}节点(清理其错误信息)
         ( .teardown-topology-errors!  storm-cluster-state  id)
          ;; 从nimbus服务器上删除{storm.local.dir}/nimbus/stormdist/{id}目录(删除其jar包,topology信息,配置信息)
         ( rmr ( master-stormdist-root  conf  id))
          ;; 将该topology的心跳信息从nimbus的心跳缓存中删除
         ( swap! ( :heartbeats-cache  nimbus)  dissoc  id))
       ))))

cleanup-storm-ids函数定义如下:

cleanup-storm-ids函数

( defn  cleanup-storm-ids  [ conf  storm-cluster-state ]
  ;; heartbeat-ids绑定有心跳的topology的id集合
 ( let  [ heartbeat-ids ( set ( .heartbeat-storms  storm-cluster-state))
        ;; error-ids绑定有错误信息的topology的id集合
        error-ids ( set ( .error-topologies  storm-cluster-state))
        ;; code-ids绑定在nimbus服务器上有jar包的topology的id集合
        code-ids ( code-ids  conf)
        ;; assigned-ids绑定当前活跃的topology的id集合
        assigned-ids ( set ( .active-storms  storm-cluster-state ))]
    ;; heartbeat-ids、error-ids、code-ids的并集再与assigned-ids做差集就是不活跃的topology的id
   ( set/difference ( set/union  heartbeat-ids  error-ids  code-ids)  assigned-ids)
   ))

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5624245.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存