Skip to content

Commit 68ed638

Browse files
committed
add interop test
1 parent 0d05bef commit 68ed638

2 files changed

Lines changed: 656 additions & 0 deletions

File tree

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19+
20+
package org.apache.parquet.hadoop;
21+
22+
import static org.apache.parquet.schema.LogicalTypeAnnotation.float16Type;
23+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
24+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
25+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
26+
27+
import java.io.IOException;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.fs.FileSystem;
32+
import org.apache.hadoop.fs.Path;
33+
import org.apache.parquet.example.data.Group;
34+
import org.apache.parquet.example.data.GroupFactory;
35+
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
36+
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
37+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
38+
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
39+
import org.apache.parquet.hadoop.rewrite.RewriteOptions;
40+
import org.apache.parquet.io.api.Binary;
41+
import org.apache.parquet.schema.ColumnOrder;
42+
import org.apache.parquet.schema.MessageType;
43+
import org.apache.parquet.schema.Types;
44+
45+
public final class FloatingPointNanInteropFileGenerator {
46+
47+
public static final String FILE_NO_NAN = "floating_orders_nan_count_no_nan.parquet";
48+
public static final String FILE_MIXED_NAN = "floating_orders_nan_count_mixed_nan.parquet";
49+
public static final String FILE_ALL_NAN = "floating_orders_nan_count_all_nan.parquet";
50+
public static final String FILE_MERGED = "floating_orders_nan_count_merged.parquet";
51+
52+
public static final int ROWS_PER_FILE = 10;
53+
54+
private static final float FLOAT_NAN_SMALL = Float.intBitsToFloat(0x7fc00001);
55+
private static final float FLOAT_NAN_LARGE = Float.intBitsToFloat(0x7fffffff);
56+
private static final double DOUBLE_NAN_SMALL = Double.longBitsToDouble(0x7ff0000000000001L);
57+
private static final double DOUBLE_NAN_LARGE = Double.longBitsToDouble(0x7fffffffffffffffL);
58+
private static final Binary FLOAT16_NAN_SMALL = float16Binary((short) 0x7c01);
59+
private static final Binary FLOAT16_NAN_LARGE = float16Binary((short) 0x7fff);
60+
61+
private static final MessageType SCHEMA = Types.buildMessage()
62+
.required(FLOAT)
63+
.columnOrder(ColumnOrder.ieee754TotalOrder())
64+
.named("float_ieee754")
65+
.required(FLOAT)
66+
.named("float_typedef")
67+
.required(DOUBLE)
68+
.columnOrder(ColumnOrder.ieee754TotalOrder())
69+
.named("double_ieee754")
70+
.required(DOUBLE)
71+
.named("double_typedef")
72+
.required(FIXED_LEN_BYTE_ARRAY)
73+
.length(2)
74+
.as(float16Type())
75+
.columnOrder(ColumnOrder.ieee754TotalOrder())
76+
.named("float16_ieee754")
77+
.required(FIXED_LEN_BYTE_ARRAY)
78+
.length(2)
79+
.as(float16Type())
80+
.named("float16_typedef")
81+
.named("msg");
82+
83+
private static final float[] NO_NAN_FLOATS = new float[] {-2f, -1f, -0f, 0f, 0.5f, 1f, 2f, 3f, 4f, 5f};
84+
private static final double[] NO_NAN_DOUBLES = new double[] {-2d, -1d, -0d, 0d, 0.5d, 1d, 2d, 3d, 4d, 5d};
85+
private static final Binary[] NO_NAN_FLOAT16 = new Binary[] {
86+
float16Binary((short) 0xc000),
87+
float16Binary((short) 0xbc00),
88+
float16Binary((short) 0x8000),
89+
float16Binary((short) 0x0000),
90+
float16Binary((short) 0x3800),
91+
float16Binary((short) 0x3c00),
92+
float16Binary((short) 0x4000),
93+
float16Binary((short) 0x4200),
94+
float16Binary((short) 0x4400),
95+
float16Binary((short) 0x4500)
96+
};
97+
98+
private static final float[] MIXED_NAN_FLOATS =
99+
new float[] {FLOAT_NAN_SMALL, -2f, FLOAT_NAN_LARGE, -1f, -0f, 0f, 1f, 2f, 3f, 5f};
100+
private static final double[] MIXED_NAN_DOUBLES =
101+
new double[] {DOUBLE_NAN_SMALL, -2d, DOUBLE_NAN_LARGE, -1d, -0d, 0d, 1d, 2d, 3d, 5d};
102+
private static final Binary[] MIXED_NAN_FLOAT16 = new Binary[] {
103+
FLOAT16_NAN_SMALL,
104+
float16Binary((short) 0xc000),
105+
FLOAT16_NAN_LARGE,
106+
float16Binary((short) 0xbc00),
107+
float16Binary((short) 0x8000),
108+
float16Binary((short) 0x0000),
109+
float16Binary((short) 0x3c00),
110+
float16Binary((short) 0x4000),
111+
float16Binary((short) 0x4200),
112+
float16Binary((short) 0x4500)
113+
};
114+
115+
private static final float[] ALL_NAN_FLOATS = new float[] {
116+
FLOAT_NAN_SMALL,
117+
FLOAT_NAN_LARGE,
118+
FLOAT_NAN_SMALL,
119+
FLOAT_NAN_LARGE,
120+
FLOAT_NAN_SMALL,
121+
FLOAT_NAN_LARGE,
122+
FLOAT_NAN_SMALL,
123+
FLOAT_NAN_LARGE,
124+
FLOAT_NAN_SMALL,
125+
FLOAT_NAN_LARGE
126+
};
127+
128+
private static final double[] ALL_NAN_DOUBLES = new double[] {
129+
DOUBLE_NAN_SMALL,
130+
DOUBLE_NAN_LARGE,
131+
DOUBLE_NAN_SMALL,
132+
DOUBLE_NAN_LARGE,
133+
DOUBLE_NAN_SMALL,
134+
DOUBLE_NAN_LARGE,
135+
DOUBLE_NAN_SMALL,
136+
DOUBLE_NAN_LARGE,
137+
DOUBLE_NAN_SMALL,
138+
DOUBLE_NAN_LARGE
139+
};
140+
141+
private static final Binary[] ALL_NAN_FLOAT16 = new Binary[] {
142+
FLOAT16_NAN_SMALL,
143+
FLOAT16_NAN_LARGE,
144+
FLOAT16_NAN_SMALL,
145+
FLOAT16_NAN_LARGE,
146+
FLOAT16_NAN_SMALL,
147+
FLOAT16_NAN_LARGE,
148+
FLOAT16_NAN_SMALL,
149+
FLOAT16_NAN_LARGE,
150+
FLOAT16_NAN_SMALL,
151+
FLOAT16_NAN_LARGE
152+
};
153+
154+
public enum Scenario {
155+
NO_NAN,
156+
MIXED_NAN,
157+
ALL_NAN
158+
}
159+
160+
public static final class GenerationResult {
161+
private final Path noNanFile;
162+
private final Path mixedNanFile;
163+
private final Path allNanFile;
164+
private final Path mergedFile;
165+
166+
private GenerationResult(Path noNanFile, Path mixedNanFile, Path allNanFile, Path mergedFile) {
167+
this.noNanFile = noNanFile;
168+
this.mixedNanFile = mixedNanFile;
169+
this.allNanFile = allNanFile;
170+
this.mergedFile = mergedFile;
171+
}
172+
173+
public Path getNoNanFile() {
174+
return noNanFile;
175+
}
176+
177+
public Path getMixedNanFile() {
178+
return mixedNanFile;
179+
}
180+
181+
public Path getAllNanFile() {
182+
return allNanFile;
183+
}
184+
185+
public Path getMergedFile() {
186+
return mergedFile;
187+
}
188+
}
189+
190+
private FloatingPointNanInteropFileGenerator() {}
191+
192+
public static MessageType schema() {
193+
return SCHEMA;
194+
}
195+
196+
public static float[] floatValues(Scenario scenario) {
197+
switch (scenario) {
198+
case NO_NAN:
199+
return Arrays.copyOf(NO_NAN_FLOATS, NO_NAN_FLOATS.length);
200+
case MIXED_NAN:
201+
return Arrays.copyOf(MIXED_NAN_FLOATS, MIXED_NAN_FLOATS.length);
202+
case ALL_NAN:
203+
return Arrays.copyOf(ALL_NAN_FLOATS, ALL_NAN_FLOATS.length);
204+
default:
205+
throw new IllegalArgumentException("Unknown scenario: " + scenario);
206+
}
207+
}
208+
209+
public static double[] doubleValues(Scenario scenario) {
210+
switch (scenario) {
211+
case NO_NAN:
212+
return Arrays.copyOf(NO_NAN_DOUBLES, NO_NAN_DOUBLES.length);
213+
case MIXED_NAN:
214+
return Arrays.copyOf(MIXED_NAN_DOUBLES, MIXED_NAN_DOUBLES.length);
215+
case ALL_NAN:
216+
return Arrays.copyOf(ALL_NAN_DOUBLES, ALL_NAN_DOUBLES.length);
217+
default:
218+
throw new IllegalArgumentException("Unknown scenario: " + scenario);
219+
}
220+
}
221+
222+
public static Binary[] float16Values(Scenario scenario) {
223+
Binary[] values;
224+
switch (scenario) {
225+
case NO_NAN:
226+
values = NO_NAN_FLOAT16;
227+
break;
228+
case MIXED_NAN:
229+
values = MIXED_NAN_FLOAT16;
230+
break;
231+
case ALL_NAN:
232+
values = ALL_NAN_FLOAT16;
233+
break;
234+
default:
235+
throw new IllegalArgumentException("Unknown scenario: " + scenario);
236+
}
237+
238+
Binary[] copy = new Binary[values.length];
239+
for (int i = 0; i < values.length; i++) {
240+
copy[i] = values[i].copy();
241+
}
242+
return copy;
243+
}
244+
245+
public static GenerationResult generateAndMerge(Configuration conf, Path outputDir) throws IOException {
246+
FileSystem fs = outputDir.getFileSystem(conf);
247+
if (!fs.exists(outputDir) && !fs.mkdirs(outputDir)) {
248+
throw new IOException("Cannot create output directory: " + outputDir);
249+
}
250+
251+
Path noNanPath = new Path(outputDir, FILE_NO_NAN);
252+
Path mixedNanPath = new Path(outputDir, FILE_MIXED_NAN);
253+
Path allNanPath = new Path(outputDir, FILE_ALL_NAN);
254+
Path mergedPath = new Path(outputDir, FILE_MERGED);
255+
256+
deleteIfExists(fs, noNanPath);
257+
deleteIfExists(fs, mixedNanPath);
258+
deleteIfExists(fs, allNanPath);
259+
deleteIfExists(fs, mergedPath);
260+
261+
writeScenarioFile(conf, noNanPath, Scenario.NO_NAN);
262+
writeScenarioFile(conf, mixedNanPath, Scenario.MIXED_NAN);
263+
writeScenarioFile(conf, allNanPath, Scenario.ALL_NAN);
264+
mergeFiles(conf, List.of(noNanPath, mixedNanPath, allNanPath), mergedPath);
265+
266+
return new GenerationResult(noNanPath, mixedNanPath, allNanPath, mergedPath);
267+
}
268+
269+
public static void writeScenarioFile(Configuration conf, Path outputFile, Scenario scenario) throws IOException {
270+
float[] floatValues = floatValues(scenario);
271+
double[] doubleValues = doubleValues(scenario);
272+
Binary[] float16Values = float16Values(scenario);
273+
274+
if (floatValues.length != ROWS_PER_FILE
275+
|| doubleValues.length != ROWS_PER_FILE
276+
|| float16Values.length != ROWS_PER_FILE) {
277+
throw new IllegalStateException("Scenario " + scenario + " must have exactly " + ROWS_PER_FILE + " rows");
278+
}
279+
280+
GroupFactory factory = new SimpleGroupFactory(SCHEMA);
281+
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(outputFile)
282+
.withConf(conf)
283+
.withType(SCHEMA)
284+
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
285+
.withDictionaryEncoding(false)
286+
.build()) {
287+
for (int i = 0; i < ROWS_PER_FILE; i++) {
288+
writer.write(factory.newGroup()
289+
.append("float_ieee754", floatValues[i])
290+
.append("float_typedef", floatValues[i])
291+
.append("double_ieee754", doubleValues[i])
292+
.append("double_typedef", doubleValues[i])
293+
.append("float16_ieee754", float16Values[i])
294+
.append("float16_typedef", float16Values[i]));
295+
}
296+
}
297+
}
298+
299+
public static void mergeFiles(Configuration conf, List<Path> inputFiles, Path outputFile) throws IOException {
300+
RewriteOptions options = new RewriteOptions.Builder(conf, inputFiles, outputFile)
301+
.transform(CompressionCodecName.UNCOMPRESSED)
302+
.build();
303+
try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
304+
rewriter.processBlocks();
305+
}
306+
}
307+
308+
private static void deleteIfExists(FileSystem fs, Path path) throws IOException {
309+
if (fs.exists(path) && !fs.delete(path, false)) {
310+
throw new IOException("Failed to delete existing file: " + path);
311+
}
312+
}
313+
314+
private static Binary float16Binary(short bits) {
315+
return Binary.fromConstantByteArray(new byte[] {(byte) (bits & 0xff), (byte) ((bits >> 8) & 0xff)});
316+
}
317+
318+
public static void main(String[] args) throws IOException {
319+
String outputDir = args.length > 0 ? args[0] : "target/parquet-testing/data";
320+
Configuration conf = new Configuration();
321+
GenerationResult result = generateAndMerge(conf, new Path(outputDir));
322+
System.out.println("Generated files:");
323+
System.out.println(" " + result.getNoNanFile());
324+
System.out.println(" " + result.getMixedNanFile());
325+
System.out.println(" " + result.getAllNanFile());
326+
System.out.println(" " + result.getMergedFile());
327+
}
328+
}

0 commit comments

Comments
 (0)