Converting CSV data to Parquet format in Java


This article converts CSV data to Parquet format. The data types involved include String, timestamp, double, and boolean; timestamps are stored as INT64.
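For example, a timestamp string is first parsed into a java.util.Date and then written as its epoch value in milliseconds, which is a 64-bit long. A minimal sketch (not part of the original post; the class name TimestampToInt64 is made up for illustration):

import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampToInt64 {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = sdf.parse("2022-12-17 10:30:00");
        long millis = date.getTime(); // this INT64 value is what gets stored in parquet
        System.out.println(millis);
    }
}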

Parquet metadata

Parquet supports the following primitive types:
  • BOOLEAN: 1 bit boolean
  • INT32: 32 bit signed ints
  • INT64: 64 bit signed ints
  • INT96: 96 bit signed ints
  • FLOAT: IEEE 32-bit floating point values
  • DOUBLE: IEEE 64-bit floating point values
  • BYTE_ARRAY: arbitrarily long byte arrays.
    See the official documentation for details: https://parquet.apache.org/documentation/latest/
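To make the mapping from the CSV column types to these primitives concrete, here is a small, self-contained sketch (not from the original post; the class name SchemaDemo and field names are made up). Logical annotations such as UTF8 and TIMESTAMP_MILLIS refine the primitive types listed above:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class SchemaDemo {
    public static void main(String[] args) {
        // String -> BYTE_ARRAY (UTF8), timestamp -> INT64 (TIMESTAMP_MILLIS),
        // double -> DOUBLE, boolean -> BOOLEAN
        MessageType schema = MessageTypeParser.parseMessageType(
                "message demo {"
                + " required binary name (UTF8);"
                + " optional int64 created (TIMESTAMP_MILLIS);"
                + " optional double price;"
                + " optional boolean active;"
                + "}");
        System.out.println(schema);
    }
}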
The Maven dependencies involved are as follows:

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-format</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-encoding</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-column</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>net.sourceforge.javacsv</groupId>
        <artifactId>javacsv</artifactId>
        <version>2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.jodd</groupId>
        <artifactId>jodd-core</artifactId>
        <version>5.1.5</version>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.10.5</version>
    </dependency>
</dependencies>
The code is as follows:

TestParqueWriter_2.java

import com.csvreader.CsvReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import static com.csv2Parquet.readCSV.gethead;

public class TestParqueWriter_2 {

    public static Path file = new Path(System.currentTimeMillis() + ".parquet");
    private static Logger logger = LoggerFactory
            .getLogger(TestParqueWriter_2.class);
    // Design a MessageType that matches the actual field types;
    // the program generates the parquet data from this schema.
    private static String schemaStr =   "message schema " +
            "{repeated binary item_id (UTF8);" +
            "optional int64 bill_billing_period_start_date(TIMESTAMP_MILLIS);" +
            "repeated double cost ;" +
            "repeated binary year (UTF8);" +
            "repeated binary month (UTF8);}";
    static MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
    // Print the MessageType
    public static void testParseSchema(){
        System.out.println(schema.toString());
    }
    // Read the schema from an existing parquet file
    public static void testGetSchema() throws Exception {
        Configuration configuration = new Configuration();
        ParquetMetadata readFooter = null;
        Path parquetFilePath = new Path("input.parquet");
        readFooter = ParquetFileReader.readFooter(configuration,
                parquetFilePath, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        System.out.println(schema.toString());
    }

    // Read the CSV header and rows automatically and write them to a parquet file
    private static void testParquetWriter() throws IOException {
        // the output file is named with a timestamp (the 'file' field above);
        // sdf parses the date strings found in the CSV
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String csvFile = "output.csv";
        CsvReader csvReader = null;
        // column-name suffix that marks a double column
        String pattern_3 = "cost";
        // open the CSV file
        csvReader = new CsvReader(csvFile, ',', Charset.forName("UTF-8"));
        ExampleParquetWriter.Builder builder = ExampleParquetWriter
                .builder(file).withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                //.withConf(configuration)
                .withType(schema);
        ParquetWriter<Group> writer = builder.build();
        String[] csvhead = gethead("input.csv");
        SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
        csvReader.readHeaders();
        while (csvReader.readRecord()) {
            String[] str = csvReader.getValues();
            Group group = groupFactory.newGroup();
            String newName = "";
            for (int j = 0; j < csvhead.length; j++) {
                System.out.println(csvhead[j] + ":" + str[j]);
                String[] nameParts = csvhead[j].split("_");
                newName = nameParts[nameParts.length - 1];
                // a column name ending in "cost" marks a double column
                if (newName.equals(pattern_3)) {
                    if (!str[j].isEmpty())
                        group.add(csvhead[j], Double.parseDouble(str[j]));
                    else
                        group.add(csvhead[j], Double.NaN);
                }
                // the column name "workingsupport" marks a boolean column
                else if (csvhead[j].equals("workingsupport")) {
                    if (!str[j].isEmpty())
                        group.add(csvhead[j], Boolean.parseBoolean(str[j]));
                    else
                        group.add(csvhead[j], false);
                }
                // a column name ending in "date" marks a timestamp column
                else if (newName.equals("date")) {
                    Date date = new Date();
                    Date date_1 = new Date();
                    // note: the format must match the date strings in the CSV
                    try {
                        // parse the string into a Date
                        date = sdf.parse(str[j]);
                        // shift by eight hours (UTC+8 offset)
                        date_1 = new Date(date.getTime() + 8 * 60 * 60 * 1000);
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                    // write the timestamp as INT64 epoch milliseconds
                    group.add(csvhead[j], date_1.getTime());
                }
                // everything else is written as a UTF8 string
                else {
                    if (!str[j].isEmpty())
                        group.add(csvhead[j], str[j]);
                    else
                        group.add(csvhead[j], Binary.EMPTY);
                }
            }
            writer.write(group);
        }
        writer.close();
    }

    // Read the parquet file back for verification

    private static void testParquetReader() throws IOException {
        Path file = new Path("output.parquet");
        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build();
        Group line;
        while ((line = reader.read()) != null) {
            // field names must match the schema defined above
            System.out.println(line.getString("item_id", 0));
            System.out.println(line.getDouble("cost", 0));
        }
        reader.close();
    }

    public static void main(String[] args) throws Exception {
        testGetSchema();
        // testParseSchema();
        testParquetWriter();
        // testParquetReader();
    }
}
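The listing above imports a gethead helper from com.csv2Parquet.readCSV that the post does not show. Below is a minimal sketch of such a helper, assuming it simply returns the CSV header row; the class and method names come from the import statement, but the body is a reconstruction, not the author's original code.

readCSV.java

package com.csv2Parquet;

import com.csvreader.CsvReader;
import java.io.IOException;
import java.nio.charset.Charset;

public class readCSV {
    // Sketch of the helper imported by TestParqueWriter_2: returns the
    // header row of the given CSV file using the same javacsv library.
    public static String[] gethead(String csvFile) throws IOException {
        CsvReader reader = new CsvReader(csvFile, ',', Charset.forName("UTF-8"));
        try {
            reader.readHeaders();       // consume the first line as headers
            return reader.getHeaders(); // column names as a String[]
        } finally {
            reader.close();
        }
    }
}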

The data involves private information, so it is not shown here; if you would like to discuss any issues, feel free to leave a comment or send a private message.
