Skip to content

Commit 30fa1de

Browse files
PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300)
1 parent 95b004c commit 30fa1de

3 files changed

Lines changed: 168 additions & 2 deletions

File tree

parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -930,6 +930,7 @@ public void end() {
930930
static boolean isElementType(Type repeatedType, Schema elementSchema) {
931931
if (repeatedType.isPrimitive()
932932
|| repeatedType.asGroupType().getFieldCount() > 1
933+
|| repeatedType.getName().equals("array")
933934
|| repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
934935
// The repeated type must be the element type because it is an invalid
935936
// synthetic wrapper. Must be a group with one optional or required field

parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java

Lines changed: 128 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,9 @@
3737
import org.apache.hadoop.fs.Path;
3838
import org.apache.parquet.DirectWriterTest;
3939
import org.apache.parquet.hadoop.ParquetFileReader;
40+
import org.apache.parquet.io.InvalidRecordException;
4041
import org.apache.parquet.schema.MessageType;
42+
import org.apache.parquet.schema.MessageTypeParser;
4143
import org.junit.Assert;
4244
import org.junit.BeforeClass;
4345
import org.junit.Ignore;
@@ -603,7 +605,7 @@ public void testAvroCompatOptionalGroupInList() throws Exception {
603605
public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
604606
Path test = writeDirect(
605607
"message AvroCompatOptionalGroupInListWithSchema {" + " optional group locations (LIST) {"
606-
+ " repeated group array {"
608+
+ " repeated group my_list {"
607609
+ " optional group element {"
608610
+ " required double latitude;"
609611
+ " required double longitude;"
@@ -616,7 +618,7 @@ public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
616618
rc.startField("locations", 0);
617619

618620
rc.startGroup();
619-
rc.startField("array", 0); // start writing array contents
621+
rc.startField("my_list", 0); // start writing array contents
620622

621623
// write a non-null element
622624
rc.startGroup(); // array level
@@ -1103,6 +1105,130 @@ public void testListOfSingleElementStructsWithElementField() throws Exception {
11031105
assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord);
11041106
}
11051107

1108+
@Test
1109+
public void testIsElementTypeRequiredRepeatedRecord() {
1110+
// Test `_tuple` style naming
1111+
MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
1112+
+ " required group list_field (LIST) {\n"
1113+
+ " repeated group list_field_tuple (LIST) {\n"
1114+
+ " repeated int32 int_field;\n"
1115+
+ " }\n"
1116+
+ " }\n"
1117+
+ "}");
1118+
Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
1119+
1120+
Assert.assertTrue(AvroRecordConverter.isElementType(
1121+
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
1122+
avroSchema.getFields().get(0).schema()));
1123+
1124+
// Test `array` style naming
1125+
parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
1126+
+ " required group list_field (LIST) {\n"
1127+
+ " repeated group array {\n"
1128+
+ " required int32 a;\n"
1129+
+ " }\n"
1130+
+ " }\n"
1131+
+ "}");
1132+
avroSchema = new AvroSchemaConverter().convert(parquetSchema);
1133+
1134+
Assert.assertTrue(AvroRecordConverter.isElementType(
1135+
parquetSchema.getType("list_field"),
1136+
avroSchema.getFields().get(0).schema()));
1137+
}
1138+
1139+
@Test
1140+
public void testIsElementTypeOptionalRepeatedRecord() {
1141+
// Test `_tuple` style naming
1142+
MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
1143+
+ " optional group list_field (LIST) {\n"
1144+
+ " repeated group list_field_tuple (LIST) {\n"
1145+
+ " repeated int32 int_field;\n"
1146+
+ " }\n"
1147+
+ " }\n"
1148+
+ "}");
1149+
Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
1150+
1151+
Assert.assertTrue(AvroRecordConverter.isElementType(
1152+
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
1153+
avroSchema.getFields().get(0).schema()));
1154+
1155+
// Test `array` style naming
1156+
parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
1157+
+ " optional group list_field (LIST) {\n"
1158+
+ " repeated group array {\n"
1159+
+ " required int32 a;\n"
1160+
+ " }\n"
1161+
+ " }\n"
1162+
+ "}");
1163+
avroSchema = new AvroSchemaConverter().convert(parquetSchema);
1164+
1165+
Assert.assertTrue(AvroRecordConverter.isElementType(
1166+
parquetSchema.getType("list_field"),
1167+
avroSchema.getFields().get(0).schema()));
1168+
}
1169+
1170+
@Test
1171+
public void testIsElementTypeFailsInvalidSchema() throws Exception {
1172+
Path test = writeDirect(
1173+
"message MessageWithInvalidArraySchema {"
1174+
+ " optional group locations (LIST) {"
1175+
+ " repeated group array {"
1176+
+ " optional group element {"
1177+
+ " required double latitude;"
1178+
+ " required double longitude;"
1179+
+ " }"
1180+
+ " }"
1181+
+ " }"
1182+
+ "}",
1183+
rc -> {
1184+
rc.startMessage();
1185+
rc.startField("locations", 0);
1186+
1187+
rc.startGroup();
1188+
rc.startField("array", 0); // start writing array contents
1189+
1190+
// write a non-null element
1191+
rc.startGroup(); // array level
1192+
rc.startField("element", 0);
1193+
rc.startGroup();
1194+
rc.startField("latitude", 0);
1195+
rc.addDouble(0.0);
1196+
rc.endField("latitude", 0);
1197+
rc.startField("longitude", 1);
1198+
rc.addDouble(180.0);
1199+
rc.endField("longitude", 1);
1200+
rc.endGroup();
1201+
rc.endField("element", 0);
1202+
rc.endGroup(); // array level
1203+
1204+
rc.endField("array", 0); // finished writing array contents
1205+
rc.endGroup();
1206+
1207+
rc.endField("locations", 0);
1208+
rc.endMessage();
1209+
});
1210+
1211+
Schema location = record(
1212+
"element",
1213+
field("latitude", primitive(Schema.Type.DOUBLE)),
1214+
field("longitude", primitive(Schema.Type.DOUBLE)));
1215+
1216+
Schema newSchema =
1217+
record("MessageWithInvalidArraySchema", optionalField("locations", array(optional(location))));
1218+
GenericRecord newRecord = instance(
1219+
newSchema,
1220+
"locations",
1221+
Arrays.asList(
1222+
instance(location, "latitude", 0.0, "longitude", 180.0),
1223+
instance(location, "latitude", 0.0, "longitude", 0.0)));
1224+
1225+
Configuration oldConfWithSchema = new Configuration();
1226+
AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
1227+
1228+
AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(oldConfWithSchema, test);
1229+
Assert.assertThrows(InvalidRecordException.class, reader::read);
1230+
}
1231+
11061232
public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path) throws IOException {
11071233
return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
11081234
}

parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import java.util.stream.Collectors;
3939
import java.util.stream.IntStream;
4040
import org.apache.avro.Schema;
41+
import org.apache.avro.SchemaBuilder;
4142
import org.apache.hadoop.conf.Configuration;
4243
import org.apache.hadoop.fs.Path;
4344
import org.apache.parquet.hadoop.ParquetReader;
@@ -232,6 +233,44 @@ public void testProjection() throws IOException {
232233
}
233234
}
234235

236+
@Test
237+
public void testRepeatedRecordProjection() throws IOException {
238+
Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
239+
Configuration conf = new Configuration(testConf);
240+
Schema schema = Car.getClassSchema();
241+
242+
// Project a single field from repeated record schema
243+
final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace())
244+
.record("Car")
245+
.fields()
246+
.name("serviceHistory")
247+
.type(SchemaBuilder.unionOf()
248+
.nullBuilder()
249+
.endNull()
250+
.and()
251+
.array()
252+
.items(SchemaBuilder.builder(schema.getNamespace())
253+
.record("Service")
254+
.fields()
255+
.requiredString("mechanic")
256+
.endRecord())
257+
.endUnion())
258+
.noDefault()
259+
.endRecord();
260+
261+
AvroReadSupport.setRequestedProjection(conf, projectedSchema);
262+
263+
try (ParquetReader<Car> reader = new AvroParquetReader<>(conf, path)) {
264+
for (Car car = reader.read(); car != null; car = reader.read()) {
265+
assertNotNull(car.getServiceHistory());
266+
car.getServiceHistory().forEach(service -> {
267+
assertNotNull(service.getMechanic());
268+
assertEquals(0L, service.getDate());
269+
});
270+
}
271+
}
272+
}
273+
235274
@Test
236275
public void testAvroReadSchema() throws IOException {
237276
Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);

0 commit comments

Comments
 (0)