Skip to content

Commit 696e7cf

Browse files
committed
fix(dqe): PageSerializer DATE/TZ type support and aggregation type resolution
- Add DateType serialization via IntArrayBlock (Parquet native int32) - Add TimestampWithTimeZoneType serialization (epochMillis + picosOfMilli + tzKey) - Add date/timestamp(6) with time zone to PageSerializer type registry - Fix resolveColumnTypes() to parse MIN/MAX aggregate output types from function string (e.g., min(EventDate) → DATE, not BIGINT)
1 parent 2ac64b7 commit 696e7cf

2 files changed

Lines changed: 366 additions & 2 deletions

File tree

dqe/src/main/java/org/opensearch/sql/dqe/common/serde/PageSerializer.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
import io.trino.spi.block.BlockBuilder;
1212
import io.trino.spi.type.BigintType;
1313
import io.trino.spi.type.BooleanType;
14+
import io.trino.spi.type.DateType;
1415
import io.trino.spi.type.DoubleType;
1516
import io.trino.spi.type.IntegerType;
17+
import io.trino.spi.type.LongTimestampWithTimeZone;
1618
import io.trino.spi.type.RealType;
1719
import io.trino.spi.type.SmallintType;
1820
import io.trino.spi.type.TimestampType;
21+
import io.trino.spi.type.TimestampWithTimeZoneType;
1922
import io.trino.spi.type.TinyintType;
2023
import io.trino.spi.type.Type;
2124
import io.trino.spi.type.VarbinaryType;
@@ -47,7 +50,12 @@ public final class PageSerializer {
4750
Map.entry("tinyint", TinyintType.TINYINT),
4851
Map.entry("real", RealType.REAL),
4952
Map.entry("varbinary", VarbinaryType.VARBINARY),
50-
Map.entry("timestamp(3)", TimestampType.TIMESTAMP_MILLIS));
53+
Map.entry("timestamp(3)", TimestampType.TIMESTAMP_MILLIS),
54+
Map.entry("timestamp(6) with time zone",
55+
TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS),
56+
Map.entry("timestamp(3) with time zone",
57+
TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS),
58+
Map.entry("date", DateType.DATE));
5159

5260
private PageSerializer() {}
5361

@@ -223,6 +231,11 @@ private static void writeColumnBulk(StreamOutput out, Block block, Type type, in
223231
for (int pos = 0; pos < positionCount; pos++) {
224232
out.writeInt((int) RealType.REAL.getLong(block, pos));
225233
}
234+
} else if (type instanceof DateType) {
235+
io.trino.spi.block.IntArrayBlock intBlock = (io.trino.spi.block.IntArrayBlock) block;
236+
for (int pos = 0; pos < positionCount; pos++) {
237+
out.writeInt(intBlock.getInt(pos));
238+
}
226239
}
227240
}
228241

@@ -235,7 +248,8 @@ private static boolean isFixedWidthNumeric(Type type) {
235248
|| type instanceof SmallintType
236249
|| type instanceof TinyintType
237250
|| type instanceof BooleanType
238-
|| type instanceof RealType;
251+
|| type instanceof RealType
252+
|| type instanceof DateType;
239253
}
240254

241255
/**
@@ -360,6 +374,10 @@ private static void readColumnBulk(
360374
for (int pos = 0; pos < positionCount; pos++) {
361375
RealType.REAL.writeLong(builder, in.readInt());
362376
}
377+
} else if (type instanceof DateType) {
378+
for (int pos = 0; pos < positionCount; pos++) {
379+
((io.trino.spi.block.IntArrayBlockBuilder) builder).writeInt(in.readInt());
380+
}
363381
}
364382
}
365383

@@ -424,6 +442,14 @@ private static void writeTypedValue(StreamOutput out, Type type, Block block, in
424442
out.writeByteArray(bytes);
425443
} else if (type instanceof TimestampType) {
426444
out.writeLong(type.getLong(block, position));
445+
} else if (type instanceof TimestampWithTimeZoneType) {
446+
LongTimestampWithTimeZone tz =
447+
(LongTimestampWithTimeZone) type.getObject(block, position);
448+
out.writeLong(tz.getEpochMillis());
449+
out.writeInt(tz.getPicosOfMilli());
450+
out.writeShort(tz.getTimeZoneKey());
451+
} else if (type instanceof DateType) {
452+
out.writeInt(((io.trino.spi.block.IntArrayBlock) block).getInt(position));
427453
} else {
428454
throw new IOException("Unsupported type for serialization: " + type);
429455
}
@@ -459,6 +485,14 @@ private static void readTypedValue(StreamInput in, Type type, BlockBuilder build
459485
VarbinaryType.VARBINARY.writeSlice(builder, Slices.wrappedBuffer(bytes));
460486
} else if (type instanceof TimestampType) {
461487
type.writeLong(builder, in.readLong());
488+
} else if (type instanceof TimestampWithTimeZoneType) {
489+
long epochMillis = in.readLong();
490+
int picosOfMilli = in.readInt();
491+
short tzKey = in.readShort();
492+
type.writeObject(builder,
493+
LongTimestampWithTimeZone.fromEpochMillisAndFraction(epochMillis, picosOfMilli, tzKey));
494+
} else if (type instanceof DateType) {
495+
((io.trino.spi.block.IntArrayBlockBuilder) builder).writeInt(in.readInt());
462496
} else {
463497
throw new IOException("Unsupported type for deserialization: " + type);
464498
}

0 commit comments

Comments
 (0)