datax源码解析-JobContainer的初始化阶段解析

datax源码解析-JobContainer的初始化阶段解析,第1张

datax源码解析-JobContainer的初始化阶段解析

datax源码解析-JobContainer的初始化阶段解析

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

我所使用的任务模版的json文件是:

{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":[
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ],
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                },
                "writer":{
                    "name":"mysqlwriter",
                    "parameter":{
                        "column":[
                            "id",
                            "name",
                            "age"
                        ],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2",
                                "table":[
                                    "t_datax_test"
                                ]
                            }
                        ],
                        "password":"11111111",
                        "username":"root"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"2"
            }
        }
    }
}

另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。

JobContainer初始化阶段

接着上篇文章:

datax源码解析-启动类分析

进入JobContainer的start方法,jobContainer主要负责的工作全部在start()里面,包括:

  • preHandle,前置处理
  • init,初始化,主要是调用插件的init方法实现初始化
  • prepare,准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作
  • split,根据配置的并发参数,对job进行切分,切分为多个task
  • scheduler,把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
  • post,执行完任务后的后置 *** 作
  • invokeHooks,DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等

从代码中看,也可以清晰的看到这几个过程:

public void start() {
        LOG.info("DataX jobContainer starts job.");

        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            if(isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                this.preCheck();
            } else {
                //线程安全考虑
                userConf = configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                初始化preHandler插件并执行插件的preHandler
                this.preHandle();

                LOG.debug("jobContainer starts to do init ...");
                //初始化reader和writer
                this.init();
                LOG.info("jobContainer starts to do prepare ...");
                //准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作。
                this.prepare();
                LOG.info("jobContainer starts to do split ...");
                //拆分task,实际的拆分工作还是调用插件的实现
                this.totalStage = this.split();
                LOG.info("jobContainer starts to do schedule ...");
                //把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
                this.schedule();
                LOG.debug("jobContainer starts to do post ...");
                //执行任务后的 *** 作
                this.post();

                LOG.debug("jobContainer starts to do postHandle ...");
                //不知道是干啥的
                this.postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

                //DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等
                this.invokeHooks();
                ...

本篇文章只关注前面三个部分,也就是preHandle,init,prepare三个阶段,我认为这三个阶段都属于任务开始前的初始化阶段。

preHandler

preHandler目前官方也没有实现,com.alibaba.datax.common.plugin.AbstractPlugin#preHandler方法目前是空的,所以这里我们也先略过。

init

继续看init方法,

private void init() {
        //从配置中获取jobid
        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                    this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);

        //DataX所有的状态及统计信息交互类,job、taskGroup、task等的消息汇报
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        //必须先Reader ,后Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        //writer的初始化做的事情会多一些,比如会检查写入表的字段和指定的字段个数是否一致等
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

可以看到,init方法分别调用的是reader和writer的init方法进行初始化。先来看下initJobReader方法,

private Reader.Job initJobReader(
            JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));

        //loadJobPlugin需要用到jarLoader
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // 设置reader的jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // 设置reader的readerConfig
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        //调用插件自己内部的init方法进行个性初始化,以mysql的初始化为例
        //mysql reader会检查username,password等是否存在
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

首先看这个方法返回的是Reader.Job这样的一个内部类,这个类是AbstractJobPlugin的一个实现。所以返回的其实是一个reader插件的实例。

接着看到是com.alibaba.datax.core.util.container.LoadUtil#getJarLoader方法,它根据类型和名称从缓存中获取,如果没有则去创建,创建的流程首先获取插件的路径.比如:D:DataXtargetdataxdataxpluginreadermysqlreader,然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。创建单独的JarLoader,把创建的JarLoader缓存起来。

然后它返回一个是一个自定义的类加载器JarLoader,根据java类加载器的原理我们知道,JarLoader是Application ClassLoader的子类。DataX通过Thread.currentThread().setContextClassLoader在每次对插件调用前后的进行classLoader的切换实现jar隔离的加载机制。

接下里的loadJobPlugin就会用到这个类加载器去实例化插件的实现类。

插件加载这部分的设计还是值得学习的,即实现了jar的隔离加载,也实现了热加载功能。

最后就是调用插件本身的init方法,以mysql为例,这里主要是检查 username/password 配置是否存在等。

writer的初始化流程基本是一样的,这里不展开了。

prepare

prepare也是调用插件的prepare方法进行准备阶段的工作,

private void prepare() {
        this.prepareJobReader();
        this.prepareJobWriter();
    }

mysql reader的prepare没有实现,意味着不需要prepare,我们直接来看下writer的prepare方法。

// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)
        public void prepare(Configuration originalConfig) {
            int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
            if (tableNumber == 1) {
                String username = originalConfig.getString(Key.USERNAME);
                String password = originalConfig.getString(Key.PASSWORD);

                List conns = originalConfig.getList(Constant.CONN_MARK,
                        Object.class);
                Configuration connConf = Configuration.from(conns.get(0)
                        .toString());

                // 这里的 jdbcUrl 已经 append 了合适后缀参数
                String jdbcUrl = connConf.getString(Key.JDBC_URL);
                originalConfig.set(Key.JDBC_URL, jdbcUrl);

                //表名
                String table = connConf.getList(Key.TABLE, String.class).get(0);
                originalConfig.set(Key.TABLE, table);

                //如果有需要提前执行的sql,比如清空表等
                List preSqls = originalConfig.getList(Key.PRE_SQL,
                        String.class);
                
                List renderedPreSqls = WriterUtil.renderPreOrPostSqls(
                        preSqls, table);

                originalConfig.remove(Constant.CONN_MARK);
                if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
                    // 说明有 preSql 配置,则此处删除掉
                    originalConfig.remove(Key.PRE_SQL);

                    Connection conn = DBUtil.getConnection(databaseType,
                            jdbcUrl, username, password);
                    LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
                            StringUtils.join(renderedPreSqls, ";"), jdbcUrl);

                    WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, databaseType);
                    DBUtil.closeDBResources(null, null, conn);
                }
            }
 

其实prepare的核心思想就是,看下任务的配置文件有没有需要提前执行的sql,比如清空表之类的,有的话就先执行了。


参考:

  • https://www.cnblogs.com/xmzpc/p/15193622.html

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5660281.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)