Date: December 17, 2021
1. Background
At work, our data warehouse stores data mainly in the Parquet format, and in some scenarios we need DataX to export data out of the warehouse. However, the current Alibaba DataX HdfsReader plugin does not support Parquet. After searching quite a few blogs and other resources online without finding an open-source plugin for this, I decided to develop one myself.
2. Steps
2.1 Code development
Pull the code from the Alibaba DataX repository, create a new branch, and modify the hdfsreader module to add the code paths for reading Parquet files. The main modification points fall in two classes; the core change is described below.
Following the existing ORC reading code, adapted to the Parquet file format and the Parquet API, the main addition is a parquetFileStartRead method in DFSUtil.java. The code is as follows, for reference:
public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
    LOG.info(String.format("Start Read parquetfile [%s].", sourceParquetFilePath));
    List<ColumnEntry> column = UnstructuredStorageReaderUtil
            .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);

    StringBuilder allColumns = new StringBuilder();
    StringBuilder allColumnTypes = new StringBuilder();
    boolean isReadAllColumns = false;
    int columnIndexMax = -1;
    // Decide whether to read all columns
    if (null == column || column.size() == 0) {
        int allColumnsCount = getParquetAllColumnsCount(sourceParquetFilePath);
        columnIndexMax = allColumnsCount - 1;
        isReadAllColumns = true;
    } else {
        columnIndexMax = getMaxIndex(column);
    }
    for (int i = 0; i <= columnIndexMax; i++) {
        allColumns.append("col");
        allColumnTypes.append("string");
        if (i != columnIndexMax) {
            allColumns.append(",");
            allColumnTypes.append(":");
        }
    }
    if (columnIndexMax >= 0) {
        JobConf conf = new JobConf(hadoopConf);
        Path parquetFilePath = new Path(sourceParquetFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());
        try {
            // Approach 1: not used. Reason: when reading with ParquetHiveSerDe, String fields in
            // the Parquet file come back as BytesWritable, which is hard to convert back to String.
            // ParquetHiveSerDe serde = new ParquetHiveSerDe();
            // serde.initialize(conf, p);
            // StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
            // InputFormat<?, ?> in = new MapredParquetInputFormat();
            // FileInputFormat.setInputPaths(conf, parquetFilePath.toString());
            //
            // // If the network is disconnected, it will retry 45 times, with a 20-second interval each time
            // // Each file as a split
            // // TODO multi threads
            // InputSplit[] splits = in.getSplits(conf, 1);
            //
            // RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
            // Object key = reader.createKey();
            // Object value = reader.createValue();
            //
            // // Get column info
            // List<? extends StructField> fields = inspector.getAllStructFieldRefs();
            //
            // List
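The method above is cut off inside the commented-out "approach 1", so the Parquet read path that was actually adopted is not visible in this post. Purely as an illustration of what the alternative could look like, and not the author's code, here is a minimal sketch based on the parquet-hadoop example API: it counts a file's columns from the footer schema (roughly what a getParquetAllColumnsCount helper would need to do) and streams each row to DataX's RecordSender as string columns. The class name ParquetReadSketch and its method names are invented for this sketch; the Parquet and DataX calls themselves are standard library APIs.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;

public class ParquetReadSketch {

    // Count the columns of a Parquet file by reading its footer schema.
    public static int countColumns(Configuration conf, String filePath) throws IOException {
        ParquetMetadata footer = ParquetFileReader.readFooter(
                conf, new Path(filePath), ParquetMetadataConverter.NO_FILTER);
        return footer.getFileMetaData().getSchema().getFieldCount();
    }

    // Read every row via GroupReadSupport and emit each field to DataX as a StringColumn,
    // leaving type conversion to the job's column configuration, as the ORC path does.
    public static void readRows(Configuration conf, String filePath, RecordSender recordSender)
            throws IOException {
        try (ParquetReader<Group> reader = ParquetReader
                .builder(new GroupReadSupport(), new Path(filePath))
                .withConf(conf)
                .build()) {
            Group row;
            while ((row = reader.read()) != null) {
                Record record = recordSender.createRecord();
                int fieldCount = row.getType().getFieldCount();
                for (int i = 0; i < fieldCount; i++) {
                    if (row.getFieldRepetitionCount(i) > 0) {
                        // getValueToString renders the value of a primitive field as text.
                        record.addColumn(new StringColumn(row.getValueToString(i, 0)));
                    } else {
                        // The field is missing (null) in this row.
                        record.addColumn(new StringColumn());
                    }
                }
                recordSender.sendToWriter(record);
            }
        }
    }
}

In the real plugin this loop would also have to honor the configured column indexes/constants and the nullFormat handled by UnstructuredStorageReaderUtil, the way the ORC read path does.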
My Gitee repository with the code: https://gitee.com/jackielee4cn/bigdata-history-query.git
Corrections and stars are welcome.
3. Usage example
3.1 Build and install
Download the source, build and package it, and locate the module's target/datax/plugin/reader/hdfsreader.zip file. Unzip it into ${DATAX_HOME}/plugin/reader/ of the DataX installation. Be sure to back up the original default hdfsreader plugin first, so you can roll back if anything goes wrong.
3.2 Configure the DataX job
The configuration is the same as the official hdfsreader configuration for the ORC format, except that fileType can now be set to parquet in addition to text, orc, csv, and so on. A full example follows:
test_hdfsreader_parquet.job
{ "job": { "setting": { "speed": { "channel": 3, "byte": 10485760 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/user/hive/warehouse/test.db/test_datax_parquet/date_id=20211201", "defaultFS": "hdfs://test01:8020", "fileType": "parquet", "skipHeader": false, "column": [ { "index": "0", "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "long" }, { "index": "3", "type": "double" }, { "index": "4", "type": "string" } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "replace", "username": "write_user", "password": "Writeuser@123", "column": [ "`f_id`", "`f_order_id`", "`f_is_refund`", "`f_amount`", "`f_time`" ], "connection": [ { "table": [ "test_datax_parquet" ], "jdbcUrl": "jdbc:mysql://test02:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&dontTrackOpenResources=true" } ] } } } ] } }3.3 执行job
python ${DATAX_HOME}/bin/datax.py test_hdfsreader_parquet.job
Check the console log; the job runs normally.