删除限制
修改 WriterUtil.java,为 Oracle 添加数据增量(update)模式的插入与类型转换支持
// 替换原先的代码块 public static String getWriteTemplate(ListcolumnHolders, List valueHolders, String writeMode, DatabaseType databaseType, boolean forceUseUpdate) { boolean update = writeMode.trim().toLowerCase().startsWith("update"); boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert") || writeMode.trim().toLowerCase().startsWith("replace") || update; if (!isWriteModeLegal) { throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode)); } // && writeMode.trim().toLowerCase().startsWith("replace") String writeDataSqlTemplate; if (forceUseUpdate || ((databaseType == DatabaseType.MySql || databaseType == DatabaseType.Tddl) && update)) { //update只在mysql下使用 writeDataSqlTemplate = "INSERT INTO %s (" + StringUtils.join(columnHolders, ",") + ") VALUES(" + StringUtils.join(valueHolders, ",") + ")" + onDuplicateKeyUpdateString(columnHolders); } else { if (databaseType == DatabaseType.Oracle && update) { writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" + StringUtils.join(columnHolders, ",") + ") VALUES(" + StringUtils.join(valueHolders, ",") + ")"; } else { //这里是保护,如果其他错误的使用了update,需要更换为replace if (update) { writeMode = "replace"; } writeDataSqlTemplate = writeMode + " INTO %s (" + StringUtils.join(columnHolders, ",") + ") VALUES(" + StringUtils.join(valueHolders, ",") + ")"; } } return writeDataSqlTemplate; }
public static String onMergeIntoDoString(String merge, ListcolumnHolders, List valueHolders) { String[] sArray = getStrings(merge); StringBuilder sb = new StringBuilder(); sb.append("MERGE INTO %s A USING ( SELECT "); boolean first = true; boolean first1 = true; StringBuilder str = new StringBuilder(); StringBuilder update = new StringBuilder(); for (String columnHolder : columnHolders) { if (Arrays.asList(sArray).contains(columnHolder)) { if (!first) { sb.append(","); str.append(" AND "); } else { first = false; } str.append("TMP.").append(columnHolder); sb.append("?"); str.append(" = "); sb.append(" AS "); str.append("A.").append(columnHolder); sb.append(columnHolder); } } for (String columnHolder : columnHolders) { if (!Arrays.asList(sArray).contains(columnHolder)) { if (!first1) { update.append(","); } else { first1 = false; } update.append(columnHolder); update.append(" = "); update.append("?"); } } sb.append(" FROM DUAL ) TMP ON ("); sb.append(str); sb.append(" ) WHEN MATCHED THEN UPDATe SET "); sb.append(update); sb.append(" WHEN NOT MATCHED THEN "); return sb.toString(); }
public static String[] getStrings(String merge) { merge = merge.replace("update", ""); merge = merge.replace("(", ""); merge = merge.replace(")", ""); merge = merge.replace(" ", ""); return merge.split(","); }修改CommonRdbmsWriter.java
// 替换原先的代码块 public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { this.taskPluginCollector = taskPluginCollector; Listcolumns = new linkedList<>(); if (this.databaseType == DatabaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) { String merge = this.writeMode; String[] sArray = WriterUtil.getStrings(merge); this.columns.forEach(column->{ if (Arrays.asList(sArray).contains(column)) { columns.add(column); } }); this.columns.forEach(column->{ if (!Arrays.asList(sArray).contains(column)) { columns.add(column); } }); } columns.addAll(this.columns); // 用于写入数据的时候的类型根据目的表字段类型转换 this.resultSetmetaData = DBUtil.getColumnmetaData(connection, this.table, StringUtils.join(columns, ",")); // 写数据库的SQL语句 calcWriteRecordSql(); List writeBuffer = new ArrayList (this.batchSize); int bufferBytes = 0; try { Record record; while ((record = recordReceiver.getFromReader()) != null) { if (record.getColumnNumber() != this.columnNumber) { // 源头读取字段列数与目的表字段写入列数不相等,直接报错 throw DataXException .asDataXException( DBUtilErrorCode.CONF_ERROR, String.format( "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", record.getColumnNumber(), this.columnNumber)); } writeBuffer.add(record); bufferBytes += record.getMemorySize(); if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) { doBatchInsert(connection, writeBuffer); writeBuffer.clear(); bufferBytes = 0; } } if (!writeBuffer.isEmpty()) { doBatchInsert(connection, writeBuffer); writeBuffer.clear(); bufferBytes = 0; } } catch (Exception e) { throw DataXException.asDataXException( DBUtilErrorCode.WRITE_DATA_ERROR, e); } finally { writeBuffer.clear(); bufferBytes = 0; DBUtil.closeDBResources(null, null, connection); } }
// 替换原先代码块 protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record) throws SQLException { for (int i = 0; i < this.resultSetmetaData.getLeft().size(); i++) { int columnSqlType = this.resultSetmetaData.getMiddle().get(i); String typeName = this.resultSetmetaData.getRight().get(i); String column = this.resultSetmetaData.getLeft().get(i); Column columnValue = record.getColumn(this.columns.indexOf(column)); preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqlType, typeName,columnValue); } return preparedStatement; }示例
{ "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": { "name": "ftpreader", "parameter": { "username": "ftp", "password": "ftp", "column": [ "*" ], "protocol": "ftp", "host": "*****", "encoding": "UTF-8", "fieldDelimiter": ",", "port": "21", "path": [ "/csvTem/blade_log_api" ] } }, "writer": { "name": "oraclewriter", "parameter": { "username": "****", "password": "****", "column": [ "IS_SUCCESS", "RE_APP", "TENANT_ID", "SERVICE_ID", "SERVER_HOST", "SERVER_IP", "ENV", "TYPE", "TITLE", "METHOD", "ID" ], "writeMode": "update(ID)", "connection": [ { "table": [ "BLADE_LOG_API" ], "jdbcUrl": "jdbc:oracle:thin:@localhost:1521/XE" } ] } } } ] } }
改完后的 jar 包
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)