Skip to content

Commit 97321b8

Browse files
authored
GH-3141: Add constructor to ParquetFileReader to pass parquet footer and expose setRequestedSchema that accepts List<ColumnDescriptor> (#3262)
1 parent 6d2635b commit 97321b8

2 files changed

Lines changed: 67 additions & 13 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,23 @@ public static final ParquetMetadata readFooter(InputFile file, MetadataFilter fi
567567

568568
public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f)
569569
throws IOException {
570+
return readFooter(file, options, f, /*closeStreamOnFailure*/ false);
571+
}
572+
573+
private static final ParquetMetadata readFooter(
574+
InputFile file, ParquetReadOptions options, SeekableInputStream f, boolean closeStreamOnFailure)
575+
throws IOException {
570576
ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
571-
return readFooter(file, options, f, converter);
577+
try {
578+
return readFooter(file, options, f, converter);
579+
} catch (Exception e) {
580+
// In case that readFooter throws an exception in the constructor, the new stream
581+
// should be closed. Otherwise, there's no way to close this outside.
582+
if (closeStreamOnFailure) {
583+
f.close();
584+
}
585+
throw e;
586+
}
572587
}
573588

574589
private static final ParquetMetadata readFooter(
@@ -729,6 +744,22 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options,
729744
return new ParquetFileReader(file, options, f);
730745
}
731746

747+
/**
748+
* Open a {@link InputFile file} with {@link ParquetMetadata footer} and {@link ParquetReadOptions options}.
749+
*
750+
* @param file an input file
751+
* @param footer a {@link ParquetMetadata} footer already read from the file
752+
* @param options parquet read options
753+
* @param f the input stream for the file
754+
* @return an open ParquetFileReader
755+
* @throws IOException if there is an error while opening the file
756+
*/
757+
public static ParquetFileReader open(
758+
InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f)
759+
throws IOException {
760+
return new ParquetFileReader(file, footer, options, f);
761+
}
762+
732763
protected SeekableInputStream f;
733764
private final InputFile file;
734765
private final ParquetReadOptions options;
@@ -930,19 +961,31 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
930961
this(file, options, file.newStream());
931962
}
932963

964+
/**
965+
* @param file Path to a parquet file
966+
* @param options {@link ParquetReadOptions}
967+
* @param f a {@link SeekableInputStream} for the parquet file
968+
* @throws IOException if the file can not be opened
969+
*/
933970
public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
971+
this(file, readFooter(file, options, f, /*closeStreamOnFailure*/ true), options, f);
972+
}
973+
974+
/**
975+
* @param file Path to a parquet file
976+
* @param footer a {@link ParquetMetadata} footer already read from the file
977+
* @param options {@link ParquetReadOptions}
978+
* @param f a {@link SeekableInputStream} for the parquet file
979+
* @throws IOException if the file can not be opened
980+
*/
981+
public ParquetFileReader(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f)
982+
throws IOException {
934983
this.converter = new ParquetMetadataConverter(options);
935984
this.file = file;
936985
this.f = f;
937986
this.options = options;
938-
try {
939-
this.footer = readFooter(file, options, f, converter);
940-
} catch (Exception e) {
941-
// In case that reading footer throws an exception in the constructor, the new stream
942-
// should be closed. Otherwise, there's no way to close this outside.
943-
f.close();
944-
throw e;
945-
}
987+
this.footer = footer;
988+
946989
this.fileMetaData = footer.getFileMetaData();
947990
this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
948991
if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
@@ -1054,13 +1097,17 @@ public List<BlockMetaData> getRowGroups() {
10541097
return blocks;
10551098
}
10561099

1057-
public void setRequestedSchema(MessageType projection) {
1100+
public void setRequestedSchema(List<ColumnDescriptor> columns) {
10581101
paths.clear();
1059-
for (ColumnDescriptor col : projection.getColumns()) {
1102+
for (ColumnDescriptor col : columns) {
10601103
paths.put(ColumnPath.get(col.getPath()), col);
10611104
}
10621105
}
10631106

1107+
public void setRequestedSchema(MessageType projection) {
1108+
setRequestedSchema(projection.getColumns());
1109+
}
1110+
10641111
public void appendTo(ParquetFileWriter writer) throws IOException {
10651112
writer.appendRowGroups(f, blocks, true);
10661113
}

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.zip.CRC32;
3737
import org.apache.hadoop.conf.Configuration;
3838
import org.apache.hadoop.fs.Path;
39+
import org.apache.parquet.HadoopReadOptions;
40+
import org.apache.parquet.ParquetReadOptions;
3941
import org.apache.parquet.bytes.BytesInput;
4042
import org.apache.parquet.bytes.HeapByteBufferAllocator;
4143
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -688,8 +690,13 @@ private int getDataOffset(Page page) {
688690
*/
689691
private ParquetFileReader getParquetFileReader(Path path, Configuration conf, List<ColumnDescriptor> columns)
690692
throws IOException {
691-
ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
692-
return new ParquetFileReader(conf, footer.getFileMetaData(), path, footer.getBlocks(), columns);
693+
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
694+
SeekableInputStream inputStream = inputFile.newStream();
695+
ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build();
696+
ParquetMetadata footer = ParquetFileReader.readFooter(inputFile, readOptions, inputStream);
697+
ParquetFileReader reader = ParquetFileReader.open(inputFile, footer, readOptions, inputStream);
698+
reader.setRequestedSchema(columns);
699+
return reader;
693700
}
694701

695702
/**

0 commit comments

Comments
 (0)