From 2aa6f067f4b6005d24443ef50e4404ab0bff3091 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 May 2026 23:21:43 +0200 Subject: [PATCH 1/2] GH-3549: Leverage JDK11 `readNBytes` Introduced in JDK11 and is more efficient. Closes #3549 --- .../org/apache/parquet/bytes/BytesInput.java | 22 ++++++++++++++----- .../hadoop/codec/TestCompressionCodec.java | 3 +-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 0e66140744..9332e71bc1 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -19,7 +19,6 @@ package org.apache.parquet.bytes; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -367,8 +366,18 @@ private StreamBytesInput(InputStream in, int byteCount) { @Override public void writeAllTo(OutputStream out) throws IOException { LOG.debug("write All {} bytes", byteCount); - // TODO: more efficient - out.write(this.toByteArray()); + // Transfer in chunks to avoid allocating a byteCount-sized intermediate buffer + byte[] buffer = new byte[Math.min(byteCount, 8192)]; + int remaining = byteCount; + while (remaining > 0) { + int toRead = Math.min(remaining, buffer.length); + int n = in.readNBytes(buffer, 0, toRead); + if (n < toRead) { + throw new EOFException("Reached the end of stream with " + (remaining - n) + " bytes left to read"); + } + out.write(buffer, 0, n); + remaining -= n; + } } @Override @@ -395,8 +404,11 @@ void writeInto(ByteBuffer buffer) { public byte[] toByteArray() throws IOException { LOG.debug("read all {} bytes", byteCount); - byte[] buf = new byte[byteCount]; - new DataInputStream(in).readFully(buf); + byte[] buf = in.readNBytes(byteCount); + if (buf.length != byteCount) { + throw new EOFException( + "Reached the end of stream with " + (byteCount - buf.length) + " bytes left to read"); + } return buf; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java index ae2bd87ac9..2db4d77f6e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java @@ -195,8 +195,7 @@ public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException { final byte[] raw = new byte[size]; new Random(42).nextBytes(raw); - try (TrackingByteBufferAllocator allocator = - TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); + try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) { CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), pageSize); BytesInputCompressor compressor = heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW); From 05850c89889ff7d56c3c3c3784b9d8fbf0a4452f Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 10 May 2026 09:46:58 +0200 Subject: [PATCH 2/2] `NonBlockedDecompressorStream` does not support chunked reads --- .../org/apache/parquet/bytes/BytesInput.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 9332e71bc1..612d204f94 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -366,18 +366,9 @@ private StreamBytesInput(InputStream in, int byteCount) { @Override public void writeAllTo(OutputStream out) throws IOException { LOG.debug("write All {} bytes", byteCount); - // Transfer in chunks to avoid allocating a byteCount-sized intermediate buffer - byte[] buffer = new byte[Math.min(byteCount, 8192)]; - int remaining = byteCount; - while (remaining > 0) { - int toRead = Math.min(remaining, buffer.length); - int n = in.readNBytes(buffer, 0, toRead); - if (n < toRead) { - throw new EOFException("Reached the end of stream with " + (remaining - n) + " bytes left to read"); - } - out.write(buffer, 0, n); - remaining -= n; - } + // Cannot transfer in chunks because some InputStreams (e.g., NonBlockedDecompressorStream) + // require a full-size buffer to decompress all data in a single read() call. + out.write(this.toByteArray()); } @Override @@ -404,10 +395,15 @@ void writeInto(ByteBuffer buffer) { public byte[] toByteArray() throws IOException { LOG.debug("read all {} bytes", byteCount); - byte[] buf = in.readNBytes(byteCount); - if (buf.length != byteCount) { + // Use the 3-arg readNBytes to read directly into a byteCount-sized buffer. + // The 1-arg readNBytes(int) internally uses small (8KB) buffers which breaks + // block decompressors that require a full-size output buffer (e.g., NonBlockedDecompressorStream + // resets the decompressor after finished(), losing remaining data on partial reads). + byte[] buf = new byte[byteCount]; + int n = in.readNBytes(buf, 0, byteCount); + if (n != byteCount) { throw new EOFException( - "Reached the end of stream with " + (byteCount - buf.length) + " bytes left to read"); + "Reached the end of stream with " + (byteCount - n) + " bytes left to read"); } return buf; }