- MySQL存储过程
- 定义
- 测试
- pgsql存储过程(8.2.15)
- 定义
- 测试
- 报错
- 代码
- 解决
- 验证
- 输出参数o_return_code无效
- 原因
- 增加返回值校验
- 验证
- 源码
- 其它
[ERROR] 2022-04-29 17:10:16.456 TaskLogLogger-class org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask:[123] - procedure task error
java.sql.SQLFeatureNotSupportedException: 这个 org.postgresql.jdbc.PgCallableStatement.registerOutParameter(int,int,String) 方法尚未被实作。
at org.postgresql.Driver.notImplemented(Driver.java:688)
at org.postgresql.jdbc.PgCallableStatement.registerOutParameter(PgCallableStatement.java:502)
at com.zaxxer.hikari.pool.HikariProxyCallableStatement.registerOutParameter(HikariProxyCallableStatement.java)
at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.setOutParameter(ProcedureTask.java:326)
at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.getOutParameterMap(ProcedureTask.java:177)
at org.apache.dolphinscheduler.plugin.task.procedure.ProcedureTask.handle(ProcedureTask.java:113)
at org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread.run(TaskExecuteThread.java:189)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
代码
报错原因是pgsql的JDBC驱动没有实现registerOutParameter(int,int,String)这个方法。查看其它数据库类型(mysql、hive、sqlserver)的驱动,有的支持有的不支持;但即使是支持的驱动,也并没有用到传入的value参数(按JDBC规范,该方法的第三个参数是typeName,即类型名,而不是输出参数的值),莫名其妙!!!
前端定义的返回值1,实际返回值0,那么该任务状态应该为失败,但是结果依然显示成功
代码是直接写死的:只要任务正常执行完,就会返回成功!站在任务执行的角度来说也没毛病,任务确实是执行成功了。但是对于任务之间的依赖,例如A任务执行成功(结果成功)后才执行B任务,就需要对任务结果做判断了:任务执行成功并且结果返回成功,才认为该任务成功。
注释掉写死的返回值,把1.2.1版本的代码拿过来(没接触过1.3.X版本,对1.3不了解)
从日志便能清晰的看出该任务的失败原因:期望值1,实际返回值0
ProcedureTask
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DatasourceUtil;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DataType;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Direct;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* procedure task
*/
/**
 * Procedure task: calls a stored procedure through JDBC and derives the task exit status
 * from the values the procedure returns in its OUT parameters.
 *
 * <p>After the call every registered OUT parameter is read back. When the task definition
 * gives a non-blank value for an OUT parameter, that value is treated as the expected result:
 * the task succeeds only if every expected value equals the returned value.
 *
 * <p>NOTE(review): the upstream implementation handed each OUT value to
 * {@code procedureParameters.dealOutParam4Procedure(...)}; this variant does not, so OUT values
 * are only logged and checked here — confirm whether downstream tasks still need them.
 */
public class ProcedureTask extends AbstractTaskExecutor {

    /**
     * procedure parameters
     */
    private ProcedureParameters procedureParameters;

    /**
     * taskExecutionContext
     */
    private TaskRequest taskExecutionContext;

    /**
     * constructor
     *
     * @param taskExecutionContext taskExecutionContext
     * @throws RuntimeException if the task params cannot be parsed or fail validation
     */
    public ProcedureTask(TaskRequest taskExecutionContext) {
        super(taskExecutionContext);

        this.taskExecutionContext = taskExecutionContext;

        logger.info("procedure task params {}", taskExecutionContext.getTaskParams());

        this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);

