本文将 CSV 格式的数据转化为 Parquet 格式,涉及的数据类型包括 String、timestamp、double、boolean,
其中timestamp由int64存放。
- BOOLEAN: 1 bit boolean
- INT32: 32 bit signed ints
- INT64: 64 bit signed ints
- INT96: 96 bit signed ints
- FLOAT: IEEE 32-bit floating point values
- DOUBLE: IEEE 64-bit floating point values
- BYTE_ARRAY: arbitrarily long byte arrays.
具体见官网:https://parquet.apache.org/documentation/latest/
pom.xml
代码如下:
<dependencies>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.9.0</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-format</artifactId><version>2.3.1</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-encoding</artifactId><version>1.9.0</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-common</artifactId><version>1.9.0</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-column</artifactId><version>1.9.0</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.9.0</version></dependency>
    <dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version><scope>compile</scope></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.0</version></dependency>
    <dependency><groupId>net.sourceforge.javacsv</groupId><artifactId>javacsv</artifactId><version>2.0</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency>
    <dependency><groupId>org.jodd</groupId><artifactId>jodd-core</artifactId><version>5.1.5</version></dependency>
    <dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.10.5</version></dependency>
</dependencies>
TestParqueWriter_2.java
import com.csvreader.CsvReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.format.converter.ParquetmetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.Parquetmetadata; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; import java.sql.Time; import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import static com.csv2Parquet.readCSV.gethead; public class TestParqueWriter_2 { public static Path file = new Path(System.currentTimeMillis() + ".parquet"); private static Logger logger = LoggerFactory .getLogger(TestParqueWriter_2.class); //根据实际数据,设计字段类型相对应的parseMessageType,程序通过parseMessageType来生成parquet数据 private static String schemaStr = "message schema " + "{repeated binary item_id (UTF8);" + "optional int64 bill_billing_period_start_date(TIMESTAMP_MILLIS);" + "repeated double cost ;" + "repeated binary year (UTF8);" + "repeated binary month (UTF8);}"; static MessageType schema =MessageTypeParser.parseMessageType(schemaStr); //描述:输出MessageType public static void testParseSchema(){ System.out.println(schema.toString()); } // 描述:获取parquet的Schema public static void 
testGetSchema() throws Exception { Configuration configuration = new Configuration(); Parquetmetadata readFooter = null; Path parquetFilePath = new Path("input.parquet"); readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath, ParquetmetadataConverter.NO_FILTER); MessageType schema =readFooter.getFilemetaData().getSchema(); System.out.println(schema.toString()); } //自动读取csv表头和数据写入到parquet文件中 private static void testParquetWriter() throws IOException { //以时间戳为输出的文件名 DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String csvFile = "output.csv"; CsvReader csvReader = null; String pattern_3="cost"; //读取csv文件 csvReader = new CsvReader(csvFile, ',', Charset.forName("UTF-8")); ExampleParquetWriter.Builder builder = ExampleParquetWriter .builder(file).withWriteMode(ParquetFileWriter.Mode.CREATE) .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withCompressionCodec(CompressionCodecName.SNAPPY) //.withConf(configuration) .withType(schema); ParquetWriterwriter = builder.build(); String[] csvhead = gethead("input.csv"); SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); csvReader.readHeaders(); while (true) { if (!csvReader.readRecord()) break; String[] str = csvReader.getValues(); Group group =groupFactory.newGroup(); String newName=""; for (int j = 0; j < 字段个数; j++) { System.out.println(csvhead[j]+":"+str[j]); String dirDiveded[] = csvhead[j].split("_"); newName = dirDiveded[dirDiveded.length-1]; //根据字段名末尾单词判断是否为double类型 if(newName.equals(pattern_3)) { if(!str[j].isEmpty()) group.add(csvhead[j], Double.parseDouble(str[j])); else group.add(csvhead[j], Double.NaN); } //根据字段名末尾单词判断是否为boolean类型 else if(csvhead[j].equals("workingsupport")) if(!str[j].isEmpty()) group.add(csvhead[j], Boolean.parseBoolean(str[j])); // else group.append(csvhead[j], (NanoTime) null); else group.add(csvhead[j], Boolean.parseBoolean(null)); //根据字段名末尾单词判断是否为timestamp类型 else if(newName.equals("date")) { Date date = new Date(); Date date_1 = 
new Date(); //注意format的格式要与日期String的格式相匹配 try { //字符串转Date date = sdf.parse(str[j]); //时间加八个小时 date_1 = new Date(date.getTime()+8 * 60 * 60 * 1000); } catch (ParseException e) { e.printStackTrace(); } Timestamp ts = new Timestamp(date_1.getTime()); group.add(csvhead[j], date_1.getTime() ); else if(!str[j].isEmpty()) group.add(csvhead[j],str[j]); else group.add(csvhead[j], Binary.EMPTY); } writer.write(group); } writer.close(); } //描述:测试读parquet文件 private static void testParquetReader() throws IOException{ Path file = new Path("output.parquet"); ParquetReader.Builder builder = ParquetReader.builder(new GroupReadSupport(), file); ParquetReader reader = builder.build(); // SimpleGroup group =(SimpleGroup) reader.read(); // System.out.println("schema:"+group.getType().toString()); // System.out.println(group.get("")); //System.out.println("identity_line_item_id:"+group.getString(1, 0)); Group line = null; while((line = reader.read()) != null) { System.out.println(line.getString("date", 0)); System.out.println(line.getLong("cost", 0)); } } public static void main(String[] args) throws Exception { testGetSchema(); // testParseSchema(); testParquetWriter(); //testParquetReader(); } }
数据涉及隐私,不做展示,有问题需要探讨可留言或者私信
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)