Parquet is already widely used in big-data scenarios. Compared with structured text and row-oriented storage formats it saves a large amount of storage space, and during data retrieval it cuts down I/O considerably and speeds up reads. Parquet compresses each column's data and offers four compression codecs: SNAPPY, GZIP, LZO, and UNCOMPRESSED. To save further space, the implementation additionally applies a hybrid RLE/Bit-Packing encoding to the data. This article follows the write path of a Binary-typed field to walk through how that RLE/Bit-Packing encoding works.
By default, Parquet matches the column type declared in the schema inside DefaultValuesWriterFactory and picks the corresponding writer to process that column's data:
static DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, ParquetProperties properties,
    Encoding dictPageEncoding, Encoding dataPageEncoding) {
  switch (path.getType()) {
  case BOOLEAN:
    throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
  case BINARY:
    return new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case INT32:
    return new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case INT64:
    return new DictionaryValuesWriter.PlainLongDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case INT96:
    return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), 12, dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case DOUBLE:
    return new DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case FLOAT:
    return new DictionaryValuesWriter.PlainFloatDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  case FIXED_LEN_BYTE_ARRAY:
    return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), path.getTypeLength(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
  default:
    throw new IllegalArgumentException("Unknown type " + path.getType());
  }
}
First, some background. As the values of a column are written, Parquet maintains a dictionary of the values it has already seen. Each incoming value is looked up in this dictionary: new values are added to it, and for every value the index it occupies in the dictionary is recorded. The RLE/Bit-Packing encoding is applied to this recorded list of indices, not to the raw values. The sketch below illustrates the idea.
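To make this concrete, here is a minimal sketch using plain Java collections rather than Parquet's own classes; the class name, the sample strings, and the variable names are invented for illustration. It shows how distinct values receive consecutive dictionary indices and how the column turns into the index list that RLE/Bit-Packing later encodes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionarySketch {
  public static void main(String[] args) {
    Map<String, Integer> dictionary = new LinkedHashMap<>();
    List<Integer> encodedValues = new ArrayList<>();
    for (String v : new String[] { "beijing", "shanghai", "beijing", "beijing", "shenzhen" }) {
      Integer id = dictionary.get(v);
      if (id == null) {
        id = dictionary.size();      // first occurrence: assign the next free index
        dictionary.put(v, id);
      }
      encodedValues.add(id);         // record the index instead of the value itself
    }
    System.out.println(dictionary);    // {beijing=0, shanghai=1, shenzhen=2}
    System.out.println(encodedValues); // [0, 1, 0, 0, 2]
  }
}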
A field declared as binary in the schema is handled by PlainBinaryDictionaryValuesWriter, which extends DictionaryValuesWriter (in fact, all of the type-specific writer classes in the code above do). DictionaryValuesWriter defines an IntList named encodedValues in which subclasses buffer the dictionary indices, and it implements the getBytes method of its parent class ValuesWriter to encode the buffered indices in one pass. A fragment of DictionaryValuesWriter:
// IntList that buffers the dictionary indices
protected IntList encodedValues = new IntList();

@Override
public BytesInput getBytes() {
  // index of the last dictionary entry (dictionary size - 1)
  int maxDicId = getDictionarySize() - 1;
  if (DEBUG) LOG.debug("max dic id " + maxDicId);
  // bit width needed to represent the largest index
  int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
  int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
  // create the encoder
  RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize, this.allocator);
  encoders.add(encoder);
  // iterate over the buffered indices (each value's position in the dictionary)
  IntIterator iterator = encodedValues.iterator();
  try {
    while (iterator.hasNext()) {
      // encode each index
      encoder.writeInt(iterator.next());
    }
    // encodes the bit width
    byte[] bytesHeader = new byte[] { (byte) bitWidth };
    BytesInput rleEncodedBytes = encoder.toBytes();
    if (DEBUG) LOG.debug("rle encoded bytes " + rleEncodedBytes.size());
    BytesInput bytes = concat(BytesInput.from(bytesHeader), rleEncodedBytes);
    // remember size of dictionary when we last wrote a page
    lastUsedDictionarySize = getDictionarySize();
    lastUsedDictionaryByteSize = dictionaryByteSize;
    return bytes;
  } catch (IOException e) {
    throw new ParquetEncodingException("could not encode the values", e);
  }
}
The encodedValues list above holds, for every written value, its index in the dictionary. PlainBinaryDictionaryValuesWriter stores the dictionary itself in an Object2IntLinkedOpenHashMap; the relevant code:
@Override
public void writeBytes(Binary v) {
  int id = binaryDictionaryContent.getInt(v);
  if (id == -1) {
    id = binaryDictionaryContent.size();
    binaryDictionaryContent.put(v.copy(), id);
    // length as int (4 bytes) + actual bytes
    dictionaryByteSize += 4 + v.length();
  }
  // buffer the dictionary index
  encodedValues.add(id);
}
The getBytes method encodes the buffered indices with RunLengthBitPackingHybridEncoder. The key parts of RunLengthBitPackingHybridEncoder are as follows:
public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
  if (DEBUG) {
    LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with "
        + "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity));
  }
  Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
  this.bitWidth = bitWidth;
  this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
  // holds the values from bufferedValues packed bit by bit in little-endian order
  this.packBuffer = new byte[bitWidth];
  this.bufferedValues = new int[8];
  // create the bit packer, which turns each value of encodedValues into bitWidth bits;
  // e.g. with a bit width of 5, the 8 buffered values are packed into a byte[5]
  this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
  reset(false);
}

public void writeInt(int value) throws IOException {
  if (value == previousValue) {
    // same value repeated
    ++repeatCount;
    if (repeatCount >= 8) {
      return;
    }
  } else {
    if (repeatCount >= 8) {
      // the previous value repeated at least 8 times: flush it as an RLE run
      writeRleRun();
    }
    repeatCount = 1;
    previousValue = value;
  }
  // not (yet) part of a long enough run: buffer the value for bit-packing
  bufferedValues[numBufferedValues] = value;
  ++numBufferedValues;
  // when a value repeats fewer than 8 times, values are emitted in groups of 8 as a bit-packed run
  if (numBufferedValues == 8) {
    writeOrAppendBitPackedRun();
  }
}

private void writeOrAppendBitPackedRun() throws IOException {
  // the group count is stored in the header as (bitPackedGroupCount << 1) | 1,
  // so at most 63 bit-packed groups can share one run
  if (bitPackedGroupCount >= 63) {
    endPreviousBitPackedRun();
  }
  if (bitPackedRunHeaderPointer == -1) {
    // write a sentinel byte and remember its position; the header is patched in
    // once the bit-packed run is finished
    baos.write(0); // write a sentinel value
    bitPackedRunHeaderPointer = baos.getCurrentIndex();
  }
  // pack bufferedValues into packBuffer
  packer.pack8Values(bufferedValues, 0, packBuffer, 0);
  baos.write(packBuffer);
  numBufferedValues = 0;
  repeatCount = 0;
  ++bitPackedGroupCount;
}

private void endPreviousBitPackedRun() {
  // nothing to do if no bit-packed run is open
  if (bitPackedRunHeaderPointer == -1) {
    return;
  }
  // patch the header (bitPackedGroupCount << 1) | 1 at the sentinel position,
  // e.g. 0111 1111 when there are 63 groups
  byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1);
  baos.setByte(bitPackedRunHeaderPointer, bitPackHeader);
  // reset the header position and the group counter
  bitPackedRunHeaderPointer = -1;
  bitPackedGroupCount = 0;
}

private void writeRleRun() throws IOException {
  // close any open bit-packed run first
  endPreviousBitPackedRun();
  // write the repeat count,
  BytesUtils.writeUnsignedVarInt(repeatCount << 1, baos);
  // followed by the repeated value itself
  BytesUtils.writeIntLittleEndianPaddedOnBitWidth(baos, previousValue, bitWidth);
  // reset the counter and the buffer index
  repeatCount = 0;
  numBufferedValues = 0;
}
As the code shows, whenever a value repeats 8 or more times it is emitted as an RLE run, while values that repeat fewer than 8 times are bit-packed in groups of 8. Note that the bit-pack step truncates each value to its low bitWidth bits and lays the bits out in little-endian order in packBuffer before writing them to the file; when bitWidth is large but the values themselves are small, this wastes space. The sketch below illustrates the layout.
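Here is a minimal sketch of packing 8 values into bitWidth bytes in little-endian bit order. It is not the packer returned by Packer.LITTLE_ENDIAN.newBytePacker, just a hand-rolled illustration of the layout described above; the Pack8Sketch class, the pack8LittleEndian helper, and the sample indices are made up for the example.

public class Pack8Sketch {
  static byte[] pack8LittleEndian(int[] values, int bitWidth) {
    byte[] out = new byte[bitWidth];          // 8 values * bitWidth bits = bitWidth bytes
    int bitPos = 0;
    for (int v : values) {
      for (int b = 0; b < bitWidth; b++) {    // keep only the low bitWidth bits of each value
        if (((v >>> b) & 1) != 0) {
          out[bitPos / 8] |= 1 << (bitPos % 8);
        }
        bitPos++;                             // every value occupies bitWidth bits,
      }                                       // even when its actual magnitude is small
    }
    return out;
  }

  public static void main(String[] args) {
    // 8 dictionary indices with bitWidth = 3: packed into 3 bytes instead of 32 bytes of raw ints
    byte[] packed = pack8LittleEndian(new int[] { 1, 2, 3, 4, 5, 6, 7, 0 }, 3);
    for (byte b : packed) {
      System.out.printf("%02x ", b & 0xFF);   // prints: d1 58 1f
    }
  }
}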
Both forms write a header before the encoded content, either the repeat count or the number of bit-packed groups, and the reader uses that header to decode the data. On the read path, the following code decides which encoding the subsequent bytes use:
private void readNext() throws IOException {
  Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
  final int header = BytesUtils.readUnsignedVarInt(in);
  mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
  switch (mode) {
  case RLE:
    currentCount = header >>> 1;
    // omitted
    break;
  case PACKED:
    int numGroups = header >>> 1;
    currentCount = numGroups * 8;
    // omitted
    break;
  default:
    throw new ParquetDecodingException("not a valid mode " + mode);
  }
}

public static int readUnsignedVarInt(InputStream in) throws IOException {
  int value = 0;
  int i = 0;
  int b;
  while (((b = in.read()) & 0x80) != 0) {
    value |= (b & 0x7F) << i;
    i += 7;
  }
  return value | (b << i);
}
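As a quick check of the header convention, the snippet below interprets two hand-crafted header bytes with the same logic as readNext and readUnsignedVarInt above. The HeaderDemo class and the sample bytes are invented for the illustration; they are not part of Parquet.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class HeaderDemo {
  // same unsigned-varint logic as BytesUtils.readUnsignedVarInt shown above
  static int readUnsignedVarInt(InputStream in) throws IOException {
    int value = 0, i = 0, b;
    while (((b = in.read()) & 0x80) != 0) {
      value |= (b & 0x7F) << i;
      i += 7;
    }
    return value | (b << i);
  }

  public static void main(String[] args) throws IOException {
    // 0x12 = 9 << 1, LSB 0 -> an RLE run of 9 repeated values
    // 0x07 = (3 << 1) | 1, LSB 1 -> 3 bit-packed groups = 24 values
    for (byte headerByte : new byte[] { 0x12, 0x07 }) {
      int header = readUnsignedVarInt(new ByteArrayInputStream(new byte[] { headerByte }));
      boolean rle = (header & 1) == 0;
      int count = rle ? (header >>> 1) : (header >>> 1) * 8;
      System.out.println(rle ? "RLE run, count = " + count : "bit-packed, values = " + count);
    }
  }
}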
The above is a brief walkthrough of the RLE/Bit-Packing encoding that Parquet applies to data before compression. My understanding is limited, so corrections are welcome if anything here is wrong. Thanks for reading.