
Problem Description

Sqoop task: SQL Server -> HDFS
Run time: 2019-05-23 00:05:30 ~ 00:37:03
The Sqoop task finished successfully, but the ~3,000,000 rows copied from SQL Server to HDFS contained 829 duplicate records.

Impact

Downstream hotel report data became inaccurate, and the affected tasks had to be re-run.

Interim Workaround

After re-running the Sqoop task, the data contained no duplicates.
To prevent a recurrence, a DISTINCT was added to the downstream Base-data ETL for this task.

Root Cause Analysis

The Sqoop task was configured roughly as follows:

sqoop import -D mapreduce.job.name={JOB_name} --connect '{db_info=232}' --delete-target-dir \
  --query "SELECT ID, star_out, hotel_type, hotel_economic, hotel_apartment, IsMultiSupply, InventoryUseType, IsSendVouchFax, auditingType, replace(replace(replace(replace(SubcityID, char(10), ''), char(13), ''), char(1), ''), char(0), '') as SubcityID, isshadow, AgreementEntityID FROM Hotel_Product.dbo.hotel(nolock) where $CONDITIONS" \
  --where '1=1' --split-by ID --null-string '\N' --null-non-string '\N' \
  --fields-terminated-by '\001' \
  -m 8 --target-dir /data/BaseData/elong/dshprdt_hotel

Here --split-by ID together with -m 8 splits the work by the ID column into 8 map tasks.
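For intuition about where each map's range comes from: Sqoop's data-driven input format queries the minimum and maximum of the split column and carves that interval into -m contiguous slices. Below is a minimal sketch of that idea (simplified, not Sqoop's actual code; the MAX(ID) value used in main is hypothetical):

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Carve [min, max] into numMaps contiguous ranges over the split column,
    // the way a data-driven input format would.
    static List<String> splits(long min, long max, int numMaps) {
        List<String> clauses = new ArrayList<>();
        long size = (max - min) / numMaps;
        long lo = min;
        for (int i = 0; i < numMaps; i++) {
            // the last slice absorbs the rounding remainder and closes the interval
            long hi = (i == numMaps - 1) ? max + 1 : lo + size;
            clauses.add("( ID >= " + lo + " ) AND ( ID < " + hi + " )");
            lo = hi;
        }
        return clauses;
    }

    public static void main(String[] args) {
        // With MIN(ID)=1 and a hypothetical MAX(ID)=3716489, the first of the
        // 8 ranges comes out as ( ID >= 1 ) AND ( ID < 464562 ), matching the log.
        splits(1, 3716489, 8).forEach(System.out::println);
    }
}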

The map running on node hadoop-070-126.bigdata.ly was map 1 (attempt _m_000000_0 in the log below).
Its assigned data range was ( ID >= 1 ) AND ( ID < 464562 ).
At 2019-05-23 00:36:27,823 the database connection threw an exception; the stack trace:

2019-05-23 00:36:27,823 ERROR [main] org.apache.sqoop.mapreduce.db.DBRecordReader: top level exception:
com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
        at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
        at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
        at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
        at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
        at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
        at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
        at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
        at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
        at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
        at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
        at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
        at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
        at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
        at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
        at QueryResult.readFields(QueryResult.java:1670)
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
        at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

2019-05-23 00:36:27,862 WARN [main] org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader: Trying to recover from DB read failure:
java.io.IOException: SQLException in nextKeyValue
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:277)
        at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
        at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
        at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
        at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
        at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
        at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
        at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
        at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
        at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
        at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
        at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
        at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
        at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
        at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
        at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
        at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
        at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
        at QueryResult.readFields(QueryResult.java:1670)
        at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
        ... 13 more
2019-05-23 00:36:28,130 INFO [main] org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler: Session context is: NULL
2019-05-23 00:36:28,131 INFO [main] org.apache.sqoop.mapreduce.db.BasicRetrySQLFailureHandler: A new connection has been established
2019-05-23 00:36:28,186 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split: ID >= 1 AND ID < 464562
2019-05-23 00:36:28,198 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query: SELECT ID,CorpGroupID,HotelBrandID,AgreementEntityID FROM Hotel_Product.dbo.hotel(nolock) where ( ID > 458735 ) AND ( ID < 464562 )
2019-05-23 00:36:28,804 WARN [ResponseProcessor for block BP-894016253-10.12.180.10-1463057953660:blk_15531850664_15126642971] org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took 51708ms (threshold=30000ms); ack: seqno: 21070 status: SUCCESS status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 20470492, targets: [172.1.0.126:50010, 172.1.0.72:50010, 172.1.0.78:50010]
2019-05-23 00:36:42,628 INFO [Thread-13] org.apache.sqoop.mapreduce.AutoProgressMapper: auto-progress thread is finished. keepGoing=false
2019-05-23 00:36:42,693 INFO [main] org.apache.hadoop.mapred.Task: Task:attempt_1555615894016_1958403_m_000000_0 is done. And is in the process of committing
2019-05-23 00:36:42,742 INFO [main] org.apache.hadoop.mapred.Task: Task attempt_1555615894016_1958403_m_000000_0 is allowed to commit now
2019-05-23 00:36:42,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1555615894016_1958403_m_000000_0' to viewfs://dcfs/data/BaseData/elong/dshprdt_hotel/_temporary/1/task_1555615894016_1958403_m_000000
2019-05-23 00:36:42,816 INFO [main] org.apache.hadoop.mapred.Task: Task 'attempt_1555615894016_1958403_m_000000_0' done.

This connection exception caused the map task to establish a new database connection, recover, and continue copying; the map eventually completed successfully. The recovery process is visible in the log above, and the recovery logic lives in the code below.

SQLServerDBRecordReader.java

  public boolean nextKeyValue() throws IOException {
    boolean valueReceived = false;
    int retryCount = RETRY_MAX;
    boolean doRetry = true;
    do {
      try {
        // Try to get the next key/value pairs
        valueReceived = super.nextKeyValue();
        doRetry = false;
      } catch (IOException ioEx) {
        LOG.warn("Trying to recover from DB read failure: ", ioEx);
        Throwable cause = ioEx.getCause();
        // Use configured connection handler to recover from the connection
        // failure and use the newly constructed connection.
        // If the failure cannot be recovered, an exception is thrown
        if (failureHandler.canHandleFailure(cause)) {
          // Recover from connection failure
          Connection conn = failureHandler.recover();
          // Configure the new connection before using it
          configureConnection(conn);
          setConnection(conn);
          --retryCount;
          doRetry = (retryCount >= 0);
        } else {
          // Cannot recover using the configured handler, re-throw
          throw new IOException("Connection handler cannot recover failure: ", ioEx);
        }
      }
    } while (doRetry);
    // Rethrow the exception if all retry attempts are consumed
    if (retryCount < 0) {
      throw new IOException("Failed to read from database after "
        + RETRY_MAX + " retries.");
    }
    return valueReceived;
  }

Note that in the log above, the ID range queried after the connection was re-established is:
where ( ID > 458735 ) AND ( ID < 464562 )
Why does the recovered task query only IDs greater than 458735?
Start with Sqoop's recovery code:

DBRecordReader.java

  @Override
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method, run the query.
        LOG.info("Working on split: " + split);
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next()) {
        return false;
      }
      ...
    }
  }

From the code above: when nextKeyValue() hits a connection-related exception while fetching data, it retries up to three times (RETRY_MAX), calling super.nextKeyValue() again each time.
Because the retry runs on a new database connection, the query must be re-executed (see the DBRecordReader code above), and the new SQL statement is built by getSelectQuery():

SQLServerDBRecordReader.java

  protected String getSelectQuery() {
    // Last seen record key is only expected to be unavailable if no reads
    // ever happened
    String selectQuery;
    if (lastRecordKey == null) {
      selectQuery = super.getSelectQuery();
    } else {
      // If last record key is available, construct the select query to start
      // from
      DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
          (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
      StringBuilder lowerClause = new StringBuilder();
      lowerClause.append(getDBConf().getInputOrderBy());
      lowerClause.append(" > ");
      lowerClause.append(lastRecordKey.toString());
      // Get the select query with the lowerClause, and split upper clause
      selectQuery = getSelectQuery(lowerClause.toString(), dataSplit.getUpperClause());
    }
    return selectQuery;
  }

Look closely at how getSelectQuery() builds the new statement: it appends a lowerClause, a dynamically assembled lower bound for the query: ( ID > 458735 ).
The upper bound ( ID < 464562 ) is fixed; it is the upper limit of the split assigned to this map.
What matters most is where that dynamically generated lower bound comes from:

SQLServerDBRecordReader.java

  public T getCurrentValue() {
    T val = super.getCurrentValue();
    // Lookup the key of the last read record to use for recovering
    // As documented, the map may not be null, though it may be empty.
    Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
    lastRecordKey = (lastRecordSplitCol == null) ? null
        : lastRecordSplitCol.toString();
    return val;
  }

QueryResult.java

  public Map<String, Object> getFieldMap() {
    Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
    __sqoop$field_map.put("ID", this.ID);
    ...
  }

The lower bound is lastRecordKey: the value of the split column (--split-by ID) in the last record this map successfully processed before the failure. As the code above shows, getCurrentValue() records the split column ("ID") of every row it hands to the mapper.

Because the ID column we split on is not the primary key (the primary key is hotel_ID) and is not guaranteed to be ordered, the rows Sqoop reads through the database cursor can come back like this:

ID | hotel_name
---|---
1 | Hotel A
3 | Hotel B
4 | Hotel C
5 | Hotel D
6 | Hotel E
7 | Hotel F
8 | Hotel G
2 | Hotel H
9 | Hotel I
10 | Hotel J
11 | Hotel K

Suppose the database connection fails and the query is cut off just after the map reaches the hotel with ID 2 and flushes that row to HDFS. At that point the following rows have already been copied:
ID | hotel_name
---|---
1 | Hotel A
3 | Hotel B
4 | Hotel C
5 | Hotel D
6 | Hotel E
7 | Hotel F
8 | Hotel G
2 | Hotel H
Sqoop then starts recovery. lastRecordKey is taken from the last successfully copied record, Hotel H (ID 2), so the new query is select * from table where (ID > 2) AND (ID <= 11). Only three hotels were actually left to copy, but the recovered connection copies all of these:

ID | hotel_name
---|---
3 | Hotel B
4 | Hotel C
5 | Hotel D
6 | Hotel E
7 | Hotel F
8 | Hotel G
9 | Hotel I
10 | Hotel J
11 | Hotel K

Six hotel records were copied twice! In general, every row from the first pass whose ID is greater than lastRecordKey gets copied again.
There is an even worse case: if the last record processed before the failure had been ID 8, the recovered range would be ID > 8 AND ID <= 11.
That skips the row with ID 2 entirely, losing data!
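The whole failure mode can be reproduced off-line in a few lines. Below is a minimal, self-contained sketch (hypothetical data mirroring the tables above; this is an illustration, not Sqoop code): stream rows in the unordered cursor order, drop the connection after a given row, then resume with the recovered query's ID > lastRecordKey filter.

import java.util.ArrayList;
import java.util.List;

public class ResumeSimulation {
    // One pass that "fails" after writing the row with failAfterId, then resumes
    // with the recovered query's lower bound: split-column > lastRecordKey.
    static List<Integer> copyWithRecovery(List<Integer> cursorOrder, int failAfterId) {
        List<Integer> written = new ArrayList<>();
        int lastRecordKey = Integer.MIN_VALUE;
        for (int id : cursorOrder) {                 // first connection
            written.add(id);
            lastRecordKey = id;                      // split-column value of last good row
            if (id == failAfterId) break;            // connection reset happens here
        }
        for (int id : cursorOrder) {                 // recovered connection
            if (id > lastRecordKey) written.add(id); // WHERE ID > lastRecordKey
        }
        return written;
    }

    public static void main(String[] args) {
        // Cursor order from the example above: not sorted by ID.
        List<Integer> cursor = List.of(1, 3, 4, 5, 6, 7, 8, 2, 9, 10, 11);

        // Failure right after Hotel H (ID 2): IDs 3..8 are written twice (6 duplicates).
        System.out.println(copyWithRecovery(cursor, 2));
        // -> [1, 3, 4, 5, 6, 7, 8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

        // Failure right after Hotel G (ID 8): ID 2 is never written -- data loss.
        System.out.println(copyWithRecovery(cursor, 8));
        // -> [1, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    }
}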

Verification Against Production Data

Using the following statement, the full 456,629 rows that map 1 was supposed to copy were exported to a CSV:

[+++]

The last record processed when the exception occurred had ID 458735.
Searching the CSV rows written before the exception for IDs greater than 458735 finds exactly 829 rows: the 829 duplicates.
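That check is mechanical enough to script. A minimal sketch, assuming a hypothetical CSV export (the file name and column layout are assumptions; the source omits the actual export statement) whose first field is the ID:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DuplicateCheck {
    public static void main(String[] args) throws IOException {
        long lastRecordKey = 458735L; // last ID processed when the connection reset
        // Hypothetical export of the rows the map wrote BEFORE the exception.
        try (var lines = Files.lines(Path.of("map1_before_failure.csv"))) {
            long duplicates = lines
                .map(line -> line.split(",", 2)[0])  // ID assumed to be the first column
                .mapToLong(Long::parseLong)
                .filter(id -> id > lastRecordKey)    // these rows get re-copied on recovery
                .count();
            System.out.println("rows that will be duplicated: " + duplicates); // expect 829
        }
    }
}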

Conclusions

Trigger: the incident was set off by a database connection exception.
Root cause: when Sqoop imports from SQL Server, the --split-by column must be a long-integer, sequentially ordered, unique column. The table involved here did not meet that requirement, so when the connection was recovered after the exception, the resumed copy duplicated rows.

Improvements

1. New tables created by the business side must follow the database design standard: every table defines a primary key, ID by default, long-integer and auto-incrementing, and the auto-increment ID must not be given business meaning.
2. When Sqoop copies a SQL Server table, point --split-by at a long-integer, sequential, unique primary-key column; or point it at a column with a uniqueness constraint and add an ORDER BY on that column to the query to eliminate the ordering problem.
3. Patch Sqoop to rethrow SQL Server connection exceptions instead of retrying, so the MR job fails and the scheduler simply re-runs the whole Sqoop task.
