datax源码解析-JobContainer的初始化阶段解析
写在前面此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
我所使用的任务模版的json文件是:
{ "job":{ "content":[ { "reader":{ "name":"mysqlreader", "parameter":{ "column":[ "id", "name", "age" ], "connection":[ { "jdbcUrl":[ "jdbc:mysql://127.0.0.1:3306/test" ], "table":[ "t_datax_test" ] } ], "password":"11111111", "username":"root" } }, "writer":{ "name":"mysqlwriter", "parameter":{ "column":[ "id", "name", "age" ], "connection":[ { "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2", "table":[ "t_datax_test" ] } ], "password":"11111111", "username":"root" } } } ], "setting":{ "speed":{ "channel":"2" } } } }
另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。
JobContainer初始化阶段接着上篇文章:
datax源码解析-启动类分析
进入JobContainer的start方法,jobContainer主要负责的工作全部在start()里面,包括:
- preHandle,前置处理
- init,初始化,主要是调用插件的init方法实现初始化
- prepare,准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作
- split,根据配置的并发参数,对job进行切分,切分为多个task
- scheduler,把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
- post,执行完任务后的后置 *** 作
- invokeHooks,DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等
从代码中看,也可以清晰的看到这几个过程:
public void start() { LOG.info("DataX jobContainer starts job."); boolean hasException = false; boolean isDryRun = false; try { this.startTimeStamp = System.currentTimeMillis(); isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false); if(isDryRun) { LOG.info("jobContainer starts to do preCheck ..."); this.preCheck(); } else { //线程安全考虑 userConf = configuration.clone(); LOG.debug("jobContainer starts to do preHandle ..."); 初始化preHandler插件并执行插件的preHandler this.preHandle(); LOG.debug("jobContainer starts to do init ..."); //初始化reader和writer this.init(); LOG.info("jobContainer starts to do prepare ..."); //准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作。 this.prepare(); LOG.info("jobContainer starts to do split ..."); //拆分task,实际的拆分工作还是调用插件的实现 this.totalStage = this.split(); LOG.info("jobContainer starts to do schedule ..."); //把上一步reader和writer split的结果整合到taskGroupContainer,启动任务 this.schedule(); LOG.debug("jobContainer starts to do post ..."); //执行任务后的 *** 作 this.post(); LOG.debug("jobContainer starts to do postHandle ..."); //不知道是干啥的 this.postHandle(); LOG.info("DataX jobId [{}] completed successfully.", this.jobId); //DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等 this.invokeHooks(); ...
本篇文章只关注前面三个部分,也就是preHandle,init,prepare三个阶段,我认为这三个阶段都属于任务开始前的初始化阶段。
preHandlerpreHandler目前官方也没有实现,com.alibaba.datax.common.plugin.AbstractPlugin#preHandler方法目前是空的,所以这里我们也先略过。
init继续看init方法,
private void init() { //从配置中获取jobid this.jobId = this.configuration.getLong( CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1); if (this.jobId < 0) { LOG.info("Set jobId = 0"); this.jobId = 0; this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId); } Thread.currentThread().setName("job-" + this.jobId); //DataX所有的状态及统计信息交互类,job、taskGroup、task等的消息汇报 JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector( this.getContainerCommunicator()); //必须先Reader ,后Writer this.jobReader = this.initJobReader(jobPluginCollector); //writer的初始化做的事情会多一些,比如会检查写入表的字段和指定的字段个数是否一致等 this.jobWriter = this.initJobWriter(jobPluginCollector); }
可以看到,init方法分别调用的是reader和writer的init方法进行初始化。先来看下initJobReader方法,
private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.readerPluginName)); //loadJobPlugin需要用到jarLoader Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( PluginType.READER, this.readerPluginName); // 设置reader的jobConfig jobReader.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); jobReader.setJobPluginCollector(jobPluginCollector); //调用插件自己内部的init方法进行个性初始化,以mysql的初始化为例 //mysql reader会检查username,password等是否存在 jobReader.init(); classLoaderSwapper.restoreCurrentThreadClassLoader(); return jobReader; }
首先看这个方法返回的是Reader.Job这样的一个内部类,这个类是AbstractJobPlugin的一个实现。所以返回的其实是一个reader插件的实例。
接着看到是com.alibaba.datax.core.util.container.LoadUtil#getJarLoader方法,它根据类型和名称从缓存中获取,如果没有则去创建,创建的流程首先获取插件的路径.比如:D:DataXtargetdataxdataxpluginreadermysqlreader,然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。创建单独的JarLoader,把创建的JarLoader缓存起来。
然后它返回一个是一个自定义的类加载器JarLoader,根据java类加载器的原理我们知道,JarLoader是Application ClassLoader的子类。DataX通过Thread.currentThread().setContextClassLoader在每次对插件调用前后的进行classLoader的切换实现jar隔离的加载机制。
接下里的loadJobPlugin就会用到这个类加载器去实例化插件的实现类。
插件加载这部分的设计还是值得学习的,即实现了jar的隔离加载,也实现了热加载功能。
最后就是调用插件本身的init方法,以mysql为例,这里主要是检查 username/password 配置是否存在等。
writer的初始化流程基本是一样的,这里不展开了。
prepareprepare也是调用插件的prepare方法进行准备阶段的工作,
private void prepare() { this.prepareJobReader(); this.prepareJobWriter(); }
mysql reader的prepare没有实现,意味着不需要prepare,我们直接来看下writer的prepare方法。
// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外) public void prepare(Configuration originalConfig) { int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK); if (tableNumber == 1) { String username = originalConfig.getString(Key.USERNAME); String password = originalConfig.getString(Key.PASSWORD); List
其实prepare的核心思想就是,看下任务的配置文件有没有需要提前执行的sql,比如清空表之类的,有的话就先执行了。
参考:
- https://www.cnblogs.com/xmzpc/p/15193622.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)