Skip to content

Commit 00bc776

Browse files
committed
GH-3466 Fix stale packed-byte tail corruption in RLE/bit-pack decoder
1 parent 11e770d commit 00bc776

File tree

2 files changed

+50
-16
lines changed

2 files changed

+50
-16
lines changed

parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java

Lines changed: 5 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.apache.parquet.column.values.rle;
2020

21-
import java.io.EOFException;
2221
import java.io.IOException;
2322
import java.io.InputStream;
23+
import java.util.Arrays;
2424
import org.apache.parquet.Preconditions;
2525
import org.apache.parquet.bytes.BytesUtils;
2626
import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -101,10 +101,10 @@ private void readNext() throws IOException {
101101
if (packedBytes == null || packedBytes.length < bytesNeeded) {
102102
packedBytes = new byte[bytesNeeded];
103103
}
104-
// At the end of the file RLE data though, there might not be that many bytes left.
105-
int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
106-
bytesToRead = Math.min(bytesToRead, in.available());
107-
readFully(in, packedBytes, bytesToRead);
104+
int bytesRead = in.readNBytes(packedBytes, 0, bytesNeeded);
105+
if (bytesRead < bytesNeeded) {
106+
Arrays.fill(packedBytes, bytesRead, bytesNeeded, (byte) 0);
107+
}
108108
for (int valueIndex = 0, byteIndex = 0;
109109
valueIndex < currentCount;
110110
valueIndex += 8, byteIndex += bitWidth) {
@@ -116,15 +116,4 @@ private void readNext() throws IOException {
116116
}
117117
}
118118

119-
private static void readFully(InputStream in, byte[] buf, int len) throws IOException {
120-
int offset = 0;
121-
while (offset < len) {
122-
int read = in.read(buf, offset, len - offset);
123-
if (read < 0) {
124-
throw new EOFException(
125-
"Unexpected end of stream: still needed " + (len - offset) + " bytes");
126-
}
127-
offset += read;
128-
}
129-
}
130119
}

parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,11 @@
1919
package org.apache.parquet.column.values.rle;
2020

2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertArrayEquals;
2223

2324
import java.io.ByteArrayInputStream;
2425
import java.util.ArrayList;
26+
import java.util.Arrays;
2527
import java.util.List;
2628
import org.apache.parquet.bytes.BytesUtils;
2729
import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -298,6 +300,49 @@ public void testGroupBoundary() throws Exception {
298300
assertEquals(stream.available(), 0);
299301
}
300302

303+
@Test
304+
public void testTruncatedPackedRunAfterFullPackedRunDoesNotReuseStaleBytes() throws Exception {
305+
int bitWidth = 3;
306+
BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
307+
308+
int[] firstRunValues = new int[8];
309+
Arrays.fill(firstRunValues, 7);
310+
byte[] firstRunPacked = new byte[bitWidth];
311+
packer.pack8Values(firstRunValues, 0, firstRunPacked, 0);
312+
313+
int[] secondRunValues = {1, 2, 3, 4, 5, 6, 7, 0};
314+
byte[] secondRunPacked = new byte[bitWidth];
315+
packer.pack8Values(secondRunValues, 0, secondRunPacked, 0);
316+
317+
byte[] encoded = {
318+
(byte) ((1 << 1) | 1),
319+
firstRunPacked[0],
320+
firstRunPacked[1],
321+
firstRunPacked[2],
322+
(byte) ((1 << 1) | 1),
323+
secondRunPacked[0]
324+
};
325+
326+
RunLengthBitPackingHybridDecoder decoder =
327+
new RunLengthBitPackingHybridDecoder(bitWidth, new ByteArrayInputStream(encoded));
328+
329+
for (int ignored = 0; ignored < 8; ignored++) {
330+
assertEquals(7, decoder.readInt());
331+
}
332+
333+
int[] actualSecondRun = new int[8];
334+
for (int i = 0; i < 8; i++) {
335+
actualSecondRun[i] = decoder.readInt();
336+
}
337+
338+
byte[] expectedSecondPacked = new byte[bitWidth];
339+
expectedSecondPacked[0] = secondRunPacked[0];
340+
int[] expectedSecondRun = new int[8];
341+
packer.unpack8Values(expectedSecondPacked, 0, expectedSecondRun, 0);
342+
343+
assertArrayEquals(expectedSecondRun, actualSecondRun);
344+
}
345+
301346
private static List<Integer> unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception {
302347

303348
BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);

0 commit comments

Comments
 (0)