dolphinscheduler 2.0.5-存储过程任务测及代码优化

dolphinscheduler 2.0.5-存储过程任务测及代码优化,第1张

dolphinscheduler 2.0.5-存储过程任务测及代码优化
  • MySQL存储过程
    • 定义
    • 测试
  • pgsql存储过程(8.2.15)
    • 定义
    • 测试
      • 报错
      • 代码
      • 解决
      • 验证
  • 输出参数o_return_code无效
    • 原因
    • 增加返回值校验
    • 验证
  • 源码
  • 其它

MySQL存储过程 定义

测试

pgsql存储过程(8.2.15) 定义

测试 报错
    [ERROR] 2022-04-29 17:10:16.456 TaskLogLogger-class org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask:[123] - procedure task error
java.sql.SQLFeatureNotSupportedException: 这个 org.postgresql.jdbc.PgCallableStatement.registerOutParameter(int,int,String) 方法尚未被实作。
	at org.postgresql.Driver.notImplemented(Driver.java:688)
	at org.postgresql.jdbc.PgCallableStatement.registerOutParameter(PgCallableStatement.java:502)
	at com.zaxxer.hikari.pool.HikariProxyCallableStatement.registerOutParameter(HikariProxyCallableStatement.java)
	at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.setOutParameter(ProcedureTask.java:326)
	at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.getOutParameterMap(ProcedureTask.java:177)
	at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.handle(ProcedureTask.java:113)
	at org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread.run(TaskExecuteThread.java:189)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

代码


报错原因是pgsql不支持该方法,查看其它数据类型,mysql、hive、sqlserver,有的支持有的不支持。但是其中支持的,并没有用到该value参数,莫名其妙!!!

解决

验证

输出参数o_return_code无效

前端定义的返回值1,实际返回值0,那么该任务状态应该为失败,但是结果依然显示成功

原因

代码直接写死的,只要任务正常执行完,就会返回成功!站在任务执行的角度来说也没毛病,任务确实是执行成功了,但是对应任务之间的依赖,A任务执行成功(结果成功)后执行B任务,那就需要对任务结果做判断了,任务执行成功并且结果返回成功才认为该任务成功

增加返回值校验

注释掉写死的返回值,把1.2.1版本的代码拿过来(没接触过1.3.X版本,对1.3不了解)

验证


从日志便能清晰的看出该任务的失败原因:期望值1,实际返回值0

源码

ProcedureTask

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.procedure;

import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;

import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DatasourceUtil;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DataType;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Direct;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;


/**
 * procedure task
 */
public class ProcedureTask extends AbstractTaskExecutor {

    /**
     * procedure parameters
     */
    private ProcedureParameters procedureParameters;

    /**
     * taskExecutionContext
     */
    private TaskRequest taskExecutionContext;

    /**
     * constructor
     *
     * @param taskExecutionContext taskExecutionContext
     */
    public ProcedureTask(TaskRequest taskExecutionContext) {
        super(taskExecutionContext);

        this.taskExecutionContext = taskExecutionContext;

        logger.info("procedure task params {}", taskExecutionContext.getTaskParams());

        this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);