        // check parameters; a null parse result is reported the same way as invalid params
        // instead of surfacing as a NullPointerException
        if (procedureParameters == null || !procedureParameters.checkParameters()) {
            throw new RuntimeException("procedure task params is not valid");
        }
    }

    /**
     * Runs the procedure: opens a connection, binds IN / registers OUT parameters, executes the
     * call and validates the OUT values. Any exception marks the task as failed and is rethrown.
     *
     * @throws Exception whatever the datasource lookup, parameter binding or JDBC call throws
     */
    @Override
    public void handle() throws Exception {
        logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
                procedureParameters.getType(),
                procedureParameters.getDatasource(),
                procedureParameters.getMethod(),
                procedureParameters.getLocalParams());

        Connection connection = null;
        CallableStatement stmt = null;
        try {
            // resolve the database type once and reuse it
            DbType dbType = DbType.valueOf(procedureParameters.getType());
            // get datasource
            ConnectionParam connectionParam = DatasourceUtil.buildConnectionParams(dbType,
                    taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams());

            // get jdbc connection
            connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);

            Map<Integer, Property> sqlParamsMap = new HashMap<>();
            Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
            String procedureSql = formatSql(sqlParamsMap, paramsMap);

            // call method
            stmt = connection.prepareCall(procedureSql);

            // set timeout
            setTimeout(stmt);

            // bind IN parameters and register OUT parameters
            Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);

            stmt.executeUpdate();

            // log the output parameters and set the exit status from the expected-value check
            printOutParameter(stmt, paramsMap, outParameterMap);
        } catch (Exception e) {
            setExitStatusCode(EXIT_CODE_FAILURE);
            logger.error("procedure task error", e);
            throw e;
        } finally {
            close(stmt, connection);
        }
    }

    /**
     * Fills {@code sqlParamsMap} from the placeholders found in the procedure call text and
     * returns the call text with every placeholder replaced by a JDBC {@code ?} marker.
     *
     * @param sqlParamsMap map to fill (placeholder position to property)
     * @param paramsMap    combined local and global parameters
     * @return call text usable with {@link Connection#prepareCall(String)}
     */
    private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
        // combining local and global parameters
        setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
        return procedureParameters.getMethod().replaceAll(rgex, "?");
    }

    /**
     * Reads every OUT parameter, logs it, and sets the task exit status: success when every OUT
     * parameter with a non-blank expected value returned exactly that value (compared as trimmed
     * expected text against {@code toString()} of the returned value), failure otherwise.
     *
     * @param stmt            executed CallableStatement
     * @param paramsMap       task parameters by name; the value of an OUT parameter is its expected result
     * @param outParameterMap registered OUT parameters by statement index
     * @throws SQLException if an OUT parameter cannot be read
     */
    private void printOutParameter(CallableStatement stmt, Map<String, Property> paramsMap,
                                   Map<Integer, Property> outParameterMap) throws SQLException {
        Map<String, Object> outputMap = new HashMap<>();
        for (Map.Entry<Integer, Property> en : outParameterMap.entrySet()) {
            Property property = en.getValue();
            String prop = property.getProp();
            // get output parameter
            outputMap.put(prop, getOutputParameter(stmt, en.getKey(), prop, property.getType()));
        }

        boolean allMatched = true;
        for (Map.Entry<String, Object> out : outputMap.entrySet()) {
            String outParam = out.getKey();
            // a parameter that is absent from paramsMap has no expected value and is not checked
            Property expected = paramsMap == null ? null : paramsMap.get(outParam);
            String outSetValue = expected == null ? null : expected.getValue();
            if (org.apache.commons.lang.StringUtils.isNotBlank(outSetValue)) {
                Object outGetValue = out.getValue();
                if (outGetValue == null || !outSetValue.trim().equals(outGetValue.toString())) {
                    allMatched = false;
                    logger.error("Expect value[{}] for [{}], but return[{}]", outSetValue, outParam, outGetValue);
                }
            }
        }

        // report through the same setter and constants that handle() uses for its failure path
        setExitStatusCode(allMatched ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
    }

    /**
     * Binds IN parameters and registers OUT parameters on the statement.
     *
     * @param stmt           CallableStatement
     * @param paramsMap      parameters referenced by the call text, as filled by {@code formatSql}
     * @param totalParamsMap all task parameters by name, used to look up the values
     * @return registered OUT parameters by statement index (empty when there are none)
     * @throws Exception Exception
     */
    private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap,
                                                      Map<String, Property> totalParamsMap) throws Exception {
        Map<Integer, Property> outParameterMap = new HashMap<>();
        if (procedureParameters.getLocalParametersMap() == null) {
            return outParameterMap;
        }

        // NOTE(review): statement positions follow the iteration order of paramsMap, as in the
        // original code — presumably its keys are the placeholder positions; confirm.
        int index = 1;
        if (paramsMap != null) {
            for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
                Property property = entry.getValue();
                if (property.getDirect().equals(Direct.IN)) {
                    ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
                } else if (property.getDirect().equals(Direct.OUT)) {
                    setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
                    outParameterMap.put(index, property);
                }
                index++;
            }
        }

        return outParameterMap;
    }

    /**
     * Applies the task timeout as the statement query timeout when the timeout strategy is
     * FAILED or WARNFAILED.
     *
     * @param stmt CallableStatement
     * @throws SQLException if the driver rejects the timeout
     */
    private void setTimeout(CallableStatement stmt) throws SQLException {
        TaskTimeoutStrategy strategy = taskExecutionContext.getTaskTimeoutStrategy();
        if (strategy == TaskTimeoutStrategy.FAILED || strategy == TaskTimeoutStrategy.WARNFAILED) {
            stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
        }
    }

    /**
     * close jdbc resource; close failures are logged and not rethrown
     *
     * @param stmt       stmt, may be null
     * @param connection connection, may be null
     */
    private void close(PreparedStatement stmt, Connection connection) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                logger.error("close prepared statement error : {}", e.getMessage(), e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("close connection error : {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Reads one OUT parameter with the getter matching its declared data type and logs it.
     *
     * @param stmt     stmt
     * @param index    statement index of the OUT parameter
     * @param prop     parameter name, used for logging only
     * @param dataType declared data type
     * @return the value, or {@code null} when it is SQL NULL or the data type is not handled
     * @throws SQLException SQLException
     */
    private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
        // each getter is called exactly once; the label reproduces the per-type log text
        String label;
        Object value;
        switch (dataType) {
            case VARCHAR:
                label = "varchar";
                value = stmt.getString(index);
                break;
            case INTEGER:
                label = "integer";
                value = stmt.getInt(index);
                break;
            case LONG:
                label = "long";
                value = stmt.getLong(index);
                break;
            case FLOAT:
                label = "float";
                value = stmt.getFloat(index);
                break;
            case DOUBLE:
                label = "double";
                value = stmt.getDouble(index);
                break;
            case DATE:
                label = "date";
                value = stmt.getDate(index);
                break;
            case TIME:
                label = "time";
                value = stmt.getTime(index);
                break;
            case TIMESTAMP:
                label = "timestamp";
                value = stmt.getTimestamp(index);
                break;
            case BOOLEAN:
                label = "boolean";
                value = stmt.getBoolean(index);
                break;
            default:
                // unhandled data types are neither read nor logged
                return null;
        }

        // primitive getters return 0 / false for SQL NULL; report it as null so that an
        // expected-value check cannot match a value the procedure never set
        if (stmt.wasNull()) {
            value = null;
        }
        logger.info("out prameter {} key : {} , value : {}", label, prop, value);
        return value;
    }

    @Override
    public AbstractParameters getParameters() {
        return procedureParameters;
    }

    /**
     * Registers an OUT parameter with the JDBC type matching the declared data type.
     *
     * @param index    statement index
     * @param stmt     stmt
     * @param dataType declared data type
     * @param value    unused; the value configured for an OUT parameter is its expected result,
     *                 not something to send to the driver
     * @throws Exception exception
     */
    private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception {
        int sqlType;
        switch (dataType) {
            case VARCHAR:
                sqlType = Types.VARCHAR;
                break;
            case INTEGER:
                sqlType = Types.INTEGER;
                break;
            case LONG:
                // the value is read back with getLong, whose JDBC type is BIGINT
                sqlType = Types.BIGINT;
                break;
            case FLOAT:
                sqlType = Types.FLOAT;
                break;
            case DOUBLE:
                sqlType = Types.DOUBLE;
                break;
            case DATE:
                sqlType = Types.DATE;
                break;
            case TIME:
                sqlType = Types.TIME;
                break;
            case TIMESTAMP:
                sqlType = Types.TIMESTAMP;
                break;
            case BOOLEAN:
                sqlType = Types.BOOLEAN;
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + dataType);
        }

        // Always use the two-argument overload: the three-argument overload takes a type name
        // (not a value) and the PostgreSQL driver throws SQLFeatureNotSupportedException for it.
        stmt.registerOutParameter(index, sqlType);
    }
}
其它
对于pgsql接触不多,为了测试其存储过程,特意安装了一下,详情见centos7安装postgresql8.2.15及存储过程创建
PgSQL 存储过程
-- Test function for the procedure task: takes a date string and returns a
-- return code (always 0) plus a message built from the input.
CREATE OR REPLACE FUNCTION public.p_dolphin_test_pgsql(
    IN  i_date        varchar,
    OUT o_return_code int4,
    OUT o_return_msg  varchar
)
RETURNS record AS $BODY$
DECLARE
    -- declared but never used
    vs_return_msg varchar;
BEGIN
    o_return_code := 0;
    o_return_msg  := i_date || ' 成功';
    -- the OUT parameters carry the result, so RETURN takes no expression
    RETURN;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
MySQL存储过程
-- Test procedure for the procedure task: selects from t_ds_user, commits,
-- then reports return code 0 and a fixed message through its OUT parameters.
CREATE DEFINER=`root`@`%` PROCEDURE `p_dolphin_test_mysql`(
    IN  i_date        DATE,
    OUT o_return_code INT,
    OUT o_return_msg  VARCHAR(100)
)
BEGIN
    -- this query's rows are returned to the caller as a result set
    SELECT * FROM t_ds_user;
    COMMIT;
    SET o_return_code = 0,
        o_return_msg  = '存储过程p_dolphin_test_mysql运行成功!';
END
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)