Skip to content

Commit c773953

Browse files
authored
feat(tesseract): Support multiple time dimensions in pre-aggregations (#10599)
1 parent 8e38438 commit c773953

21 files changed

Lines changed: 472 additions & 243 deletions

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,85 @@ describe('PreAggregationsMultiStage', () => {
170170
171171
})
172172
173+
cube('coach', {
174+
sql: \`
175+
SELECT 101 AS id, '2025-01-01'::TIMESTAMP AS time UNION ALL
176+
SELECT 102 AS id, '2025-02-01'::TIMESTAMP AS time UNION ALL
177+
SELECT 103 AS id, '2025-02-02'::TIMESTAMP AS time UNION ALL
178+
SELECT 104 AS id, '2025-03-01'::TIMESTAMP AS time UNION ALL
179+
SELECT 105 AS id, '2025-03-02'::TIMESTAMP AS time UNION ALL
180+
SELECT 106 AS id, '2025-03-03'::TIMESTAMP AS time UNION ALL
181+
SELECT 107 AS id, '2025-04-01'::TIMESTAMP AS time UNION ALL
182+
SELECT 108 AS id, '2025-04-02'::TIMESTAMP AS time UNION ALL
183+
SELECT 109 AS id, '2025-04-03'::TIMESTAMP AS time UNION ALL
184+
SELECT 110 AS id, '2025-04-04'::TIMESTAMP AS time UNION ALL
185+
SELECT 111 AS id, '2025-05-01'::TIMESTAMP AS time UNION ALL
186+
SELECT 112 AS id, '2025-05-02'::TIMESTAMP AS time UNION ALL
187+
SELECT 113 AS id, '2025-05-03'::TIMESTAMP AS time UNION ALL
188+
SELECT 114 AS id, '2025-05-04'::TIMESTAMP AS time UNION ALL
189+
SELECT 115 AS id, '2025-05-05'::TIMESTAMP AS time UNION ALL
190+
SELECT 116 AS id, '2025-06-01'::TIMESTAMP AS time UNION ALL
191+
SELECT 117 AS id, '2025-06-02'::TIMESTAMP AS time UNION ALL
192+
SELECT 118 AS id, '2025-06-03'::TIMESTAMP AS time UNION ALL
193+
SELECT 119 AS id, '2025-06-04'::TIMESTAMP AS time UNION ALL
194+
SELECT 120 AS id, '2025-06-05'::TIMESTAMP AS time UNION ALL
195+
SELECT 121 AS id, '2025-06-06'::TIMESTAMP AS time UNION ALL
196+
SELECT 122 AS id, '2025-07-01'::TIMESTAMP AS time UNION ALL
197+
SELECT 123 AS id, '2025-07-02'::TIMESTAMP AS time UNION ALL
198+
SELECT 124 AS id, '2025-07-03'::TIMESTAMP AS time UNION ALL
199+
SELECT 125 AS id, '2025-07-04'::TIMESTAMP AS time UNION ALL
200+
SELECT 126 AS id, '2025-07-05'::TIMESTAMP AS time UNION ALL
201+
SELECT 127 AS id, '2025-07-06'::TIMESTAMP AS time UNION ALL
202+
SELECT 128 AS id, '2025-07-07'::TIMESTAMP AS time
203+
\`,
204+
205+
dimensions: {
206+
time: {
207+
sql: 'time',
208+
type: 'time',
209+
public: false,
210+
},
211+
},
212+
213+
measures: {
214+
count_distinct: {
215+
sql: 'id',
216+
type: 'countDistinct',
217+
public: false,
218+
},
219+
count_distinct__sum_by_quarter_aux: {
220+
multi_stage: true,
221+
sql: \`\${count_distinct}\`,
222+
type: 'sum',
223+
add_group_by: [time.month],
224+
group_by: [time.quarter],
225+
public: false,
226+
},
227+
count_distinct__sum_by_quarter: {
228+
multi_stage: true,
229+
sql: \`\${count_distinct__sum_by_quarter_aux}\`,
230+
type: 'sum',
231+
add_group_by: [time.quarter],
232+
},
233+
},
234+
235+
preAggregations: {
236+
main: {
237+
type: 'rollup',
238+
measures: [count_distinct],
239+
timeDimensions: [
240+
{
241+
dimension: time,
242+
granularity: 'month'
243+
},
244+
{
245+
dimension: time,
246+
granularity: 'quarter'
247+
}
248+
]
249+
},
250+
},
251+
})
173252
174253
`);
175254

@@ -285,6 +364,30 @@ describe('PreAggregationsMultiStage', () => {
285364
);
286365
});
287366
}));
367+
368+
it('multi stage count_distinct sum by quarter with pre-aggregation', () => compiler.compile().then(() => {
369+
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
370+
measures: [
371+
'coach.count_distinct__sum_by_quarter'
372+
],
373+
timezone: 'UTC',
374+
preAggregationsSchema: '',
375+
cubestoreSupportMultistage: true
376+
});
377+
378+
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
379+
const sqlAndParams = query.buildSqlAndParams();
380+
expect(preAggregationsDescription[0].tableName).toEqual('coach_main');
381+
expect(sqlAndParams[0]).toContain('coach_main');
382+
383+
return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
384+
expect(res).toEqual(
385+
[
386+
{ coach__count_distinct__sum_by_quarter: '28' },
387+
]
388+
);
389+
});
390+
}));
288391
} else {
289392
it.skip('multi stage pre-aggregations', () => {
290393
// Skipping because it works only in Tesseract

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod member_sql;
2929
pub mod options_member;
3030
pub mod pre_aggregation_description;
3131
pub mod pre_aggregation_obj;
32+
pub mod pre_aggregation_time_dimension;
3233
pub mod security_context;
3334
pub mod segment_definition;
3435
pub mod sql_templates_render;

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use super::member_sql::{MemberSql, NativeMemberSql};
2+
use super::pre_aggregation_time_dimension::{
3+
NativePreAggregationTimeDimension, PreAggregationTimeDimension,
4+
};
25
use cubenativeutils::wrappers::serializer::{
36
NativeDeserialize, NativeDeserializer, NativeSerialize,
47
};
8+
use cubenativeutils::wrappers::NativeArray;
59
use cubenativeutils::wrappers::NativeContextHolder;
610
use cubenativeutils::wrappers::NativeObjectHandle;
711
use cubenativeutils::CubeError;
@@ -38,4 +42,9 @@ pub trait PreAggregationDescription {
3842

3943
#[nbridge(field, optional)]
4044
fn rollup_references(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
45+
46+
#[nbridge(field, vec, optional)]
47+
fn time_dimension_references(
48+
&self,
49+
) -> Result<Option<Vec<Rc<dyn PreAggregationTimeDimension>>>, CubeError>;
4150
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use super::member_sql::{MemberSql, NativeMemberSql};
2+
use cubenativeutils::wrappers::serializer::{
3+
NativeDeserialize, NativeDeserializer, NativeSerialize,
4+
};
5+
use cubenativeutils::wrappers::NativeContextHolder;
6+
use cubenativeutils::wrappers::NativeObjectHandle;
7+
use cubenativeutils::CubeError;
8+
use serde::{Deserialize, Serialize};
9+
use std::any::Any;
10+
use std::rc::Rc;
11+
12+
#[derive(Deserialize, Serialize, Clone, Debug)]
13+
pub struct PreAggregationTimeDimensionStatic {
14+
pub granularity: String,
15+
}
16+
17+
#[nativebridge::native_bridge(PreAggregationTimeDimensionStatic)]
18+
pub trait PreAggregationTimeDimension {
19+
#[nbridge(field)]
20+
fn dimension(&self) -> Result<Rc<dyn MemberSql>, CubeError>;
21+
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct DimensionMatcher<'a> {
3434
query_tools: Rc<QueryTools>,
3535
pre_aggregation: &'a CompiledPreAggregation,
3636
pre_aggregation_dimensions: HashMap<String, bool>,
37-
pre_aggregation_time_dimensions: HashMap<String, (Option<Rc<TimeDimensionSymbol>>, bool)>,
37+
pre_aggregation_time_dimensions: HashMap<String, Vec<(Rc<TimeDimensionSymbol>, bool)>>,
3838
pre_aggregation_segments: HashMap<String, bool>,
3939
result: MatchState,
4040
}
@@ -46,17 +46,17 @@ impl<'a> DimensionMatcher<'a> {
4646
.iter()
4747
.map(|d| (d.full_name(), false))
4848
.collect();
49-
let pre_aggregation_time_dimensions = pre_aggregation
50-
.time_dimensions
51-
.iter()
52-
.map(|dim| {
53-
if let Ok(td) = dim.as_time_dimension() {
54-
(td.base_symbol().full_name(), (Some(td), false))
55-
} else {
56-
(dim.full_name(), (None, false))
57-
}
58-
})
59-
.collect::<HashMap<_, _>>();
49+
let mut pre_aggregation_time_dimensions =
50+
HashMap::<String, Vec<(Rc<TimeDimensionSymbol>, bool)>>::new();
51+
for dim in pre_aggregation.time_dimensions.iter() {
52+
if let Ok(td) = dim.as_time_dimension() {
53+
let key = td.base_symbol().full_name();
54+
pre_aggregation_time_dimensions
55+
.entry(key)
56+
.or_default()
57+
.push((td, false));
58+
}
59+
}
6060
let pre_aggregation_segments = pre_aggregation
6161
.segments
6262
.iter()
@@ -128,12 +128,15 @@ impl<'a> DimensionMatcher<'a> {
128128
MatchState::Partial
129129
};
130130
self.result = self.result.combine(&dimension_coverage_result);
131-
let time_dimension_coverage_result =
132-
if self.pre_aggregation_time_dimensions.values().all(|v| v.1) {
133-
MatchState::Full
134-
} else {
135-
MatchState::Partial
136-
};
131+
let time_dimension_coverage_result = if self
132+
.pre_aggregation_time_dimensions
133+
.values()
134+
.all(|entries| entries.iter().all(|(_, matched)| *matched))
135+
{
136+
MatchState::Full
137+
} else {
138+
MatchState::Partial
139+
};
137140
self.result = self.result.combine(&time_dimension_coverage_result);
138141
let segment_coverage_result = if self.pre_aggregation_segments.values().all(|v| *v) {
139142
MatchState::Full
@@ -233,39 +236,41 @@ impl<'a> DimensionMatcher<'a> {
233236
};
234237
let base_symbol_name = time_dimension.base_symbol().full_name();
235238

236-
if let Some(found) = self
239+
if let Some(entries) = self
237240
.pre_aggregation_time_dimensions
238241
.get_mut(&base_symbol_name)
239242
{
240-
if add_to_matched_dimension {
241-
found.1 = true;
243+
// First, look for exact granularity match
244+
let exact_match = entries
245+
.iter_mut()
246+
.find(|(td, _)| granularity.is_none() || td.granularity() == &granularity);
247+
if let Some((_, matched)) = exact_match {
248+
if add_to_matched_dimension {
249+
*matched = true;
250+
}
251+
return Ok(MatchState::Full);
242252
}
243253

244-
let pre_agg_td = &found.0;
245-
let pre_aggr_granularity = if let Some(pre_agg_td) = pre_agg_td {
246-
pre_agg_td.granularity().clone()
247-
} else {
248-
None
249-
};
250-
251-
if granularity.is_none() || pre_aggr_granularity == granularity {
252-
Ok(MatchState::Full)
253-
} else if pre_aggr_granularity.is_none() {
254-
Ok(MatchState::NotMatched)
255-
} else if let Some(pre_agg_td) = pre_agg_td {
254+
// No exact match — find the finest pre-agg granularity that covers the query
255+
let mut best_match = MatchState::NotMatched;
256+
for (pre_agg_td, matched) in entries.iter_mut() {
257+
let pre_aggr_granularity = pre_agg_td.granularity();
258+
if pre_aggr_granularity.is_none() {
259+
continue;
260+
}
256261
let min_granularity = GranularityHelper::min_granularity_for_time_dimensions(
257262
(&granularity, time_dimension),
258-
(&pre_aggr_granularity, &pre_agg_td),
263+
(pre_aggr_granularity, pre_agg_td),
259264
)?;
260-
261-
if min_granularity == pre_aggr_granularity {
262-
Ok(MatchState::Partial)
263-
} else {
264-
Ok(MatchState::NotMatched)
265+
if min_granularity == *pre_aggr_granularity {
266+
if add_to_matched_dimension {
267+
*matched = true;
268+
}
269+
best_match = MatchState::Partial;
270+
break;
265271
}
266-
} else {
267-
Ok(MatchState::NotMatched)
268272
}
273+
Ok(best_match)
269274
} else {
270275
if time_dimension.owned_by_cube() {
271276
Ok(MatchState::NotMatched)

0 commit comments

Comments
 (0)