        // check parameters
        if (!procedureParameters.checkParameters()) {
            throw new RuntimeException("procedure task params is not valid");
        }
    }

    @Override
    public void handle() throws Exception {

        logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
                procedureParameters.getType(),
                procedureParameters.getDatasource(),
                procedureParameters.getMethod(),
                procedureParameters.getLocalParams());

        Connection connection = null;
        CallableStatement stmt = null;
        try {
            // load class
            DbType dbType = DbType.valueOf(procedureParameters.getType());
            // get datasource
            ConnectionParam connectionParam = DatasourceUtil.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
                    taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams());

            // get jdbc connection
            connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
            Map sqlParamsMap = new HashMap<>();
            Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
            String proceduerSql = formatSql(sqlParamsMap, paramsMap);
            // call method
            stmt = connection.prepareCall(proceduerSql);

            // set timeout
            setTimeout(stmt);

            // outParameterMap
            Map outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);

            stmt.executeUpdate();

            // print the output parameters to the log
            printOutParameter(stmt, paramsMap,outParameterMap);//renxiaozhao add paramsMap 20220429

            //setExitStatusCode(EXIT_CODE_SUCCESS);
        } catch (Exception e) {
            setExitStatusCode(EXIT_CODE_FAILURE);
            logger.error("procedure task error", e);
            throw e;
        } finally {
            close(stmt, connection);
        }
    }

    private String formatSql(Map sqlParamsMap, Map paramsMap) {
        // combining local and global parameters
        setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
        return procedureParameters.getMethod().replaceAll(rgex, "?");
    }

    /**
     * print outParameter
     *
     * @param stmt            CallableStatement
     * @param outParameterMap outParameterMap
     * @throws SQLException SQLException
     */
    private void printOutParameter(CallableStatement stmt,Map paramsMap,
                                   Map outParameterMap) throws SQLException {
//        for (Map.Entry en : outParameterMap.entrySet()) {
//            int index = en.getKey();
//            Property property = en.getValue();
//            String prop = property.getProp();
//            DataType dataType = property.getType();
//            // get output parameter
//            procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop);
//        }
    	//恢复为1.2.1版本 renxiaozhao 20220429
        Iterator> iter = outParameterMap.entrySet().iterator();
        Map outputMap = new HashMap();
        while (iter.hasNext()){
            Map.Entry en = iter.next();
            int index = en.getKey();
            Property property = en.getValue();
            String prop = property.getProp();
            DataType dataType = property.getType();
            // get output parameter
            Object output = getOutputParameter(stmt, index, prop, dataType);
            outputMap.put(prop, output);
        }
        exitStatusCode = 0;
        // add check for output
        for (String outParam : outputMap.keySet()) {
            String outSetValue = paramsMap.get(outParam).getValue();
            if (org.apache.commons.lang.StringUtils.isNotBlank(outSetValue)) {
                Object outGetValue = outputMap.get(outParam);
                if (outGetValue == null || !outSetValue.trim().equals(outGetValue.toString())) {
                    exitStatusCode = 1;
                    logger.error("Expect value[{}] for [{}], but return[{}]" ,outSetValue ,outParam, outGetValue);
                }
            }
        }
    }

    /**
     * get output parameter
     *
     * @param stmt      CallableStatement
     * @param paramsMap paramsMap
     * @return outParameterMap
     * @throws Exception Exception
     */
    private Map getOutParameterMap(CallableStatement stmt, Map paramsMap
            , Map totalParamsMap) throws Exception {
        Map outParameterMap = new HashMap<>();
        if (procedureParameters.getLocalParametersMap() == null) {
            return outParameterMap;
        }

        int index = 1;
        if (paramsMap != null) {
            for (Map.Entry entry : paramsMap.entrySet()) {
                Property property = entry.getValue();
                if (property.getDirect().equals(Direct.IN)) {
                    ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
                } else if (property.getDirect().equals(Direct.OUT)) {
                    setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
                    outParameterMap.put(index, property);
                }
                index++;
            }
        }

        return outParameterMap;
    }

    /**
     * set timeout
     *
     * @param stmt CallableStatement
     */
    private void setTimeout(CallableStatement stmt) throws SQLException {
        Boolean failed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED;
        Boolean warnFailed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
        if (failed || warnFailed) {
            stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
        }
    }

    /**
     * close jdbc resource
     *
     * @param stmt       stmt
     * @param connection connection
     */
    private void close(PreparedStatement stmt, Connection connection) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                logger.error("close prepared statement error : {}", e.getMessage(), e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("close connection error : {}", e.getMessage(), e);
            }
        }
    }

    /**
     * get output parameter
     *
     * @param stmt     stmt
     * @param index    index
     * @param prop     prop
     * @param dataType dataType
     * @throws SQLException SQLException
     */
    private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
        Object value = null;
        switch (dataType) {
            case VARCHAR:
                logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
                value = stmt.getString(index);
                break;
            case INTEGER:
                logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
                value = stmt.getInt(index);
                break;
            case LONG:
                logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
                value = stmt.getLong(index);
                break;
            case FLOAT:
                logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
                value = stmt.getFloat(index);
                break;
            case DOUBLE:
                logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
                value = stmt.getDouble(index);
                break;
            case DATE:
                logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
                value = stmt.getDate(index);
                break;
            case TIME:
                logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
                value = stmt.getTime(index);
                break;
            case TIMESTAMP:
                logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
                value = stmt.getTimestamp(index);
                break;
            case BOOLEAN:
                logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
                value = stmt.getBoolean(index);
                break;
            default:
                break;
        }
        return value;
    }

    @Override
    public AbstractParameters getParameters() {
        return procedureParameters;
    }

    /**
     * set out parameter
     *
     * @param index    index
     * @param stmt     stmt
     * @param dataType dataType
     * @param value    value
     * @throws Exception exception
     */
    private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception {
        int sqlType;
        switch (dataType) {
            case VARCHAR:
                sqlType = Types.VARCHAR;
                break;
            case INTEGER:
            case LONG:
                sqlType = Types.INTEGER;
                break;
            case FLOAT:
                sqlType = Types.FLOAT;
                break;
            case DOUBLE:
                sqlType = Types.DOUBLE;
                break;
            case DATE:
                sqlType = Types.DATE;
                break;
            case TIME:
                sqlType = Types.TIME;
                break;
            case TIMESTAMP:
                sqlType = Types.TIMESTAMP;
                break;
            case BOOLEAN:
                sqlType = Types.BOOLEAN;
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + dataType);
        }

        //if (StringUtils.isEmpty(value)) {
            stmt.registerOutParameter(index, sqlType);
        //} else {
            //stmt.registerOutParameter(index, sqlType, value);
        //}
    }
}

其它

对于pgsql接触不多,为了测试其存储过程,特意安装了一下,详情见centos7安装postgresql8.2.15及存储过程创建

PgSQL 存储过程

CREATE OR REPLACE FUNCTION "public"."p_dolphin_test_pgsql"(IN "i_date" varchar, OUT "o_return_code" int4, OUT "o_return_msg" varchar)
  RETURNS "pg_catalog"."record" AS $BODY$
declare
 vs_return_msg VARCHAR;
begin
   O_RETURN_CODE = 0;
   O_RETURN_MSG = i_date || ' 成功';
   RETURN;
end;
$BODY$
  LANGUAGE plpgsql VOLATILE

MySQL存储过程

CREATE DEFINER=`root`@`%` PROCEDURE `p_dolphin_test_mysql`(IN  i_date  DATE,
OUT o_return_code INT,
OUT o_return_msg VARCHAR(100))
BEGIN
 SELECT * from t_ds_user;                                    
COMMIT;
  SET o_return_code  = 0;
  SET o_return_msg   = '存储过程p_dolphin_test_mysql运行成功!';
END

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/791363.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存