2021SC@SDUSC
Nimbus is the "controller" of a Storm cluster and one of its essential components. We can start nimbus by running bin/storm nimbus >/dev/null 2>&1 & in a shell. bin/storm is a Python script that defines a nimbus function:
nimbus function
```python
def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlog4j.configuration=storm.log.properties",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)
```
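The klass parameter defaults to backtype.storm.daemon.nimbus, which names a Java class. STORM_DIR is the Storm installation directory. cppaths holds the directories containing the log4j configuration and the Storm configuration file storm.yaml (STORM_DIR/log4j and STORM_DIR/conf); they are later added to the classpath. jvmopts holds the arguments passed to the JVM: the options configured under nimbus.childopts, followed by the log4j log file name and the log4j configuration file name.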
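To make the assembly of cppaths and jvmopts concrete, here is a minimal Python sketch. The helper name build_nimbus_jvmopts is hypothetical, shlex.split is assumed to stand in for bin/storm's own parse_args, and /opt/storm is an assumed install directory.

```python
import shlex

def build_nimbus_jvmopts(storm_dir, childopts):
    """Hypothetical helper mirroring how nimbus() in bin/storm assembles its arguments.

    Assumptions: childopts is the raw string value of nimbus.childopts, and
    shlex.split stands in for bin/storm's own parse_args.
    """
    # Directories added to the classpath so the log4j config and storm.yaml are found.
    cppaths = [storm_dir + "/log4j", storm_dir + "/conf"]
    # User-configured childopts first, then the nimbus-specific log4j settings.
    # "or ''" guards against a missing value, since shlex.split expects a string.
    jvmopts = shlex.split(childopts or "") + [
        "-Dlogfile.name=nimbus.log",
        "-Dlog4j.configuration=storm.log.properties",
    ]
    return cppaths, jvmopts

# Example: /opt/storm is an assumed install directory.
cppaths, jvmopts = build_nimbus_jvmopts("/opt/storm", "-Xmx1024m -Djava.net.preferIPv4Stack=true")
print(cppaths)
print(jvmopts)
```

Keeping the childopts in front of the fixed -D flags matches the order in nimbus(), where the parsed nimbus.childopts list is concatenated with the two log4j settings.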
The logic of exec_storm_class is fairly simple. Its implementation is as follows:
exec_storm_class function
```python
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns
```
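get_config_opts() packs any configuration overrides given on the storm command line into a -Dstorm.options=... argument; confvalue("java.library.path", extrajars) returns the load path of JZMQ, the native library Storm uses; and get_classpath(extrajars) returns the full paths of all dependency jars. These pieces are assembled into a java -cp command that runs the main method of klass. With the default fork=False, os.execvp replaces the Python process with the JVM.
Since klass defaults to backtype.storm.daemon.nimbus, exec_storm_class ultimately invokes the main method of the backtype.storm.daemon.nimbus class.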
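The argument list that exec_storm_class builds can be reproduced without actually launching a JVM. This is a sketch under the assumption that the values bin/storm obtains from get_config_opts(), confvalue() and get_classpath() are passed in as plain strings; build_java_command and all example paths are hypothetical.

```python
def build_java_command(klass, storm_dir, conf_file, library_path, classpath,
                       config_opts="-Dstorm.options=", jvmtype="-server",
                       jvmopts=(), args=()):
    """Hypothetical, side-effect-free version of exec_storm_class's argument list.

    Assumption: the values bin/storm obtains from get_config_opts(), confvalue()
    and get_classpath() are passed in as plain strings.
    """
    return [
        "java", jvmtype, config_opts,
        "-Dstorm.home=" + storm_dir,
        "-Djava.library.path=" + library_path,
        "-Dstorm.conf.file=" + conf_file,
        "-cp", classpath,
    ] + list(jvmopts) + [klass] + list(args)

# Example values below are made up for illustration.
cmd = build_java_command(
    "backtype.storm.daemon.nimbus",
    storm_dir="/opt/storm",
    conf_file="",
    library_path="/usr/local/lib",
    classpath="/opt/storm/storm.jar:/opt/storm/log4j:/opt/storm/conf",
    jvmopts=["-Dlogfile.name=nimbus.log"],
)
print("Running: " + " ".join(cmd))
```

Passing a list like this to os.execvp, as exec_storm_class does when fork is False, means the background process started by bin/storm nimbus ends up being the JVM itself rather than a Python wrapper.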
The backtype.storm.daemon.nimbus class is defined in nimbus.clj as follows:
backtype.storm.daemon.nimbus class
```clojure
(ns backtype.storm.daemon.nimbus
  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
  (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
  (:import [org.apache.thrift.exception])
  (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
  (:import [java.nio ByteBuffer])
  (:import [java.io FileNotFoundException])
  (:import [java.nio.channels Channels WritableByteChannel])
  (:use [backtype.storm.scheduler.DefaultScheduler])
  (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
            Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
  (:use [backtype.storm bootstrap util])
  (:use [backtype.storm.config :only [validate-configs-with-schemas]])
  (:use [backtype.storm.daemon common])
  (:gen-class
   :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))

;; ... other functions ...

(defn -main []
  (-launch (standalone-nimbus)))
```
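:gen-class tells Clojure to generate the Java class backtype.storm.daemon.nimbus and declares a static method launch, which takes an instance implementing the backtype.storm.scheduler.INimbus interface. In -main, that argument is produced by standalone-nimbus, which returns an INimbus implementation and is defined as follows:
standalone-nimbus function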
```clojure
(defn standalone-nimbus []
  ;; Implement the INimbus interface
  (reify INimbus
    ;; prepare is an empty implementation
    (prepare [this conf local-dir])
    ;; allSlotsAvailableForScheduling returns the set of all available slots.
    ;; supervisors is a collection of SupervisorDetails objects, one per supervisor in the cluster.
    ;; For each supervisor we take its id and meta (its ports) to build WorkerSlot objects;
    ;; mapcat concatenates them into one sequence, and set turns that sequence into the
    ;; Collection the interface requires.
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           ;; For every port of every supervisor, create a WorkerSlot holding the node id and the port
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)]
                       (WorkerSlot. (.getId s) p))))
           set))
    (assignSlots [this topology slots])
    (getForcedScheduler [this] nil)
    ;; getHostName returns a supervisor's host name. supervisors is a Map and node-id is a String.
    ;; It looks up the SupervisorDetails for node-id in supervisors; if one is found it calls
    ;; getHost on it, otherwise it returns nil.
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))))
```
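The slot computation is a flat-map over supervisors and their ports, followed by a conversion to a set. A minimal Python sketch of the same idea, assuming hypothetical namedtuples that model only the fields standalone-nimbus reads (id, host, and meta as a list of ports):

```python
from collections import namedtuple

# Hypothetical stand-ins for SupervisorDetails and WorkerSlot: only the fields
# standalone-nimbus reads (id, host, meta = list of ports) are modelled.
SupervisorDetails = namedtuple("SupervisorDetails", ["id", "host", "meta"])
WorkerSlot = namedtuple("WorkerSlot", ["node_id", "port"])

def all_slots_available_for_scheduling(supervisors):
    # One WorkerSlot per (supervisor, port); the set mirrors Clojure's (set ...).
    return {WorkerSlot(s.id, p) for s in supervisors for p in s.meta}

def get_host_name(supervisors_by_id, node_id):
    # Look the supervisor up by node id; None plays the role of Clojure's nil.
    s = supervisors_by_id.get(node_id)
    return s.host if s is not None else None

sups = [
    SupervisorDetails("sup-1", "host-a", [6700, 6701]),
    SupervisorDetails("sup-2", "host-b", [6700]),
]
by_id = {s.id: s for s in sups}
slots = all_slots_available_for_scheduling(sups)
print(sorted(slots))
print(get_host_name(by_id, "sup-2"))
```

Using a set means two supervisors that both expose port 6700 still yield distinct slots, because each slot is keyed by node id and port together.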
The launch function is defined as follows:
launch function
```clojure
(defn -launch [nimbus]
  ;; read-storm-config reads the Storm cluster configuration; see its definition below
  (launch-server! (read-storm-config) nimbus))
```

launch-server! is defined as follows:

```clojure
(defn launch-server! [conf nimbus]
  ;; Check that we are running in distributed mode; in local mode an IllegalArgumentException is thrown
  (validate-distributed-mode! conf)
  ;; service-handler is defined with the defserverfn macro and returns an instance implementing the
  ;; Iface interface of the Nimbus class. Nimbus is generated by the Thrift framework, and Iface covers
  ;; every operation of "service Nimbus" (see storm.thrift). All services exposed by the nimbus Thrift
  ;; server are implemented by this instance; see the definition of service-handler below.
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                    (.processor (Nimbus$Processor. service-handler)))
        server (THsHaServer. (do (set! (. options maxReadBufferBytes) (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE))
                                 options))]
    (add-shutdown-hook-with-force-kill-in-1-sec (fn []
                                                  (.shutdown service-handler)
                                                  (.stop server)))
    (log-message "Starting Nimbus server...")
    (.serve server)))
```
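launch-server! refuses to run in local mode before it builds anything, then reads the port and buffer size from the configuration. The following Python sketch covers only that configuration step. It assumes the string keys storm.cluster.mode, nimbus.thrift.port and nimbus.thrift.max_buffer_size behind the Clojure constants, and both function names are hypothetical.

```python
def validate_distributed_mode(conf):
    """Hypothetical analogue of validate-distributed-mode!.

    Assumption: the run mode lives under the "storm.cluster.mode" key.
    """
    if conf.get("storm.cluster.mode") == "local":
        raise ValueError("nimbus cannot be started as a server in local mode")

def thrift_server_options(conf):
    """Collect the settings launch-server! passes to THsHaServer$Args."""
    validate_distributed_mode(conf)  # checked before anything else is built
    return {
        "port": int(conf["nimbus.thrift.port"]),
        "worker_threads": 64,  # hard-coded in launch-server!
        # launch-server! uses this value for both the protocol factory and maxReadBufferBytes.
        "max_read_buffer_bytes": conf["nimbus.thrift.max_buffer_size"],
    }

# Example configuration; the values are illustrative.
opts = thrift_server_options({
    "storm.cluster.mode": "distributed",
    "nimbus.thrift.port": "6627",
    "nimbus.thrift.max_buffer_size": 1048576,
})
print(opts)
```

Failing fast in the validation step means a misconfigured local-mode setup never opens a listening socket.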
read-storm-config is defined as follows:
read-storm-config function
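```clojure
(defn read-storm-config []
  ;; conf is bound to the Storm cluster configuration
  (let [conf (clojurify-structure (Utils/readStormConfig))]
    ;; validate-configs-with-schemas checks each entry against its schema and throws
    ;; IllegalArgumentException if a value is invalid
    (validate-configs-with-schemas conf)
    conf))
```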
read-storm-config calls the static method readStormConfig of the backtype.storm.utils.Utils class:
readStormConfig method
```java
public static Map readStormConfig() {
    // Call readDefaultConfig to load the default settings from defaults.yaml into ret
    Map ret = readDefaultConfig();
    // Path of the user-specified config file
    String confFile = System.getProperty("storm.conf.file");
    Map storm;
    if (confFile == null || confFile.equals("")) {
        // No file specified: fall back to storm.yaml (not required to exist)
        storm = findAndReadConfigFile("storm.yaml", false);
    } else {
        // Read the user-specified config file
        storm = findAndReadConfigFile(confFile, true);
    }
    // User settings override the defaults in ret
    ret.putAll(storm);
    // Settings given on the command line override everything else
    ret.putAll(readCommandLineOpts());
    // Return the merged configuration
    return ret;
}
```
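The override order in readStormConfig (defaults, then the user's file, then command-line options) is a layered dictionary merge. A minimal Python sketch, assuming each layer has already been parsed into a dict; merge_storm_config is a hypothetical name.

```python
def merge_storm_config(defaults, user_conf, command_line_opts):
    """Hypothetical mirror of the override order in Utils.readStormConfig.

    Assumption: each layer is already parsed into a dict. Later layers win,
    just like the successive Map.putAll calls in the Java code.
    """
    ret = dict(defaults)                 # defaults.yaml (copied, not mutated)
    ret.update(user_conf or {})          # storm.yaml or the -Dstorm.conf.file file
    ret.update(command_line_opts or {})  # overrides given on the command line
    return ret

defaults = {"nimbus.thrift.port": 6627, "storm.local.dir": "storm-local"}
user_conf = {"storm.local.dir": "/var/storm"}
cli_opts = {"nimbus.thrift.port": 7000}
conf = merge_storm_config(defaults, user_conf, cli_opts)
print(conf)
```

Each key keeps the value from the last layer that defines it, so a command-line override beats storm.yaml, which in turn beats defaults.yaml.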
defserverfn is a macro: (defserverfn service-handler [conf inimbus] ...) defines a function named service-handler. Macro expansion happens at compile time. service-handler is defined as follows:
service-handler function
```clojure
(defserverfn service-handler [conf inimbus]
  ;; Call inimbus's prepare method. inimbus is the INimbus instance returned by standalone-nimbus;
  ;; in the current version prepare is an empty implementation
  (.prepare inimbus conf (master-inimbus-dir conf))
  ;; Log a message
  (log-message "Starting Nimbus with conf " conf)
  ;; nimbus is bound to a map holding the "attributes" nimbus needs; see the definition of nimbus-data
  (let [nimbus (nimbus-data conf inimbus)]
    ;; Call prepare on the backtype.storm.nimbus.DefaultTopologyValidator object stored in the nimbus map.
    ;; Looking at backtype.storm.nimbus.DefaultTopologyValidator shows that prepare is empty by default
    (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
    ;; cleanup-corrupt-topologies! removes from ZooKeeper's /storms/ path every topology id that does not
    ;; exist under {storm.local.dir}/nimbus/stormdist/ on the nimbus server, i.e. it deletes running
    ;; topologies whose jar, topology and configuration files are missing on the nimbus server.
    ;; See the definition of cleanup-corrupt-topologies!
    (cleanup-corrupt-topologies! nimbus)
    ;; Update the state of the topologies currently on the cluster
    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
      ;; transition! handles topology state transitions: it specifies what has to be done when a
      ;; topology moves from one state to another; see its definition
      (transition! nimbus storm-id :startup))
    ;; schedule-recurring adds a "recurring task" to the Storm timer that checks heartbeats, reassigns
    ;; tasks and cleans up inactive topologies. mk-assignments checks heartbeats and reassigns tasks.
    ;; For the timer see "storm timer source code analysis"; for mk-assignments see
    ;; "storm task assignment source code analysis".
    ;; do-cleanup cleans up inactive topologies; see its definition
    (schedule-recurring (:timer nimbus)
                        0
                        (conf NIMBUS-MONITOR-FREQ-SECS)
                        (fn []
                          (when (conf NIMBUS-REASSIGN)
                            (locking (:submit-lock nimbus)
                              (mk-assignments nimbus)))
                          (do-cleanup nimbus)))
    ;; Schedule Nimbus inbox cleaner
    ;; schedule-recurring adds another recurring task that deletes expired jars on the nimbus server
    (schedule-recurring (:timer nimbus)
                        0
                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                        (fn []
                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
    (reify Nimbus$Iface
      ;; submitTopologyWithOpts handles topology submission; for a detailed analysis see
      ;; "storm source code analysis: the topology submission process"
      (^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]
        (try
          (assert (not-nil? submitOptions))
          (validate-topology-name! storm-name)
          (check-storm-active! nimbus storm-name false)
          (let [topo-conf (from-json serializedConf)]
            (try
              (validate-configs-with-schemas topo-conf)
              (catch IllegalArgumentException ex
                (throw (InvalidTopologyException. (.getMessage ex)))))
            (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                       storm-name
                       topo-conf
                       topology))
          (swap! (:submitted-count nimbus) inc)
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
                storm-conf (normalize-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                                (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology)
                storm-cluster-state (:storm-cluster-state nimbus)]
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
              (.setup-heartbeats! storm-cluster-state storm-id)
              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active}]
                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
              (mk-assignments nimbus)))
          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))

      ;; submitTopology delegates to submitTopologyWithOpts
      (^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))

      ;; killTopology, as the name says, delegates to killTopologyWithOpts
      (^void killTopology [this ^String name]
        (.killTopologyWithOpts this name (KillOptions.)))

      ;; storm-name is the name of the topology to kill. KillOptions is a Thrift struct with a single
      ;; field, wait_secs, the delay before the kill is carried out
      (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
        ;; check-storm-active! checks that the topology is alive and throws an exception if it is not
        (check-storm-active! nimbus storm-name true)
        ;; If a delay was set, bind it to wait-amt
        (let [wait-amt (if (.is_set_wait_secs options)
                         (.get_wait_secs options))]
          ;; transition-name! looks up the topology id for storm-name and calls transition!, moving the
          ;; topology from its current state to :kill. :kill is a "transient" state; the topology ends up
          ;; in :killed, which is a "persistent" state.
          ;; state-transitions shows that moving to :kill from any state calls kill-transition, which uses
          ;; delay-event to add a timed task to the Storm timer. That task moves the topology from :killed
          ;; to :remove, at which point remove-storm! cleans the topology up
          (transition-name! nimbus storm-name [:kill wait-amt] true)))

      ;; rebalance resets a topology's number of worker processes and the parallelism of its components.
      ;; RebalanceOptions is a Thrift struct with three fields: the rebalance delay, the new number of
      ;; workers and the new parallelism
      (^void rebalance [this ^String storm-name ^RebalanceOptions options]
        ;; check-storm-active! checks that the topology is alive and throws an exception if it is not
        (check-storm-active! nimbus storm-name true)
        ;; If a delay was set, bind it to wait-amt
        (let [wait-amt (if (.is_set_wait_secs options)
                         (.get_wait_secs options))
              ;; If a new number of workers was set, bind it to num-workers
              num-workers (if (.is_set_num_workers options)
                            (.get_num_workers options))
              ;; If new component parallelism was set, bind it to executor-overrides
              executor-overrides (if (.is_set_num_executors options)
                                   (.get_num_executors options)
                                   {})]
          (doseq [[c num-executors] executor-overrides]
            (when (<= num-executors 0)
              (throw (InvalidTopologyException. "Number of executors must be greater than 0"))))
          ;; transition-name! looks up the topology id for storm-name and calls transition!, moving the
          ;; topology from its current state to :rebalance. :rebalance is a "transient" state; the topology
          ;; ends up in :rebalancing, which is a "persistent" state.
          ;; state-transitions shows that :rebalance is only allowed from :active and :inactive and calls
          ;; rebalance-transition, which uses delay-event to add a timed task to the Storm timer. That task
          ;; moves the topology from :rebalancing to :do-rebalance and calls do-rebalance (which sets the
          ;; new worker count and component parallelism, then calls mk-assignments to reassign tasks),
          ;; after which the topology returns to the state it had before :rebalancing
          (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)))

      ;; Activate the topology by changing its state to :active; the flow is similar to
      ;; killTopologyWithOpts and rebalance
      (activate [this storm-name]
        (transition-name! nimbus storm-name :activate true))

      ;; Change the topology's state to :inactive; deactivate works like activate
      (deactivate [this storm-name]
        (transition-name! nimbus storm-name :inactivate true))

      ;; beginFileUpload creates a unique jar path in nimbus's jar directory (the inbox),
      ;; opens a channel to it and returns the path
      (beginFileUpload [this]
        (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
          (.put (:uploaders nimbus)
                fileloc
                (Channels/newChannel (FileOutputStream. fileloc)))
          (log-message "Uploading file from client to " fileloc)
          fileloc))

      ;; Upload one chunk of the jar file
      (^void uploadChunk [this ^String location ^ByteBuffer chunk]
        (let [uploaders (:uploaders nimbus)
              ^WritableByteChannel channel (.get uploaders location)]
          (when-not channel
            (throw (RuntimeException.
                    "File for that location does not exist (or timed out)")))
          (.write channel chunk)
          (.put uploaders location channel)))

      ;; Jar upload finished: close the channel
      (^void finishFileUpload [this ^String location]
        (let [uploaders (:uploaders nimbus)
              ^WritableByteChannel channel (.get uploaders location)]
          (when-not channel
            (throw (RuntimeException.
                    "File for that location does not exist (or timed out)")))
          (.close channel)
          (log-message "Finished uploading file from client: " location)
          (.remove uploaders location)))

      ;; Open an input stream for the file
      (^String beginFileDownload [this ^String file]
        (let [is (BufferFileInputStream. file)
              id (uuid)]
          (.put (:downloaders nimbus) id is)
          id))

      ;; Read the next chunk of the file
      (^ByteBuffer downloadChunk [this ^String id]
        (let [downloaders (:downloaders nimbus)
              ^BufferFileInputStream is (.get downloaders id)]
          (when-not is
            (throw (RuntimeException.
                    "Could not find input stream for that id")))
          (let [ret (.read is)]
            (.put downloaders id is)
            (when (empty? ret)
              (.remove downloaders id))
            (ByteBuffer/wrap ret))))

      ;; Get the Storm cluster configuration
      (^String getNimbusConf [this]
        (to-json (:conf nimbus)))

      ;; Get a topology's configuration
      (^String getTopologyConf [this ^String id]
        (to-json (try-read-storm-conf conf id)))

      ;; Get the StormTopology as processed by system-topology!
      (^StormTopology getTopology [this ^String id]
        (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

      ;; Get the StormTopology as submitted by the user
      (^StormTopology getUserTopology [this ^String id]
        (try-read-storm-topology conf id))

      ;; Get a summary of the current cluster: supervisor summaries, nimbus uptime and a summary of
      ;; every active topology
      (^ClusterSummary getClusterInfo [this]
        (let [storm-cluster-state (:storm-cluster-state nimbus)
              ;; supervisor-infos is a map of supervisor id -> SupervisorInfo.
              ;; SupervisorInfo is defined as:
              ;; (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
              supervisor-infos (all-supervisor-info storm-cluster-state)
              ;; TODO: need to get the port info about supervisors...
              ;; in standalone just look at metadata, otherwise just say N/A?
              ;; Build SupervisorSummary objects from the SupervisorInfo data
              supervisor-summaries (dofor [[id info] supervisor-infos]
                                     (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
                                           ]
                                       (SupervisorSummary. (:hostname info)
                                                           (:uptime-secs info)
                                                           (count ports)
                                                           (count (:used-ports info))
                                                           id)))
              ;; nimbus-uptime is bound to how long nimbus has been running
              nimbus-uptime ((:uptime nimbus))
              ;; bases is the collection of StormBase data for all active topologies on the cluster
              bases (topology-bases storm-cluster-state)
              ;; topology-summaries is the TopologySummary data for the active topologies
              topology-summaries (dofor [[id base] bases]
                                   (let [assignment (.assignment-info storm-cluster-state id nil)]
                                     (TopologySummary. id
                                                       (:storm-name base)
                                                       (->> (:executor->node+port assignment)
                                                            keys
                                                            (mapcat executor-id->tasks)
                                                            count)
                                                       (->> (:executor->node+port assignment)
                                                            keys
                                                            count)
                                                       (->> (:executor->node+port assignment)
                                                            vals
                                                            set
                                                            count)
                                                       (time-delta (:launch-time-secs base))
                                                       (extract-status-str base))))]
          ;; Create the ClusterSummary
          (ClusterSummary. supervisor-summaries
                           nimbus-uptime
                           topology-summaries)))

      ;; Get the TopologyInfo of the topology with the given storm-id
      (^TopologyInfo getTopologyInfo [this ^String storm-id]
        ;; storm-cluster-state is bound to the StormClusterState object
        (let [storm-cluster-state (:storm-cluster-state nimbus)
              ;; task->component maps task id -> component name, e.g.
              ;; {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
              task->component (storm-task-info (try-read-storm-topology conf storm-id)
                                               (try-read-storm-conf conf storm-id))
              ;; base is bound to the StormBase of storm-id
              base (.storm-base storm-cluster-state storm-id nil)
              ;; assignment is bound to the topology's Assignment info:
              ;; (defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
              assignment (.assignment-info storm-cluster-state storm-id nil)
              ;; beats maps each executor id of the topology to its heartbeat
              beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
              ;; all-components is the set of component ids of the topology
              all-components (-> task->component reverse-map keys)
              ;; errors maps component id -> that component's errors
              errors (->> all-components
                          (map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
                          (into {}))
              ;; executor-summaries is the collection of ExecutorSummary objects
              executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                   (let [host (-> assignment :node->host (get node))
                                         heartbeat (get beats executor)
                                         stats (:stats heartbeat)
                                         stats (if stats
                                                 (stats/thriftify-executor-stats stats))]
                                     (doto
                                       (ExecutorSummary. (thriftify-executor-id executor)
                                                         (-> executor first task->component)
                                                         host
                                                         port
                                                         (nil-to-zero (:uptime heartbeat)))
                                       (.set_stats stats))))]
          ;; Create the TopologyInfo object
          (TopologyInfo. storm-id
                         (:storm-name base)
                         (time-delta (:launch-time-secs base))
                         executor-summaries
                         (extract-status-str base)
                         errors)))

      Shutdownable
      (shutdown [this]
        (log-message "Shutting down master")
        (cancel-timer (:timer nimbus))
        (.disconnect (:storm-cluster-state nimbus))
        (.cleanup (:downloaders nimbus))
        (.cleanup (:uploaders nimbus))
        (log-message "Shut down master"))
      DaemonCommon
      (waiting? [this]
        (timer-waiting? (:timer nimbus))))))
```
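service-handler registers two recurring tasks on the nimbus timer: one that reassigns and cleans up, and one that cleans the jar inbox. To show what "delay, then every N seconds" means, here is a toy, single-threaded timer in Python. SimulatedTimer is hypothetical and uses a manually advanced clock instead of Storm's timer thread. The periods 10 and 600 are example values that stand in for NIMBUS-MONITOR-FREQ-SECS and NIMBUS-CLEANUP-INBOX-FREQ-SECS.

```python
import heapq
import itertools

class SimulatedTimer:
    """Toy, single-threaded stand-in for the timer behind schedule-recurring.

    Assumption: time only moves when advance_to is called, which keeps the
    example deterministic; Storm's real timer runs tasks on its own thread.
    """

    def __init__(self):
        self.now = 0
        self._queue = []               # entries: (due_time, seq, fn, recur_secs)
        self._seq = itertools.count()  # tie-breaker so heapq never compares functions

    def schedule_recurring(self, delay_secs, recur_secs, fn):
        if recur_secs <= 0:
            raise ValueError("recur_secs must be positive")
        # First run after delay_secs, then every recur_secs seconds.
        heapq.heappush(self._queue, (self.now + delay_secs, next(self._seq), fn, recur_secs))

    def advance_to(self, t):
        # Run every task due at or before t, rescheduling each one after it runs.
        while self._queue and self._queue[0][0] <= t:
            due, _, fn, recur_secs = heapq.heappop(self._queue)
            self.now = due
            fn()
            heapq.heappush(self._queue, (due + recur_secs, next(self._seq), fn, recur_secs))
        self.now = t

timer = SimulatedTimer()
runs = []
# 10 and 600 are example periods, not values read from a real Storm config.
timer.schedule_recurring(0, 10, lambda: runs.append(("monitor", timer.now)))
timer.schedule_recurring(0, 600, lambda: runs.append(("clean-inbox", timer.now)))
timer.advance_to(25)
print(runs)
```

With a delay of 0, as in service-handler, both tasks fire immediately when nimbus starts and then repeat at their own periods.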
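The comments on killTopologyWithOpts, rebalance, activate and deactivate describe a small state machine. The Python sketch below models only the status changes described there. It is a simplification, not Storm's state-transitions table, which also handles :startup, schedules delayed events and calls remove-storm! and do-rebalance. TRANSITIONS, PREVIOUS and TopologyStatus are hypothetical names.

```python
PREVIOUS = object()  # marker: return to the status held before rebalancing

# Simplified table: current status -> {event: new status}; None means removed.
# Assumption: only the status changes described in the comments are modelled.
TRANSITIONS = {
    "active": {"activate": "active", "inactivate": "inactive",
               "rebalance": "rebalancing", "kill": "killed"},
    "inactive": {"activate": "active", "inactivate": "inactive",
                 "rebalance": "rebalancing", "kill": "killed"},
    "killed": {"kill": "killed", "remove": None},
    "rebalancing": {"kill": "killed", "do-rebalance": PREVIOUS},
}

class TopologyStatus:
    def __init__(self, status="active"):
        self.status = status
        self.prev_status = None

    def transition(self, event):
        allowed = TRANSITIONS.get(self.status, {})
        if event not in allowed:
            raise ValueError(f"no transition for {event!r} from {self.status!r}")
        target = allowed[event]
        if target == "rebalancing":
            self.prev_status = self.status  # remembered for do-rebalance
        elif target is PREVIOUS:
            target, self.prev_status = self.prev_status, None
        self.status = target
        return target

t = TopologyStatus("inactive")
print(t.transition("rebalance"))     # rebalancing
print(t.transition("do-rebalance"))  # inactive
print(t.transition("kill"))          # killed
print(t.transition("remove"))        # None
```

Only :active and :inactive list "rebalance" as an allowed event, which mirrors the rule that a killed or already rebalancing topology cannot be rebalanced.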
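In getClusterInfo, the three numbers in each TopologySummary all come from the :executor->node+port map: tasks are the expanded executor ranges, executors are the keys, and workers are the distinct (node, port) values. A minimal Python sketch, assuming each executor id is an inclusive (first_task, last_task) range. topology_counts and the sample assignment are hypothetical.

```python
def topology_counts(executor_to_node_port):
    """Hypothetical mirror of the three counts getClusterInfo puts in a TopologySummary.

    Assumption: each executor id is an inclusive (first_task, last_task) range,
    which executor-id->tasks expands into individual task ids.
    """
    executors = list(executor_to_node_port)
    num_tasks = sum(last - first + 1 for first, last in executors)
    num_executors = len(executors)
    # Each distinct (node, port) pair is one worker process.
    num_workers = len(set(executor_to_node_port.values()))
    return num_tasks, num_executors, num_workers

# Made-up assignment: three executors spread over two worker slots.
assignment = {
    (1, 2): ("node-a", 6700),
    (3, 3): ("node-a", 6700),
    (4, 6): ("node-b", 6701),
}
print(topology_counts(assignment))  # (6, 3, 2)
```

Counting distinct values rather than keys is what makes two executors sharing ("node-a", 6700) count as a single worker.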