DataX Plugin Secondary Development: Adding Parquet Support to HdfsReader



Date: December 17, 2021

1. Background

In our work, the data warehouse mainly stores data in the Parquet format, and in some scenarios we need DataX to export data out of the warehouse. However, the current Alibaba DataX HdfsReader plugin does not support Parquet. After searching a number of blogs and other resources without finding an open-source plugin for this, I decided to develop one myself.

2. Procedure

2.1 Code Development

Pull the code from the Alibaba DataX repository, create a new branch, and modify the hdfsreader module to add the code for reading Parquet files. The main changes are in the two classes covered below.
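
Only the DFSUtil.java change is shown in detail in this post; the other class is presumably HdfsReader.java, where the new fileType value has to be accepted and dispatched to the new read method. The following is a minimal sketch rather than the author's actual patch: it assumes a PARQUET constant added to the plugin's Constant class, and it follows the shape of the existing ORC branch in HdfsReader.Task.startRead (field names such as specifiedFileType and dfsUtil come from the upstream plugin and may differ).

// Sketch only: inside HdfsReader.Task.startRead(RecordSender recordSender),
// alongside the existing TEXT/ORC/CSV/SEQ/RC branches.
if (specifiedFileType.equalsIgnoreCase(Constant.ORC)) {
    dfsUtil.orcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
} else if (specifiedFileType.equalsIgnoreCase(Constant.PARQUET)) {
    // New branch: "PARQUET" is assumed to be added to Constant.java and to the
    // supported-fileType validation in HdfsReader.Job.
    dfsUtil.parquetFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector());
}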

Starting from the ORC reading code and adapting it to the Parquet file format and the Parquet API, the main change is a new parquetFileStartRead method in DFSUtil.java. The code is as follows, for reference:

public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
    LOG.info(String.format("Start reading parquet file [%s].", sourceParquetFilePath));
    List<ColumnEntry> column = UnstructuredStorageReaderUtil
            .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
    StringBuilder allColumns = new StringBuilder();
    StringBuilder allColumnTypes = new StringBuilder();
    boolean isReadAllColumns = false;
    int columnIndexMax = -1;
    // Decide whether all columns should be read
    if (null == column || column.size() == 0) {
        int allColumnsCount = getParquetAllColumnsCount(sourceParquetFilePath);
        columnIndexMax = allColumnsCount - 1;
        isReadAllColumns = true;
    } else {
        columnIndexMax = getMaxIndex(column);
    }
    // Build the "columns" / "columns.types" properties (only required by approach 1 below)
    for (int i = 0; i <= columnIndexMax; i++) {
        allColumns.append("col");
        allColumnTypes.append("string");
        if (i != columnIndexMax) {
            allColumns.append(",");
            allColumnTypes.append(":");
        }
    }
    if (columnIndexMax >= 0) {
        JobConf conf = new JobConf(hadoopConf);
        Path parquetFilePath = new Path(sourceParquetFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());

        try {
            // Approach 1: not adopted. Reason: ParquetHiveSerDe reads parquet String columns
            // as BytesWritable, which is hard to convert back to String.
//            ParquetHiveSerDe serde = new ParquetHiveSerDe();
//            serde.initialize(conf, p);
//            StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
//            InputFormat in = new MapredParquetInputFormat();
//            FileInputFormat.setInputPaths(conf, parquetFilePath.toString());
//
//            // If the network disconnects, retry 45 times with a 20-second interval
//            // Each file is treated as a single split
//            // TODO: multiple threads
//            InputSplit[] splits = in.getSplits(conf, 1);
//
//            RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
//            Object key = reader.createKey();
//            Object value = reader.createValue();
//            // Fetch the column information
//            List fields = inspector.getAllStructFieldRefs();
//
//            List recordFields;
//            while (reader.next(key, value)) {
//                recordFields = new ArrayList();
//                for (int i = 0; i <= columnIndexMax; i++) {
//                    Object field = inspector.getStructFieldData(value, fields.get(i));
//                    recordFields.add(field);
//                }
//                transportOneRecord(column, recordFields, recordSender,
//                        taskPluginCollector, isReadAllColumns, nullFormat);
//            }

            // Approach 2: adopted. Read the file with the Parquet Java API.
            GroupReadSupport support = new GroupReadSupport();
            ParquetReader<Group> reader = ParquetReader.builder(support, parquetFilePath).build();
            Group line;
            List<Object> recordFields;
            while ((line = reader.read()) != null) {
                recordFields = new ArrayList<Object>();
                // Extract each configured field from the current row
                for (int i = 0; i <= columnIndexMax; i++) {
                    Object field = line.getValueToString(i, 0);
                    recordFields.add(field);
                }
                transportOneRecord(column, recordFields, recordSender,
                        taskPluginCollector, isReadAllColumns, nullFormat);
            }
            reader.close();
        } catch (Exception e) {
            String message = String.format("Exception while reading data from parquet file [%s], please contact the system administrator.",
                    sourceParquetFilePath);
            LOG.error(message, e);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    } else {
        String message = String.format("Please check that the column configuration is correct! columnIndexMax is less than 0, column: %s",
                JSON.toJSONString(column));
        throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
    }
}
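
The method above also calls getParquetAllColumnsCount (getMaxIndex already exists in DFSUtil for the ORC path), which is not shown in the post. A minimal sketch of that helper, assuming the parquet-hadoop footer API (ParquetFileReader, HadoopInputFile, MessageType) is available on the classpath, might look like this:

// Sketch only, not the author's actual helper: counts the top-level fields of a
// parquet file by reading its footer.
// Assumed imports: org.apache.parquet.hadoop.ParquetFileReader,
// org.apache.parquet.hadoop.util.HadoopInputFile, org.apache.parquet.schema.MessageType,
// java.io.IOException; hadoopConf is the org.apache.hadoop.conf.Configuration
// already held by DFSUtil.
private int getParquetAllColumnsCount(String filePath) {
    try (ParquetFileReader fileReader = ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path(filePath), hadoopConf))) {
        MessageType schema = fileReader.getFooter().getFileMetaData().getSchema();
        return schema.getFieldCount();
    } catch (IOException e) {
        String message = String.format("Failed to read the schema of parquet file [%s].", filePath);
        LOG.error(message, e);
        throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
    }
}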
 

My Gitee repository: https://gitee.com/jackielee4cn/bigdata-history-query.git

Corrections and stars are welcome.

3. Usage Example

3.1 Build and Install

Download the source code, build and package it, and locate the target/datax/plugin/reader/hdfsreader.zip file produced by the module. Unzip it into ${DATAX_HOME}/plugin/reader/ under your DataX installation directory. Be sure to back up the stock hdfsreader plugin beforehand so you can roll back if anything goes wrong.
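
If you build from the DataX source tree with Maven, the packaging command from the upstream DataX README can typically be used:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true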

3.2 Configure the DataX Job

The configuration is the same as the official ORC-format hdfsreader setup, except that fileType can now be set to parquet in addition to text, orc, csv, and the other existing formats. A complete sample follows:

test_hdfsreader_parquet.job

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/user/hive/warehouse/test.db/test_datax_parquet/date_id=20211201",
            "defaultFS": "hdfs://test01:8020",
            "fileType": "parquet",
            "skipHeader": false,
            "column": [
              {
                "index": "0",
                "type": "long"
              },
              {
                "index": "1",
                "type": "string"
              },
              {
                "index": "2",
                "type": "long"
              },
              {
                "index": "3",
                "type": "double"
              },
              {
                "index": "4",
                "type": "string"
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
	    "writeMode": "replace",
            "username": "write_user",
            "password": "Writeuser@123",
            "column": [
				"`f_id`",
				"`f_order_id`",
				"`f_is_refund`",
				"`f_amount`",
				"`f_time`"
            ],
            "connection": [
              {
                "table": [
                  "test_datax_parquet"
                ],
                "jdbcUrl": "jdbc:mysql://test02:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&dontTrackOpenResources=true"
              }
            ]
          }
        }
      }
    ]
  }
}
3.3 Run the Job

python ${DATAX_HOME}/bin/datax.py test_hdfsreader_parquet.job

Check the console logs to confirm that the job runs successfully.

