|
29 | 29 | import com.google.common.collect.ImmutableList; |
30 | 30 | import com.google.common.collect.Iterables; |
31 | 31 | import com.google.common.collect.Streams; |
| 32 | +import java.math.BigDecimal; |
32 | 33 | import java.util.ArrayList; |
33 | 34 | import java.util.Arrays; |
34 | 35 | import java.util.BitSet; |
|
86 | 87 | import org.opensearch.sql.ast.expression.Alias; |
87 | 88 | import org.opensearch.sql.ast.expression.AllFields; |
88 | 89 | import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta; |
| 90 | +import org.opensearch.sql.ast.expression.And; |
89 | 91 | import org.opensearch.sql.ast.expression.Argument; |
90 | 92 | import org.opensearch.sql.ast.expression.Argument.ArgumentMap; |
| 93 | +import org.opensearch.sql.ast.expression.Compare; |
91 | 94 | import org.opensearch.sql.ast.expression.Field; |
92 | 95 | import org.opensearch.sql.ast.expression.Function; |
93 | 96 | import org.opensearch.sql.ast.expression.Let; |
|
150 | 153 | import org.opensearch.sql.ast.tree.StreamWindow; |
151 | 154 | import org.opensearch.sql.ast.tree.SubqueryAlias; |
152 | 155 | import org.opensearch.sql.ast.tree.TableFunction; |
| 156 | +import org.opensearch.sql.ast.tree.Timewrap; |
153 | 157 | import org.opensearch.sql.ast.tree.Trendline; |
154 | 158 | import org.opensearch.sql.ast.tree.Trendline.TrendlineType; |
155 | 159 | import org.opensearch.sql.ast.tree.UnresolvedPlan; |
|
172 | 176 | import org.opensearch.sql.exception.CalciteUnsupportedException; |
173 | 177 | import org.opensearch.sql.exception.SemanticCheckException; |
174 | 178 | import org.opensearch.sql.expression.function.BuiltinFunctionName; |
| 179 | +import org.opensearch.sql.expression.function.PPLBuiltinOperators; |
175 | 180 | import org.opensearch.sql.expression.function.PPLFuncImpTable; |
176 | 181 | import org.opensearch.sql.expression.parse.RegexCommonUtils; |
177 | 182 | import org.opensearch.sql.utils.ParseUtils; |
@@ -3091,6 +3096,249 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) { |
3091 | 3096 | return relBuilder.peek(); |
3092 | 3097 | } |
3093 | 3098 |
|
| 3099 | + private static final int TIMEWRAP_MAX_PERIODS = 20; |
| 3100 | + |
| 3101 | + @Override |
| 3102 | + public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { |
| 3103 | + visitChildren(node, context); |
| 3104 | + |
| 3105 | + // Signal the execution engine to strip all-null columns and rename with absolute offsets |
| 3106 | + CalcitePlanContext.stripNullColumns.set(true); |
| 3107 | + // Both align=now and align=end use _before suffix (matching Splunk behavior). |
| 3108 | + // align=end would use search end time as reference, but PPL has no search time range |
| 3109 | + // context, so both modes currently use query execution time. |
| 3110 | + CalcitePlanContext.timewrapUnitName.set( |
| 3111 | + timewrapUnitBaseName(node.getUnit(), node.getValue()) + "|_before"); |
| 3112 | + |
| 3113 | + RelBuilder b = context.relBuilder; |
| 3114 | + RexBuilder rx = context.rexBuilder; |
| 3115 | + |
| 3116 | + List<String> fieldNames = |
| 3117 | + b.peek().getRowType().getFieldNames().stream().filter(f -> !isMetadataField(f)).toList(); |
| 3118 | + String tsFieldName = fieldNames.get(0); |
| 3119 | + List<String> valueFieldNames = fieldNames.subList(1, fieldNames.size()); |
| 3120 | + |
| 3121 | + long spanSec = timewrapSpanToSeconds(node.getUnit(), node.getValue()); |
| 3122 | + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); |
| 3123 | + |
| 3124 | + // Step 1: Convert timestamps to epoch seconds via UNIX_TIMESTAMP, add MAX OVER() |
| 3125 | + RexNode tsEpochExpr = |
| 3126 | + rx.makeCast( |
| 3127 | + bigintType, |
| 3128 | + rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)), |
| 3129 | + true); |
| 3130 | + b.projectPlus( |
| 3131 | + b.alias(tsEpochExpr, "__ts_epoch__"), |
| 3132 | + b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__")); |
| 3133 | + |
| 3134 | + // Step 2: Compute period_number and offset |
| 3135 | + RexNode tsEpoch = b.field("__ts_epoch__"); |
| 3136 | + RexNode maxEpoch = b.field("__max_epoch__"); |
| 3137 | + RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec)); |
| 3138 | + |
| 3139 | + // period = (max_epoch - ts_epoch) / span_sec + 1 (integer division truncates) |
| 3140 | + RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch); |
| 3141 | + RexNode periodNum = |
| 3142 | + rx.makeCall( |
| 3143 | + SqlStdOperatorTable.PLUS, |
| 3144 | + rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit), |
| 3145 | + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); |
| 3146 | + |
| 3147 | + // offset_sec = ts_epoch MOD span_sec |
| 3148 | + // Convert back to actual timestamp: latest_period_start + offset |
| 3149 | + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit); |
| 3150 | + RexNode latestPeriodStart = |
| 3151 | + rx.makeCall( |
| 3152 | + SqlStdOperatorTable.MINUS, |
| 3153 | + maxEpoch, |
| 3154 | + rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit)); |
| 3155 | + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec); |
| 3156 | + RexNode displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); |
| 3157 | + |
| 3158 | + // Compute base_offset for absolute period naming in execution engine. |
| 3159 | + // align=now: reference = current time |
| 3160 | + // align=end: reference = WHERE upper bound (search end time), fallback to now |
| 3161 | + RexNode baseOffset; |
| 3162 | + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; |
| 3163 | + Long referenceEpoch = null; |
| 3164 | + if ("end".equals(node.getAlign())) { |
| 3165 | + // Try to extract the upper bound from a WHERE clause on the timestamp field |
| 3166 | + referenceEpoch = extractTimestampUpperBound(node); |
| 3167 | + } |
| 3168 | + if (referenceEpoch == null) { |
| 3169 | + referenceEpoch = nowEpochSec; |
| 3170 | + } |
| 3171 | + RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch)); |
| 3172 | + baseOffset = |
| 3173 | + rx.makeCall( |
| 3174 | + SqlStdOperatorTable.DIVIDE, |
| 3175 | + rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch), |
| 3176 | + spanLit); |
| 3177 | + |
| 3178 | + // Step 3: Project [display_timestamp, value_columns..., base_offset, period] |
| 3179 | + // base_offset is included in the group key so it survives the PIVOT |
| 3180 | + List<RexNode> projections = new ArrayList<>(); |
| 3181 | + projections.add(b.alias(displayTimestamp, tsFieldName)); |
| 3182 | + for (String vf : valueFieldNames) { |
| 3183 | + projections.add(b.field(vf)); |
| 3184 | + } |
| 3185 | + projections.add(b.alias(baseOffset, "__base_offset__")); |
| 3186 | + projections.add(b.alias(periodNum, "__period__")); |
| 3187 | + b.project(projections); |
| 3188 | + |
| 3189 | + // Step 4: PIVOT on period, grouped by [offset, __base_offset__] |
| 3190 | + // __base_offset__ is constant across all rows so it doesn't affect grouping, |
| 3191 | + // but survives the PIVOT so the execution engine can use it for absolute column naming |
| 3192 | + b.pivot( |
| 3193 | + b.groupKey(b.field(tsFieldName), b.field("__base_offset__")), |
| 3194 | + valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(), |
| 3195 | + ImmutableList.of(b.field("__period__")), |
| 3196 | + IntStream.rangeClosed(1, TIMEWRAP_MAX_PERIODS) |
| 3197 | + .map(i -> TIMEWRAP_MAX_PERIODS + 1 - i) // reverse: oldest period first |
| 3198 | + .mapToObj( |
| 3199 | + i -> |
| 3200 | + Map.entry( |
| 3201 | + // Use placeholder relative names; execution engine renames to absolute |
| 3202 | + String.valueOf(i), ImmutableList.of((RexNode) b.literal((long) i)))) |
| 3203 | + .collect(Collectors.toList())); |
| 3204 | + |
| 3205 | + // Step 5: Rename columns — add agg name prefix, clean up pivot artifacts |
| 3206 | + // Use relative period numbers as temporary names; the execution engine will compute |
| 3207 | + // absolute offsets using __base_offset__ and rename accordingly |
| 3208 | + List<String> pivotColNames = b.peek().getRowType().getFieldNames(); |
| 3209 | + List<String> cleanNames = new ArrayList<>(); |
| 3210 | + cleanNames.add(tsFieldName); |
| 3211 | + cleanNames.add("__base_offset__"); |
| 3212 | + for (int i = 2; i < pivotColNames.size(); i++) { |
| 3213 | + String name = pivotColNames.get(i); |
| 3214 | + if (name.endsWith("_")) { |
| 3215 | + name = name.substring(0, name.length() - 1); |
| 3216 | + } |
| 3217 | + // Prefix with agg name and _before suffix |
| 3218 | + if (valueFieldNames.size() == 1) { |
| 3219 | + name = valueFieldNames.get(0) + "_" + name + "_before"; |
| 3220 | + } |
| 3221 | + cleanNames.add(name); |
| 3222 | + } |
| 3223 | + b.rename(cleanNames); |
| 3224 | + |
| 3225 | + // Step 6: Sort by offset |
| 3226 | + b.sort(b.field(0)); |
| 3227 | + |
| 3228 | + return b.peek(); |
| 3229 | + } |
| 3230 | + |
| 3231 | + /** |
| 3232 | + * Convert a span unit and value to approximate seconds. Variable-length units use standard |
| 3233 | + * approximations: month=30 days, quarter=91 days, year=365 days. |
| 3234 | + */ |
| 3235 | + private long timewrapSpanToSeconds(SpanUnit unit, int value) { |
| 3236 | + return switch (unit.getName()) { |
| 3237 | + case "s" -> value; |
| 3238 | + case "m" -> value * 60L; |
| 3239 | + case "h" -> value * 3_600L; |
| 3240 | + case "d" -> value * 86_400L; |
| 3241 | + case "w" -> value * 7L * 86_400L; |
| 3242 | + case "M" -> value * 30L * 86_400L; // month ≈ 30 days |
| 3243 | + case "q" -> value * 91L * 86_400L; // quarter ≈ 91 days |
| 3244 | + case "y" -> value * 365L * 86_400L; // year ≈ 365 days |
| 3245 | + default -> |
| 3246 | + throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName()); |
| 3247 | + }; |
| 3248 | + } |
| 3249 | + |
| 3250 | + /** |
| 3251 | + * Get the timescale base name for timewrap column naming. Returns singular and plural forms |
| 3252 | + * separated by "|", e.g., "day|days". Used by the execution engine to build absolute period names |
| 3253 | + * like "501days_before". |
| 3254 | + */ |
| 3255 | + private String timewrapUnitBaseName(SpanUnit unit, int value) { |
| 3256 | + String singular = |
| 3257 | + switch (unit.getName()) { |
| 3258 | + case "s" -> "second"; |
| 3259 | + case "m" -> "minute"; |
| 3260 | + case "h" -> "hour"; |
| 3261 | + case "d" -> "day"; |
| 3262 | + case "w" -> "week"; |
| 3263 | + case "M" -> "month"; |
| 3264 | + case "q" -> "quarter"; |
| 3265 | + case "y" -> "year"; |
| 3266 | + default -> "period"; |
| 3267 | + }; |
| 3268 | + String plural = singular + "s"; |
| 3269 | + // Encode value so execution engine can compute totalUnits = (base_offset + period) * value |
| 3270 | + return value + "|" + singular + "|" + plural; |
| 3271 | + } |
| 3272 | + |
| 3273 | + /** |
| 3274 | + * Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp |
| 3275 | + * field (e.g., @timestamp <= '2024-07-03 18:00:00'). Returns the upper bound as epoch seconds, or |
| 3276 | + * null if not found. |
| 3277 | + */ |
| 3278 | + private Long extractTimestampUpperBound(Timewrap node) { |
| 3279 | + // Walk: Timewrap → Chart → Filter → inspect condition |
| 3280 | + Node current = node; |
| 3281 | + while (current != null && !current.getChild().isEmpty()) { |
| 3282 | + current = current.getChild().get(0); |
| 3283 | + if (current instanceof Filter filter) { |
| 3284 | + return findUpperBound(filter.getCondition()); |
| 3285 | + } |
| 3286 | + } |
| 3287 | + return null; |
| 3288 | + } |
| 3289 | + |
| 3290 | + /** Recursively search an expression tree for a timestamp upper bound (<=). */ |
| 3291 | + private Long findUpperBound(UnresolvedExpression expr) { |
| 3292 | + if (expr instanceof And) { |
| 3293 | + And and = (And) expr; |
| 3294 | + Long left = findUpperBound(and.getLeft()); |
| 3295 | + Long right = findUpperBound(and.getRight()); |
| 3296 | + // If both sides have upper bounds, use the smaller one (tighter bound) |
| 3297 | + if (left != null && right != null) return Math.min(left, right); |
| 3298 | + return left != null ? left : right; |
| 3299 | + } |
| 3300 | + if (expr instanceof Compare cmp) { |
| 3301 | + String op = cmp.getOperator(); |
| 3302 | + // Check for @timestamp <= X or @timestamp < X |
| 3303 | + if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) { |
| 3304 | + return parseTimestampLiteral(cmp.getRight()); |
| 3305 | + } |
| 3306 | + // Check for X >= @timestamp or X > @timestamp |
| 3307 | + if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) { |
| 3308 | + return parseTimestampLiteral(cmp.getLeft()); |
| 3309 | + } |
| 3310 | + } |
| 3311 | + return null; |
| 3312 | + } |
| 3313 | + |
| 3314 | + private boolean isTimestampField(UnresolvedExpression expr) { |
| 3315 | + if (expr instanceof Field field) { |
| 3316 | + String name = field.getField().toString(); |
| 3317 | + return "@timestamp".equals(name) || "timestamp".equals(name); |
| 3318 | + } |
| 3319 | + return false; |
| 3320 | + } |
| 3321 | + |
| 3322 | + private Long parseTimestampLiteral(UnresolvedExpression expr) { |
| 3323 | + if (expr instanceof Literal lit && lit.getValue() instanceof String s) { |
| 3324 | + try { |
| 3325 | + // Parse "yyyy-MM-dd HH:mm:ss" format |
| 3326 | + java.time.LocalDateTime ldt = |
| 3327 | + java.time.LocalDateTime.parse( |
| 3328 | + s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); |
| 3329 | + return ldt.toEpochSecond(java.time.ZoneOffset.UTC); |
| 3330 | + } catch (Exception e) { |
| 3331 | + // Try ISO format |
| 3332 | + try { |
| 3333 | + return java.time.Instant.parse(s).getEpochSecond(); |
| 3334 | + } catch (Exception ignored) { |
| 3335 | + return null; |
| 3336 | + } |
| 3337 | + } |
| 3338 | + } |
| 3339 | + return null; |
| 3340 | + } |
| 3341 | + |
3094 | 3342 | /** |
3095 | 3343 | * Aggregate by column split then rank by grand total (summed value of each category). The output |
3096 | 3344 | * is <code>[col-split, grand-total, row-number]</code> |
|
0 commit comments