此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
我所使用的任务模版的json文件是:
{ "job":{ "content":[ { "reader":{ "name":"mysqlreader", "parameter":{ "column":[ "id", "name", "age" ], "connection":[ { "jdbcUrl":[ "jdbc:mysql://127.0.0.1:3306/test" ], "table":[ "t_datax_test" ] } ], "password":"11111111", "username":"root" } }, "writer":{ "name":"mysqlwriter", "parameter":{ "column":[ "id", "name", "age" ], "connection":[ { "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2", "table":[ "t_datax_test" ] } ], "password":"11111111", "username":"root" } } } ], "setting":{ "speed":{ "channel":"2" } } } }
另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。
启动类分析
datax的启动类是com.alibaba.datax.core.Engine,通过main方法启动datax进程。
public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { exitCode = 1; LOG.error("nn经DataX智能分析,该任务最可能的错误原因是:n" + ExceptionTracker.trace(e)); ...
继续看entry方法,
public static void entry(final String[] args) throws Throwable { Options options = new Options(); options.addOption("job", true, "Job config."); options.addOption("jobid", true, "Job unique id."); options.addOption("mode", true, "Job runtime mode."); BasicParser parser = new BasicParser(); CommandLine cl = parser.parse(options, args); //datax运行目录/xxx.json String jobPath = cl.getOptionValue("job"); // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1 String jobIdString = cl.getOptionValue("jobid"); RUNTIME_MODE = cl.getOptionValue("mode"); Configuration configuration = ConfigParser.parse(jobPath); long jobId; if (!"-1".equalsIgnoreCase(jobIdString)) { jobId = Long.parseLong(jobIdString); } else { // only for dsc & ds & datax 3 update String dscJobUrlPatternString = "/instance/(\d{1,})/config.xml"; String dsJobUrlPatternString = "/inner/job/(\d{1,})/config"; String dsTaskGroupUrlPatternString = "/inner/job/(\d{1,})/taskGroup/"; ListpatternStringList = Arrays.asList(dscJobUrlPatternString, dsJobUrlPatternString, dsTaskGroupUrlPatternString); jobId = parseJobIdFromUrl(patternStringList, jobPath); } boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE); if (!isStandAloneMode && jobId == -1) { // 如果不是 standalone 模式,那么 jobId 一定不能为-1 throw DataXException.asDataXException(frameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId."); } configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId); //打印vmInfo VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { LOG.info(vmInfo.toString()); } LOG.info("n" + Engine.filterJobConfiguration(configuration) + "n"); LOG.debug(configuration.toJSON()); ConfigurationValidate.doValidate(configuration); Engine engine = new Engine(); //完成配置初始化后该方法将实例化本身并调用其start方法 engine.start(configuration); }
首先是解析我们运行datax制定的运行参数,比如我在idea里给的配置是
-mode standalone -jobid -1 -job /Users/malu/documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json
那自然的,jobPath的值就是/Users/malu/documents/code/idea_study/DataX/core/target/datax/job/mysql2mysql.json,jobIdString的值是-1,RUNTIME_MODE的值是standalone。
这几个关键的变量值明确之后,下面的流程明确了。
接着看一个比较重要的方法,ConfigParser.parse,这个方法返回的是Configuration类的实例,这个类在datax里非常重要,所有的配置信息都由它来管理,相当于大管家的角色。我后面打算专门写一篇介绍这个类。
public static Configuration parse(final String jobPath) { //首先从任务配置文件解析基本的配置,包括reader、writer的信息,channel的数量等 Configuration configuration = ConfigParser.parseJobConfig(jobPath); //合并datax本身的一些配置,主要是在core.json文件里,比如限速的一些配置等 configuration.merge( ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH), false); // todo config优化,只捕获需要的plugin //reader plugin的名字,比如mysql是mysqlreader String readerPluginName = configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); //writer plugin的名字,比如mysql是mysqlwriter String writerPluginName = configuration.getString( CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); String preHandlerName = configuration.getString( CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME); String postHandlerName = configuration.getString( CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME); SetpluginList = new HashSet (); pluginList.add(readerPluginName); pluginList.add(writerPluginName); if(StringUtils.isNotEmpty(preHandlerName)) { pluginList.add(preHandlerName); } if(StringUtils.isNotEmpty(postHandlerName)) { pluginList.add(postHandlerName); } try { configuration.merge(parsePluginConfig(new ArrayList (pluginList)), false); ...
注释写得很清楚了。
VMInfo里面放的是电脑本身的一些配置信息,这里不表。
接着是filterJobConfiguration方法,
public static String filterJobConfiguration(final Configuration configuration) { //clone一份,因为后面会修改 Configuration jobConfWithSetting = configuration.getConfiguration("job").clone(); Configuration jobContent = jobConfWithSetting.getConfiguration("content"); //过滤敏感信息,比如password filterSensitiveConfiguration(jobContent); jobConfWithSetting.set("content",jobContent); //格式化json字符串显示 return jobConfWithSetting.beautify(); }
这里也没啥好说的,都是一些基本 *** 作。然后进入start方法,
public void start(Configuration allConf) { // 绑定column转换信息 ColumnCast.bind(allConf); LoadUtil.bind(allConf); boolean isJob = !("taskGroup".equalsIgnoreCase(allConf .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL))); //JobContainer会在schedule后再行进行设置和调整值 int channelNumber =0; AbstractContainer container; ... //缺省打开perfTrace boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true); boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true); //standlone模式的datax shell任务不进行汇报 if(instanceId == -1){ perfReportEnable = false; } int priority = 0; try { priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY")); }catch (NumberFormatException e){ LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY")); } Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO); //初始化PerfTrace PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable); perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber); container.start(); }
注释也比较清楚了,这里说明一点就是PerfTrace类,它是一个追踪性能的类,也就是datax在执行任务的时候记录一些指标,比如传输了多少数据,耗时多少等。下面是一个示例,它是datax在执行完一个任务后其中一部分打印内容:
2021-11-28 09:11:32.532 [job-0] INFO StandAloneJobContainerCommunicator - Total 5 records, 39 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
container.start方法就进入JobContainer内部了,放在下一篇文章讲吧。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)